当前位置: 首页 > news >正文

详解node.js中的可读流(Readable)和可写流(Writeable)

Node.js的流就是为了在有限的内存中实现我们操作"海量"数据的目标。

流是一组有序的,有起点和终点的字节数据传输手段,它是一个抽象的接口,被 Node 中的很多对象所实现。node里很多内容都应用到流,比如HTTP 服务器request和response对象都是流。

它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理。

Node.js中Stream 有四种流类型。

可读流(Readable)

  • 可读流分为:流动模式(flowing mode)暂停模式(paused mode)

  • 可读流在创建时都是暂停模式。暂停模式和流动模式可以互相转换。

1) 流动模式(flowing mode)

流动模式下,数据会源源不断地生产出来,形成“流动”现象。监听流的data事件便可进入该模式。

2) 暂停模式(paused mode)

暂停模式下,需要显示地调用read(),触发data事件。

在初始状态下,监听data事件,会使流进入流动模式。但如果在暂停模式下,监听data事件并不会使它进入流动模式。为了消耗流,需要显示调用read()方法。

3)相互转化

  • 如果不存在管道目标,调用readable.resume()可使流进入流动模式

  • 如果存在管道目标,调用 stream.unpipe()并取消'data'事件监听

创建可读流

var rs = fs.createReadStream(path,[options]);复制代码
  1. path读取文件的路径

  2. options

    • flags 打开文件要做的操作,默认为'r'读取

    • encoding 默认为null,代表buffer。如果指定utf8编码highWaterMark要大于3个字节

    • start 开始读取的索引位置

    • end 结束读取的索引位置(包括结束位置)

    • highWaterMark 读取缓存区默认的大小64kb

    • autoClose 读取完毕后是否自动关闭

相关方法

flowing流动模式
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'),{ //这里的参数一般不会写
  flags:'r',//文件的操作是读取操作
  encoding:'utf8', // 默认是null null代表的是buffer
  autoClose:true, // 读取完毕后自动关闭
  highWaterMark:3,// 默认是64k  64*1024b
  start:0, //读取的起始位置 
  end:3 // 读取的结束位置,包前又包后,相当于闭区间
})
//默认情况下 不会将文件中的内容输出
//内部会先创建一个buffer先读取3b
//相当于有盖子的水管,不会流出来,存储在管中
​
//有两种模式 非流动模式/暂停模式
//因为创建时第二个参数一般不会写,读出来的类型是buffer,这个方法可以指定编码
rs.setEncoding('utf8');
​
//打开文件
rs.on('open',function(data){ 
  console.log(data)
})
//关闭文件
rs.on('close',function(data){ 
  console.log(data)
})
//有错误就会报错误
rs.on('err',function(data){ 
  console.log(data)
})
​
//暂停模式->流动模式
//流动模式只要监听了会疯狂的触发data事件,直到读取完毕
rs.on('data',function(data){
  console.log(data);
  //一打开水龙头就哗哗出水,有个方法可以让它暂停
  rs.pause(); //暂停方法,表示暂停读取,暂停data事件触发
})
setInterval(function(){
  rs.resume(); //恢复data事件的触发,变为流动模式继续读取
},3000)
rs.on('end',function(data){ //先end再close关闭
  console.log(data)
})
    复制代码
paused暂停模式
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'));
​
rs.setEncoding('utf8');
​
// 当我只要创建一个流,就会先把缓存区填满,等待着你自己消费
// 如果当前缓存区被清空后会再次触发readable事件
// 当你消费小于最高水位线时,会自动添加highWater这么多数据
rs.on('readable', () => {
    let d = rs.read(1)
    console.log(d)
})复制代码

实现可读流功能原理

流动模式
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
    constructor(path,options){
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
​
        this.open();//打开文件 fd
​
        this.flowing = null; // null就是暂停模式
        // 看是否监听了data事件,如果监听了 就要变成流动模式
​
        // 要建立一个buffer 这个buffer就是要一次读多少
        this.buffer = Buffer.alloc(this.highWaterMark);
​
        this.pos = this.start; // pos 读取的位置 可变 start不变的
        this.on('newListener',(eventName,callback)=>{
            if(eventName === 'data'){
                // 相当于用户监听了data事件
                this.flowing  = true;
                // 监听了 就去读
                this.read(); // 去读内容了
            }
        })
    }
    read(){
        // 此时文件还没打开呢
        if(typeof this.fd !== 'number'){
            // 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd肯定有了
            return this.once('open',()=>this.read())
        }
        // 此时有fd了
        let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
            // 读到了多少个 累加
            if(bytesRead>0){
                this.pos+= bytesRead;
                let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
                this.emit('data',data);
                // 当读取的位置 大于了末尾 就是读取完毕了
                if(this.pos > this.end){
                    this.emit('end');
                    this.destroy();
                }
                if(this.flowing) { // 流动模式继续触发
                    this.read(); 
                }
            }else{
                this.emit('end');
                this.destroy();
            }
        });
    }
    resume(){
        this.flowing = true;
        this.read();
    }
    pause(){
        this.flowing = false;
    }
    destroy(){
        // 先判断有没有fd 有关闭文件 触发close事件
        if(typeof this.fd ==='number'){
            fs.close(this.fd,()=>{
                this.emit('close');
            });
            return;
        }
        this.emit('close'); // 销毁
    };
    open(){
        // copy 先打开文件
        fs.open(this.path,this.flags,(err,fd)=>{
            if(err){
                this.emit('error',err);
                if(this.autoClose){ // 是否自动关闭
                    this.destroy();
                }
                return;
            }
            this.fd = fd; // 保存文件描述符
            this.emit('open'); // 文件打开了
        });
    }
}
module.exports = ReadStream;复制代码
pipe

.pipe()函数是接受一个源头src并将数据输出到一个可写的流dst

简单来说,边读边写东西,读太快,来不及写,就先暂停读,等写完了再继续读。

let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});
let ws = new WriteStream(path.join(__dirname,'./2.txt'),{
    highWaterMark:1
});
// 读四个,写一个
rs.pipe(ws); // pipe就是读一点写一点复制代码

pipe原理实现,写在ReadStream的方法中

pipe(ws){
    this.on('data',(chunk)=>{
        let flag = ws.write(chunk);
        if(!flag){
            this.pause();
        }
    });
    ws.on('drain',()=>{
        this.resume();
    })
}复制代码
暂停模式
let fs = require('fs');
let EventEmitter = require('events');
//当读取内容大于缓存区,重新计算读取数量n的大小的方法
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || 'r';
​
        this.buffers = []; // 缓存区 
        this.pos = this.start;
        this.length = 0; // 缓存区大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在读取的
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read();
            }
        })
    }
    read(n) { // 想取1个
​
        if(n>this.length){
            // 更改缓存区大小  读取五个就找 2的几次放最近的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }
​
​
        // 如果n>0 去缓存区中取吧
        let buffer=null;
        let index = 0; // 维护buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
            // 在缓存区中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 这是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷贝够了 不需要拷贝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 取出留下的部分
                        // 如果有剩下的内容 在放入到缓存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 当前缓存区 小于highWaterMark时在去读取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 异步的
            }
        }
        return buffer
    }
    // 封装的读取的方法
    _read() {
        // 当文件打开后在去读取
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        // 上来我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默认读取的内容放到缓存区中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 维护读取的索引
                this.length += bytesRead;// 维护缓存区的大小
                this.reading = false;
                // 是否需要触发readable事件
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默认不触发
                    this.emit('readable');
                }
            } else {
                this.emit('end');
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close')
        }
        fs.close(this.fd, () => {
            this.emit('close')
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}
​
module.exports = ReadStream;复制代码

可写流(Writeable)

创建可写流

var ws = fs.createWriteStream(path,[options]);

  1. path写入的文件路径

  2. options

    • flags打开文件要做的操作,默认为'w'

    • encoding默认为utf8

    • highWaterMark写入缓存区的默认大小16kb

相关方法

let fs=require('fs');
let path=require('path');
//写的时候文件不存在,会创建文件
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    mode:0o666,
    autoClose:true,
    highWaterMark:3, // 默认写是16k
    encoding:'utf8',
    start:0
});
​
//第一个参数写入的数据必须是字符串或者Buffer
//第二个参数写入以什么编码写进去
//第三个参数callback
//有返回值,代表是否能继续写,写的时候,有个缓存区的概念。但是返回false,也不会丢失,就是会把内容放到内存中
let flag=ws.write(1+'','utf8',()=>{})//这是异步的方法
​
//传入的参数,写完后也会写入文件内
ws.end('ok'); //当写完后,就不能再继续写了
​
//抽干方法,当写入完后会触发drain方法
//缓存区必须满了,满了清空后才会触发drain
//如果调用end后,再调用这个方法没有意义了
ws.on('drain',function(){
  console.log('drain')
})
​复制代码

实现可写流功能原理

let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose = options.autoClose||true;
        this.mode = options.mode;
        this.start = options.start||0;
        this.flags = options.flags||'w';
        this.encoding = options.encoding || 'utf8';
​
        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
        // 在源码中是一个链表 => []
​
        this.buffers = [];
​
        // 标识 是否正在写入
        this.writing = false;
​
        // 是否满足触发drain事件
        this.needDrain = false;
​
        // 记录写入的位置
        this.pos = 0;
​
        // 记录缓存区的大小
        this.length = 0;
        this.open();
    }
    destroy(){
        if(typeof this.fd !=='number'){
            return this.emit('close');
        }
        fs.close(this.fd,()=>{
            this.emit('close')
        })
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit('error',err);
                if(this.autoClose){
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        })
    }
    write(chunk,encoding=this.encoding,callback=()=>{}){
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
        // write 返回一个boolean类型 
        this.length+=chunk.length; 
        let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
        this.needDrain = !ret; // 是否需要触发needDrain
        // 判断是否正在写入 如果是正在写入 就写入到缓存区中
        if(this.writing){
            this.buffers.push({
                encoding,
                chunk,
                callback
            }); // []
        }else{
            // 专门用来将内容 写入到文件内
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback();
                this.clearBuffer();
            }); // 8
        }
        return ret;
    }
    clearBuffer(){
        let buffer = this.buffers.shift();
        if(buffer){
            this._write(buffer.chunk,buffer.encoding,()=>{
                buffer.callback();
                this.clearBuffer()
            });
        }else{
            this.writing = false;
            if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }
    _write(chunk,encoding,callback){
        if(typeof this.fd !== 'number'){
            return this.once('open',()=>this._write(chunk,encoding,callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.length -= byteWritten;
            this.pos += byteWritten;
            
            callback(); // 清空缓存区的内容
        });
    }
}
​
module.exports = WriteStream;复制代码


啊~~文章似乎太长太啰嗦了,看来怎么把给自己看的笔记整理成一个好的文章也是一门学问!


相关文章:

  • 一文看懂JeffDean等提出的ENAS到底好在哪?
  • MXNet 作者李沐:用深度学习做图像分类,教程+代码
  • Map集合、散列表、红黑树介绍
  • centos7.4系统的虚拟机网络配置教程
  • win10 php安装redis 扩展
  • 6、通过Appium Desktop 实现录制功能
  • 文件上传漏洞攻击
  • Micropython TPYBoard V10X拼插编程实践之定时器 代码不精通?...
  • 从抖音关闭评论,看服务治理的重要性
  • diango-团队介绍
  • ModeBusRtu概述
  • Project中最常用的注意点
  • 云时代,ERP选型莫走入低价、免费误区
  • 以太坊智能合约开发第六篇:truffle开发框架
  • CSS居中完全指南——构建CSS居中决策树
  • [ JavaScript ] 数据结构与算法 —— 链表
  • [Vue CLI 3] 配置解析之 css.extract
  • Android 控件背景颜色处理
  • C++回声服务器_9-epoll边缘触发模式版本服务器
  • classpath对获取配置文件的影响
  • CSS3 变换
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • es6(二):字符串的扩展
  • ES6简单总结(搭配简单的讲解和小案例)
  • express.js的介绍及使用
  • Nginx 通过 Lua + Redis 实现动态封禁 IP
  • nodejs:开发并发布一个nodejs包
  • Shadow DOM 内部构造及如何构建独立组件
  • 聚类分析——Kmeans
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 设计模式 开闭原则
  • 深度学习在携程攻略社区的应用
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • 交换综合实验一
  • ​决定德拉瓦州地区版图的关键历史事件
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • #我与Java虚拟机的故事#连载05:Java虚拟机的修炼之道
  • (1)(1.8) MSP(MultiWii 串行协议)(4.1 版)
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (2022 CVPR) Unbiased Teacher v2
  • (C语言)求出1,2,5三个数不同个数组合为100的组合个数
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (附源码)springboot 基于HTML5的个人网页的网站设计与实现 毕业设计 031623
  • (免费领源码)python#django#mysql校园校园宿舍管理系统84831-计算机毕业设计项目选题推荐
  • (三分钟了解debug)SLAM研究方向-Debug总结
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • . Flume面试题
  • .gitignore文件---让git自动忽略指定文件
  • .net 生成二级域名
  • .NET开源项目介绍及资源推荐:数据持久层 (微软MVP写作)
  • .NET企业级应用架构设计系列之开场白
  • .考试倒计时43天!来提分啦!
  • @Autowired和@Resource装配