当前位置: 首页 > news >正文

窥探Node.js里的Stream

流是什么?

流的常见的应用场景有哪些?

流的实现机制是什么?

流是什么?

什么是流呢?我画了个污水处理的简化流程来表达我对流的理解(原谅我拙劣的想象力和画技)。

这也是我早期在使用gulp的时候,对它的工作机制的理解。污水通过进水口进入,通过污水处理器中一系列的处理,最终会在出水口流出清水。

回归到代码中来,使用gulp时,我输入一个less文件,gulp会使用less插件、自动补全插件、压缩插件等一系列的处理,最终输出我们想要的css文件。一条龙服务,有木有?

在学习流的过程中,我还了解到了一个有趣的模型——生产者/消费者模型。理解了这个模型,那理解流就基本没什么压力了。

举个生活中的例子,帮助我们理解生产者/消费者模型。

生产者,比作生活中的工厂,不断生成泡面。

消费者,比作广大人民群众,需要不断的买泡面续命。

如果我直接去工厂里买泡面,虽然会很便宜,但人家零售就会亏本,所以只会批发给我,这足以刷爆我的小金库。

对于工厂,如果直接面对买家,那工厂生产一天的泡面,就要停产一个月,因为接下来一个月都需要去卖泡面。很快,工厂就会倒闭了。

既然工厂和消费者不能直接对接,那如果引入一个第三方——超市,作为工厂的代理商。那么,工厂就可以专注于生产泡面了,生产的泡面直接批发给超市。而消费者也不用去工厂批发了,虽然贵一点,但自由啊。想吃,就去超市买一包。

通过以上两个现实小栗子,宏观理解一下流。

三个角色:生产者、消费者、第三方中介

下方虚线的第三方中介,与上方的第三方中介,就是一个。之所以分开表示,是为了突出消费者与生产者通过第三方中介解耦了。

流的一些应用场景

我们从Node.js官网中可以找到,Node.js有四种基本的流类型:

  • Readable——可读流
  • Writable——可写流
  • Duplex——可读写的流,也叫双工流
  • Transform——在读写过程中可以修改和变换数据的一种特殊的 Duplex 流

Node.js中很多内建/核心模块都是基于流实现的:

由此图可看出,流在Node.js中的地位。可以说理解了流,将会很大程度的帮助我们去理解和使用上图列举的内建模块。

流的实现机制

了解流内部机制的最佳方式除了看 Node.js 官方文档,还可以去看看 Node.js 的 源码:

  • _stream_readable.js
  • _stream_writable.js
  • _stream_duplex.js
  • _stream_transform.js

从源码中我们可以看出,这四种基本类型的流都是流的一种抽象,提供给开发者去扩展使用的,所以源码看起来得有一定的使用基础。接下来我将另辟蹊径,不借助这四种基本类型的流,去实现 fs readable stream 的主要逻辑,管中窥豹,加深对流的理解

fs Readable Stream

Readable stream有两种模式:

  • flowing:在该模式下,会尽快获取数据向外输出。因此如果没有事件监听,也没有pipe()来引导数据流向,数据可能会丢失。
  • paused:默认模式。在该模式下,需要手动调用stream.read(..)来获取数据。

可以通过以下几种方法切换到flowing模式:

  • 添加'data'事件监听器
  • 调用stream.resume(..)方法
  • 调用stream.pipe()方法将数据发送给消费者Writable

可以通过以下几种方法切换到paused模式:

  • 如果没有调用stream.pipe(..),则调用stream.pause(..)即可
  • 如果有调用stream.pipe(..),那么需要通过stream.unpipe(..)移除所有的pipe

需要特别注意以下几点:

  1. 只有提供消费者去消费数据,比如,添加'data'事件监听器,可读流才会去生产数据。
  2. 如果移除'data'事件监听器,将不会自动的停止流。
  3. 如果调用了stream.pipe(..),再调用stream.pause(),将不会停止这个流。
  4. 如果可读流被切换到了流的模式,但是却没有添加'data'事件监听器,那么数据将会丢失掉。比如调用了stream.resume(),却没有添加'data'事件监听器,或者'data'事件监听器被移除了。
  5. 选择一种方式去消耗可读流生产的数据。比较推荐stream.pipe(..)。也可使用可控性比较强的事件机制,再配合readable.pause()/readable.resume()APIs
  6. 如果readabledata被同时使用了,那么readable事件的优先级会比data事件高。此时,必须在readable事件内显示调用stream.read(..)才能读到数据。

更多API使用可参考官网stream_readable_streams

flowing模式

下面尝试实现文件可读流的流动模式,一观它的内部机制。

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsReadableStream, EventEmitter);

// 声明一个文件可读流的构造函数,并初始化相关参数
function FsReadableStream(path, options) {
    const self = this; // 防止this指针的指向混乱

    // 这里为了主要说明实现流程,省略参数的边界限制。
    self.path = path;

    // 打开文件时的参数
    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o66;

    // 文件内容的读取起止位置
    self.start = options.start || 0;
    self.end = options.end;
    // 每次读取内容的水位线,即一次读取,最大缓存
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // 内容读取完毕之后,是否自动关闭文件
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // 最后输出的数据将以何种编码方式解码
    self.encoding = options.encoding || 'utf8';

    // 文件有效的描述符
    self.fd = null;

    // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度
    self.pos = self.start;

    // 申请水位线大小的空间作为buffer缓存
    self.buffer = Buffer.alloc(self.highWaterMark);

    // new这个构造函数时,就打开文件,为后续做准备
    self.open();

    // 模式  初始为暂停模式
    self.flowing = null;

    // 一旦有新的事件被监听,且,是'data'事件,
    // 就将模式切换至流动模式,并读取数据
    self.on('newListener', function (eventName) {
        if (eventName === 'data') {
            self.flowing = true;
            self.read();
        }
    });
}

// 在对文件操作之前,需要先打开文件,获取文件的有效描述符
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// 读取文件里的内容
FsReadableStream.prototype.read = function () {
    const self = this;
    // self.open()是异步方法,此时,需判断文件是否被打开了
    if (typeof self.fd !== 'number') {
        // 文件未打开,可以添加open事件监听器
        self.once('open', self.read);
        return;
    }
    // 需计算每次需要读取多少数据。
    const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark;
    fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        if (bytesRead > 0) {
            // 更新读取位置
            self.pos = self.pos + bytesRead;
            // 有可能读取的内容长度比buffer缓存长度小,就必须截取出来,防止乱码情况
            const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead);
            self.emit('data', data);
            // 如果下一次读取的起始位置比结束位置要大,则表明已经读完了
            if (self.pos > self.end) {
                self.emit('end');
                if (self.autoClose) {
                    self.destroy();
                }
            }
            // 如果仍然处于流动模式,将会继续读取数据
            if (self.flowing) {
                self.read();
            }
        } else {
            // 文件内容已经读取完毕了
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// 将流动模式切换到暂停模式
FsReadableStream.prototype.pause = function () {
    if (this.flowing !== false) {
        this.flowing = false;
    }
};

// 将暂停模式切换到流动模式
FsReadableStream.prototype.resume = function () {
    // 如果直接调用resume,却没有添加data监听器
    // 数据则会丢失
    if (!this.flowing) {
        this.flowing = true;
        this.read();
    }
};

// 关闭文件
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
复制代码

我们会发现流动模式,是会源源不断的生成数据的,直到数据源枯竭为止。当然,也可以通过stream.pause(..)/stream.resume去精准控制。这种模式的控制权在于开发者,开发者必须熟悉这种模式的运行机制,谨慎运用,否则很容易出现,消费者被撑爆或者数据中途丢失的情况。

paused模式

由于流动模式和暂停模式是互斥的,所以采用分开实现可读流的两种模式。暂停模式下,我们需要监听另外一个事件——'readable',并显示调用stream.read(n)才能读到数据。

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
// 文件可读流的暂停模式
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsReadableStream, EventEmitter);

// 声明一个文件可读流的构造函数,并初始化相关参数
function FsReadableStream(path, options) {
    const self = this; // 防止this指针的指向混乱

    // 这里为了主要说明实现流程,省略参数的边界限制。
    self.path = path;

    // 打开文件时的参数
    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o66;

    // 文件内容的读取起止位置
    self.start = options.start || 0;
    self.end = options.end;
    // 每次读取内容的水位线,即一次读取,最大缓存
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // 内容读取完毕之后,是否自动关闭文件
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // 最后输出的数据将以何种编码方式解码
    self.encoding = options.encoding || 'utf8';

    // 文件有效的描述符
    self.fd = null;

    // 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度
    self.pos = self.start;

    // 作为缓存存放每次读到的数据  [Buffer, Buffer, Buffer...]
    self.buffers = [];
    // 当前缓存的长度   self.buffers.length 只能读到数组有多少个元素。
    // 这里是缓存 buffers 每一项的长度之和
    self.length = 0;

    // 因为读取文件是异步操作,所以这里需要一个标记
    // 如果处于正在读状态,则将数据存放在缓存中
    this.reading = false;
    // 是否达到 发送 'readable' 事件的条件
    // 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。
    // 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null
    this.emittedReadable = false;

    // new这个构造函数时,就打开文件,为后续做准备
    self.open();

    // 一旦有新的事件被监听,且,是'readable'事件,
    // 开启 emittedReadable
    self.on('newListener', function (eventName) {
        if (eventName === 'readable') {
            self.read();
        }
    });
}

FsReadableStream.prototype.read = function (n) {
    const self = this;
    let buffer = null;
    // 当索要的数据长度  大于  缓存区的长度
    if (n > self.length) {
        // 此时要索要的数据,超过了缓存大小
        // 就会提升水位线,来适应这种需求量
        // computeNewHighWaterMark 是node源码中提高水位线的方法
        self.highWaterMark = computeNewHighWaterMark(n);
        self.emitReadable = true;
        self._read();
    }
    // 当索要的数据长度  大于  0  且  小于等于  缓存区的长度
    if (n > 0 && n <= self.length) {
        // 先申请Buffer内存
        buffer = Buffer.alloc(n);
        let index = 0; // 循环次数
        let flag = true; // 控制while的标记
        let b;
        while (flag && (b = self.buffers.shift())) {
            for (let i = 0; i < b.length; i++) {
                buffer[index++] = b[i]; // 赋值
                if (n === index) {
                    let arr = b.slice(index);
                    if (arr.length) {
                        // 不要的 再塞回缓存
                        self.buffers.unshift(arr);
                    }
                    self.length = self.length - n;
                    flag = false;
                }
            }
        }
    }
    // 如果当期缓存区没有数据
    if (self.length === 0) {
        self.emittedReadable = true;
    }
    // 当缓存区的数据长度  小于  水位线了   就去生成数据,继续放在缓存区
    if (self.length < self.highWaterMark) {
        if (!self.reading) {
            self.reading = true;
            self._read();
        }
    }
    // 返回读到的数据
    return buffer;
};

FsReadableStream.prototype._read = function () {
    const self = this;
    if (typeof self.fd !== 'number') {
        return self.once('open', self._read);
    }
    const buffer = Buffer.alloc(self.highWaterMark);
    fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) {
        if (bytesRead > 0) {
            // 默认将读取的内容放到缓存区中
            self.buffers.push(buffer.slice(0, bytesRead));
            self.pos = self.pos + bytesRead; // 维护读取的索引
            self.length = self.length + bytesRead; // 维护缓存区的大小
            self.reading = false; // 读取完成
            // 是否需要触发readable事件
            if (self.emittedReadable) {
                self.emittedReadable = false; // 下次默认不触发
                self.emit('readable');
            }
        } else {
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// 在对文件操作之前,需要先打开文件,获取文件的有效描述符
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// 关闭文件
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
复制代码

这种模式,我们会发现,'readable'事件是告诉我们什么时候可以去索取数据了。如果直接去调stream.read(n)的话,会因为fs.read(..)异步操作,还没将数据读出并放至缓存区,导致结果将返回null。

只要缓存区内容被消耗至水位线以下,就会自动续杯,生成水位线大小的数据放到缓存中。

那什么时候会触发'readable'事件呢?缓存区为空,然后生产了水位线大小的数据放在缓存区之后,便会触发。下一次触发的时机,仍然是缓存区被消耗干净了,再次续满杯之后。

stream.read(n)想要多少数据,就传入多长。可以通过stream.length查看当前缓存区数据的长度,再决定索取多少数据。实际场景运用中则需要使用stream._readableState.length

fs Writable Stream

fs writable stream它的机制和可读流有些相似。话不多说,先上代码:

// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// 使用Node.js内部工具模块,让文件可写流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsWritableStream, EventEmitter);

// 声明一个文件可写流的构造函数,并初始化相关参数
function FsWritableStream(path, options) {
    const self = this; // 防止this指针的指向混乱

    // 这里为了主要说明实现流程,省略参数的边界限制。
    self.path = path;

    // 打开文件时的参数
    // 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o66;

    // 文件内容的写入开始位置
    self.start = options.start || 0;
    // 每次写入内容的水位线,即最大缓存
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // 内容写入完毕之后,是否自动关闭文件
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // 告诉程序写入的数据将以何种编码方式解码
    self.encoding = options.encoding || 'utf8';

    // 文件有效的描述符
    self.fd = null;

    // 文件写入的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次写入的内容长度
    self.pos = self.start;

    // 作为缓存,存放来不及写入文件的数据  [Buffer, Buffer, Buffer...]
    self.buffers = [];
    // 当前缓存的长度   self.buffers.length 只能读到数组有多少个元素。
    // 这里是缓存 buffers 每一项的长度之和
    self.length = 0;

    // 因为读取文件是异步操作,所以这里需要一个标记
    // 如果处于正在读状态,则将数据存放在缓存中
    this.writing = false;

    // 控制是否通知'drain'事件监听器,表示,缓存区从满状态,被消耗空了
    self.needDrain = false;

    // new这个构造函数时,就打开文件,为后续做准备
    self.open();
}

FsWriteStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

FsWriteStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.emit('close');
        });
    } else {
        self.emit('close');
    }
};

// 主动发起调用,这里默认三个参数都会传
FsWriteStream.prototype.write = function (chunk, encoding, callback) {
    const self = this;
    const bufferChunk = Buffer.isBuffer(chunk) && chunk || Buffer.from(chunk, encoding);
    self.length = self.length + bufferChunk.length;
    // 如果当前缓存长度 小于 水位线,表示缓存区未满
    const ret = self.length < self.highWaterMark;
    // 当缓存区满了,则打开 向drain事件 发通知的开关
    self.needDrain = !ret;
    if (self.writing) {
        // 正在 写入/消费 数据,所以先存到缓存中
        self.buffers.push({
            chunk,
            encoding,
            callback,
        });
    } else {
        // 相当于去发ajax,这里先来一个loading
        self.writing = true;
        self._write(chunk, encoding, function () {
            callback();
            // 当写入完成时,清除缓存中的内容
            self.clearBuffer();
        });
    }
    // 每次调用write时,到会返回当前缓存区是否满了
    return ret;
};

FsWriteStream.prototype._write = function (chunk, encoding, callback) {
    const self = this;
    if (typeof self.fd !== 'number') {
        self.once('open', function () {
            self._write(chunk, encoding, callback);
        });
        return;
    }
    fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) {
        if (err) {
            self.emit('error', err);
            self.writing = null;
            if (self.autoClose) {
                self.destroy();
            }
            return false;
        }
        // 长度 减掉已经消费成功的数据长度
        self.length = self.length - writtenBytes;
        // 更新下一次写入的开始位置
        self.pos = self.pos + writtenBytes;
        callback();
    });
};

FsWriteStream.prototype.clearBuffer = function () {
    const self = this;
    const buffer = self.buffers.shift();
    if (buffer) {
        // 如果缓存区仍有数据,则继续消费数据
        self._write(buffer.chunk, buffer.encoding, function () {
            buffer.callback();
            self.clearBuffer();
        });
    } else {
        // 如果缓存区空了,则重置写入状态
        self.writing = false;
        if (self.needDrain) {
            // 发送 drain 事件,告知缓存区已经消耗完了,可以进行下一波数据写入了
            self.needDrain = false;
            self.emit('drain');
        }
    }
};
复制代码

我们会发现,当数据流来的时候,可写流会直接去消费数据。当 消费/写入文件 速度过于缓慢的时候,数据流会被送入缓存区缓存起来。

当生产者传来的数据速度过快,把缓存塞满了之后,就会出现「背压」(fs.write(..)返回的结果),这个时候是需要告诉生产者暂停生产的,当缓存区被消耗完之后,可写流会给生产者发送一个 drain 消息,这样就可以恢复生产了。

总结

以上 fs Readable Streamfs Writable Stream分别是流的基本类型Readable StreamWritable Stream的上层API。fs源码中,实际上是分别继承这两个基本流类型再加上一些fs的文件操作,最后扩展成一个文件流的。

所以流就是基于事件和状态机去实现'生产者/消费者'这样的一个模型。

更多关于流的更多使用,可参考官网

参考

  • 深入理解 Node.js Stream 内部机制
  • [译] Node.js 流: 你需要知道的一切
  • Node中的stream
  • Node.js官网
  • Node.js源码

转载于:https://juejin.im/post/5ac8b961f265da2397071de9

相关文章:

  • 给mybatis添加自动建表,自动加字段的功能
  • 如何夯实(Java)编程基础,并深入学习和提高
  • 大话测试与质量
  • 文顶顶虽老,博客尚在
  • BZOJ3998:[TJOI2015]弦论——题解
  • 3、第一个Appium测试
  • 【代码片段】Python发送带图片的邮件
  • @Autowired @Resource @Qualifier的区别
  • 区块链学习路线
  • Activity事件分发机制
  • ListT常用操作函数
  • Tomcat 的连接数与线程池
  • JVM内存模型
  • windows下安装redis以及redis扩展,设置redis为windows自启服务
  • HTML中动态生成内容的事件绑定问题
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 【Linux系统编程】快速查找errno错误码信息
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • 【附node操作实例】redis简明入门系列—字符串类型
  • android百种动画侧滑库、步骤视图、TextView效果、社交、搜房、K线图等源码
  • Android开源项目规范总结
  • CSS魔法堂:Absolute Positioning就这个样
  • Django 博客开发教程 8 - 博客文章详情页
  • httpie使用详解
  • iOS筛选菜单、分段选择器、导航栏、悬浮窗、转场动画、启动视频等源码
  • JavaScript设计模式之工厂模式
  • js对象的深浅拷贝
  • log4j2输出到kafka
  • PAT A1017 优先队列
  • PHP变量
  • Terraform入门 - 3. 变更基础设施
  • 高性能JavaScript阅读简记(三)
  • 计算机在识别图像时“看到”了什么?
  • 看域名解析域名安全对SEO的影响
  • 配置 PM2 实现代码自动发布
  • 视频flv转mp4最快的几种方法(就是不用格式工厂)
  • 译自由幺半群
  • 做一名精致的JavaScripter 01:JavaScript简介
  • 第二十章:异步和文件I/O.(二十三)
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • ###STL(标准模板库)
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (5)STL算法之复制
  • (定时器/计数器)中断系统(详解与使用)
  • (附源码)node.js知识分享网站 毕业设计 202038
  • (接口封装)
  • (十八)三元表达式和列表解析
  • (十六)Flask之蓝图
  • (十五)使用Nexus创建Maven私服
  • (一) storm的集群安装与配置
  • (一)基于IDEA的JAVA基础10
  • ******之网络***——物理***
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • . Flume面试题
  • .bat批处理(八):各种形式的变量%0、%i、%%i、var、%var%、!var!的含义和区别