流是什么?
流的常见的应用场景有哪些?
流的实现机制是什么?
流是什么?
什么是流呢?我画了个污水处理的简化流程来表达我对流的理解(原谅我拙劣的想象力和画技)。
这也是我早期在使用gulp的时候,对它的工作机制的理解。污水通过进水口进入,通过污水处理器中一系列的处理,最终会在出水口流出清水。
回归到代码中来,使用gulp时,我输入一个less文件,gulp会使用less插件、自动补全插件、压缩插件等一系列的处理,最终输出我们想要的css文件。一条龙服务,有木有?
在学习流的过程中,我还了解到了一个有趣的模型——生产者/消费者模型。理解了这个模型,那理解流就基本没什么压力了。
举个生活中的例子,帮助我们理解生产者/消费者模型。
生产者,比作生活中的工厂,不断生成泡面。
消费者,比作广大人民群众,需要不断的买泡面续命。
如果我直接去工厂里买泡面,虽然会很便宜,但人家零售就会亏本,所以只会批发给我,这足以刷爆我的小金库。
对于工厂,如果直接面对买家,那工厂生产一天的泡面,就要停产一个月,因为接下来一个月都需要去卖泡面。很快,工厂就会倒闭了。
既然工厂和消费者不能直接对接,那如果引入一个第三方——超市,作为工厂的代理商。那么,工厂就可以专注于生产泡面了,生产的泡面直接批发给超市。而消费者也不用去工厂批发了,虽然贵一点,但自由啊。想吃,就去超市买一包。
通过以上两个现实小栗子,宏观理解一下流。
三个角色:生产者、消费者、第三方中介
下方虚线的第三方中介,与上方的第三方中介,就是一个。之所以分开表示,是为了突出消费者与生产者通过第三方中介解耦了。
流的一些应用场景
我们从Node.js官网中可以找到,Node.js有四种基本的流类型:
- Readable——可读流
- Writable——可写流
- Duplex——可读写的流,也叫双工流
- Transform——在读写过程中可以修改和变换数据的一种特殊的 Duplex 流
Node.js中很多内建/核心模块都是基于流实现的:
由此图可看出,流在Node.js中的地位。可以说理解了流,将会很大程度的帮助我们去理解和使用上图列举的内建模块。
流的实现机制
了解流内部机制的最佳方式除了看 Node.js 官方文档,还可以去看看 Node.js 的 源码:
- _stream_readable.js
- _stream_writable.js
- _stream_duplex.js
- _stream_transform.js
从源码中我们可以看出,这四种基本类型的流都是流的一种抽象,提供给开发者去扩展使用的,所以源码看起来得有一定的使用基础。接下来我将另辟蹊径,不借助这四种基本类型的流,去实现 fs readable stream 的主要逻辑,管中窥豹,加深对流的理解
fs Readable Stream
Readable stream有两种模式:
- flowing:在该模式下,会尽快获取数据向外输出。因此如果没有事件监听,也没有pipe()来引导数据流向,数据可能会丢失。
- paused:默认模式。在该模式下,需要手动调用
stream.read(..)
来获取数据。
可以通过以下几种方法切换到flowing模式:
- 添加
'data'
事件监听器 - 调用
stream.resume(..)
方法 - 调用
stream.pipe()
方法将数据发送给消费者Writable
可以通过以下几种方法切换到paused模式:
- 如果没有调用
stream.pipe(..)
,则调用stream.pause(..)
即可 - 如果有调用
stream.pipe(..)
,那么需要通过stream.unpipe(..)
移除所有的pipe
需要特别注意以下几点:
- 只有提供消费者去消费数据,比如,添加
'data'
事件监听器,可读流才会去生产数据。 - 如果移除
'data'
事件监听器,将不会自动的停止流。 - 如果调用了
stream.pipe(..)
,再调用stream.pause()
,将不会停止这个流。 - 如果可读流被切换到了流的模式,但是却没有添加
'data'
事件监听器,那么数据将会丢失掉。比如调用了stream.resume()
,却没有添加'data'
事件监听器,或者'data'
事件监听器被移除了。 - 选择一种方式去消耗可读流生产的数据。比较推荐
stream.pipe(..)
。也可使用可控性比较强的事件机制,再配合readable.pause()/readable.resume()
APIs - 如果
readable
和data
被同时使用了,那么readable
事件的优先级会比data
事件高。此时,必须在readable
事件内显示调用stream.read(..)
才能读到数据。
更多API使用可参考官网stream_readable_streams
flowing模式
下面尝试实现文件可读流的流动模式,一观它的内部机制。
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');
// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsReadableStream, EventEmitter);
// 声明一个文件可读流的构造函数,并初始化相关参数
function FsReadableStream(path, options) {
const self = this; // 防止this指针的指向混乱
// 这里为了主要说明实现流程,省略参数的边界限制。
self.path = path;
// 打开文件时的参数
// 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
self.flags = options.flags || 'r';
self.mode = options.mode || 0o66;
// 文件内容的读取起止位置
self.start = options.start || 0;
self.end = options.end;
// 每次读取内容的水位线,即一次读取,最大缓存
self.highWaterMark = options.highWaterMark || 64 * 1024;
// 内容读取完毕之后,是否自动关闭文件
self.autoClose = options.autoClose === undefined ? true : options.autoClose;
// 最后输出的数据将以何种编码方式解码
self.encoding = options.encoding || 'utf8';
// 文件有效的描述符
self.fd = null;
// 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度
self.pos = self.start;
// 申请水位线大小的空间作为buffer缓存
self.buffer = Buffer.alloc(self.highWaterMark);
// new这个构造函数时,就打开文件,为后续做准备
self.open();
// 模式 初始为暂停模式
self.flowing = null;
// 一旦有新的事件被监听,且,是'data'事件,
// 就将模式切换至流动模式,并读取数据
self.on('newListener', function (eventName) {
if (eventName === 'data') {
self.flowing = true;
self.read();
}
});
}
// 在对文件操作之前,需要先打开文件,获取文件的有效描述符
FsReadableStream.prototype.open = function () {
const self = this;
fs.open(self.path, self.flags, self.mode, function (err, fd) {
if (err) {
self.emit('error', err);
if (self.autoClose) {
self.destroy();
}
return;
}
self.fd = fd;
self.emit('open', fd);
});
};
// 读取文件里的内容
FsReadableStream.prototype.read = function () {
const self = this;
// self.open()是异步方法,此时,需判断文件是否被打开了
if (typeof self.fd !== 'number') {
// 文件未打开,可以添加open事件监听器
self.once('open', self.read);
return;
}
// 需计算每次需要读取多少数据。
const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark;
fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) {
if (err) {
self.emit('error', err);
if (self.autoClose) {
self.destroy();
}
return;
}
if (bytesRead > 0) {
// 更新读取位置
self.pos = self.pos + bytesRead;
// 有可能读取的内容长度比buffer缓存长度小,就必须截取出来,防止乱码情况
const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead);
self.emit('data', data);
// 如果下一次读取的起始位置比结束位置要大,则表明已经读完了
if (self.pos > self.end) {
self.emit('end');
if (self.autoClose) {
self.destroy();
}
}
// 如果仍然处于流动模式,将会继续读取数据
if (self.flowing) {
self.read();
}
} else {
// 文件内容已经读取完毕了
self.emit('end');
if (self.autoClose) {
self.destroy();
}
}
});
};
// 将流动模式切换到暂停模式
FsReadableStream.prototype.pause = function () {
if (this.flowing !== false) {
this.flowing = false;
}
};
// 将暂停模式切换到流动模式
FsReadableStream.prototype.resume = function () {
// 如果直接调用resume,却没有添加data监听器
// 数据则会丢失
if (!this.flowing) {
this.flowing = true;
this.read();
}
};
// 关闭文件
FsReadableStream.prototype.destroy = function () {
const self = this;
if (typeof self.fd === 'number') {
fs.close(self.fd, function (err) {
if (err) {
self.emit('error', err);
return;
}
self.fd = null;
self.emit('close');
});
return;
}
this.emit('close');
};
复制代码
我们会发现流动模式,是会源源不断的生成数据的,直到数据源枯竭为止。当然,也可以通过stream.pause(..)/stream.resume
去精准控制。这种模式的控制权在于开发者,开发者必须熟悉这种模式的运行机制,谨慎运用,否则很容易出现,消费者被撑爆或者数据中途丢失的情况。
paused模式
由于流动模式和暂停模式是互斥的,所以采用分开实现可读流的两种模式。暂停模式下,我们需要监听另外一个事件——'readable'
,并显示调用stream.read(n)
才能读到数据。
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
// 文件可读流的暂停模式
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');
// 使用Node.js内部工具模块,让文件可读流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsReadableStream, EventEmitter);
// 声明一个文件可读流的构造函数,并初始化相关参数
function FsReadableStream(path, options) {
const self = this; // 防止this指针的指向混乱
// 这里为了主要说明实现流程,省略参数的边界限制。
self.path = path;
// 打开文件时的参数
// 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
self.flags = options.flags || 'r';
self.mode = options.mode || 0o66;
// 文件内容的读取起止位置
self.start = options.start || 0;
self.end = options.end;
// 每次读取内容的水位线,即一次读取,最大缓存
self.highWaterMark = options.highWaterMark || 64 * 1024;
// 内容读取完毕之后,是否自动关闭文件
self.autoClose = options.autoClose === undefined ? true : options.autoClose;
// 最后输出的数据将以何种编码方式解码
self.encoding = options.encoding || 'utf8';
// 文件有效的描述符
self.fd = null;
// 文件读取的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次读的内容长度
self.pos = self.start;
// 作为缓存存放每次读到的数据 [Buffer, Buffer, Buffer...]
self.buffers = [];
// 当前缓存的长度 self.buffers.length 只能读到数组有多少个元素。
// 这里是缓存 buffers 每一项的长度之和
self.length = 0;
// 因为读取文件是异步操作,所以这里需要一个标记
// 如果处于正在读状态,则将数据存放在缓存中
this.reading = false;
// 是否达到 发送 'readable' 事件的条件
// 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。
// 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null
this.emittedReadable = false;
// new这个构造函数时,就打开文件,为后续做准备
self.open();
// 一旦有新的事件被监听,且,是'readable'事件,
// 开启 emittedReadable
self.on('newListener', function (eventName) {
if (eventName === 'readable') {
self.read();
}
});
}
FsReadableStream.prototype.read = function (n) {
const self = this;
let buffer = null;
// 当索要的数据长度 大于 缓存区的长度
if (n > self.length) {
// 此时要索要的数据,超过了缓存大小
// 就会提升水位线,来适应这种需求量
// computeNewHighWaterMark 是node源码中提高水位线的方法
self.highWaterMark = computeNewHighWaterMark(n);
self.emitReadable = true;
self._read();
}
// 当索要的数据长度 大于 0 且 小于等于 缓存区的长度
if (n > 0 && n <= self.length) {
// 先申请Buffer内存
buffer = Buffer.alloc(n);
let index = 0; // 循环次数
let flag = true; // 控制while的标记
let b;
while (flag && (b = self.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
buffer[index++] = b[i]; // 赋值
if (n === index) {
let arr = b.slice(index);
if (arr.length) {
// 不要的 再塞回缓存
self.buffers.unshift(arr);
}
self.length = self.length - n;
flag = false;
}
}
}
}
// 如果当期缓存区没有数据
if (self.length === 0) {
self.emittedReadable = true;
}
// 当缓存区的数据长度 小于 水位线了 就去生成数据,继续放在缓存区
if (self.length < self.highWaterMark) {
if (!self.reading) {
self.reading = true;
self._read();
}
}
// 返回读到的数据
return buffer;
};
FsReadableStream.prototype._read = function () {
const self = this;
if (typeof self.fd !== 'number') {
return self.once('open', self._read);
}
const buffer = Buffer.alloc(self.highWaterMark);
fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) {
if (bytesRead > 0) {
// 默认将读取的内容放到缓存区中
self.buffers.push(buffer.slice(0, bytesRead));
self.pos = self.pos + bytesRead; // 维护读取的索引
self.length = self.length + bytesRead; // 维护缓存区的大小
self.reading = false; // 读取完成
// 是否需要触发readable事件
if (self.emittedReadable) {
self.emittedReadable = false; // 下次默认不触发
self.emit('readable');
}
} else {
self.emit('end');
if (self.autoClose) {
self.destroy();
}
}
});
};
// 在对文件操作之前,需要先打开文件,获取文件的有效描述符
FsReadableStream.prototype.open = function () {
const self = this;
fs.open(self.path, self.flags, self.mode, function (err, fd) {
if (err) {
self.emit('error', err);
if (self.autoClose) {
self.destroy();
}
return;
}
self.fd = fd;
self.emit('open', fd);
});
};
// 关闭文件
FsReadableStream.prototype.destroy = function () {
const self = this;
if (typeof self.fd === 'number') {
fs.close(self.fd, function (err) {
if (err) {
self.emit('error', err);
return;
}
self.fd = null;
self.emit('close');
});
return;
}
this.emit('close');
};
复制代码
这种模式,我们会发现,'readable'
事件是告诉我们什么时候可以去索取数据了。如果直接去调stream.read(n)
的话,会因为fs.read(..)
异步操作,还没将数据读出并放至缓存区,导致结果将返回null。
只要缓存区内容被消耗至水位线以下,就会自动续杯,生成水位线大小的数据放到缓存中。
那什么时候会触发'readable'
事件呢?缓存区为空,然后生产了水位线大小的数据放在缓存区之后,便会触发。下一次触发的时机,仍然是缓存区被消耗干净了,再次续满杯之后。
stream.read(n)
想要多少数据,就传入多长。可以通过stream.length
查看当前缓存区数据的长度,再决定索取多少数据。实际场景运用中则需要使用stream._readableState.length
fs Writable Stream
fs writable stream
它的机制和可读流有些相似。话不多说,先上代码:
// 源码参见 https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');
// 使用Node.js内部工具模块,让文件可写流继承事件的很多事件方法
// 由此可看出,流就是基于事件机制实现的
util.inherits(FsWritableStream, EventEmitter);
// 声明一个文件可写流的构造函数,并初始化相关参数
function FsWritableStream(path, options) {
const self = this; // 防止this指针的指向混乱
// 这里为了主要说明实现流程,省略参数的边界限制。
self.path = path;
// 打开文件时的参数
// 参见 https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
self.flags = options.flags || 'r';
self.mode = options.mode || 0o66;
// 文件内容的写入开始位置
self.start = options.start || 0;
// 每次写入内容的水位线,即最大缓存
self.highWaterMark = options.highWaterMark || 64 * 1024;
// 内容写入完毕之后,是否自动关闭文件
self.autoClose = options.autoClose === undefined ? true : options.autoClose;
// 告诉程序写入的数据将以何种编码方式解码
self.encoding = options.encoding || 'utf8';
// 文件有效的描述符
self.fd = null;
// 文件写入的实时起始位置。第一次从0开始,第二次就是从 0 + 第一次写入的内容长度
self.pos = self.start;
// 作为缓存,存放来不及写入文件的数据 [Buffer, Buffer, Buffer...]
self.buffers = [];
// 当前缓存的长度 self.buffers.length 只能读到数组有多少个元素。
// 这里是缓存 buffers 每一项的长度之和
self.length = 0;
// 因为读取文件是异步操作,所以这里需要一个标记
// 如果处于正在读状态,则将数据存放在缓存中
this.writing = false;
// 控制是否通知'drain'事件监听器,表示,缓存区从满状态,被消耗空了
self.needDrain = false;
// new这个构造函数时,就打开文件,为后续做准备
self.open();
}
FsWriteStream.prototype.open = function () {
const self = this;
fs.open(self.path, self.flags, self.mode, function (err, fd) {
if (err) {
self.emit('error', err);
if (self.autoClose) {
self.destroy();
}
return;
}
self.fd = fd;
self.emit('open', fd);
});
};
FsWriteStream.prototype.destroy = function () {
const self = this;
if (typeof self.fd === 'number') {
fs.close(self.fd, function (err) {
if (err) {
self.emit('error', err);
return;
}
self.emit('close');
});
} else {
self.emit('close');
}
};
// 主动发起调用,这里默认三个参数都会传
FsWriteStream.prototype.write = function (chunk, encoding, callback) {
const self = this;
const bufferChunk = Buffer.isBuffer(chunk) && chunk || Buffer.from(chunk, encoding);
self.length = self.length + bufferChunk.length;
// 如果当前缓存长度 小于 水位线,表示缓存区未满
const ret = self.length < self.highWaterMark;
// 当缓存区满了,则打开 向drain事件 发通知的开关
self.needDrain = !ret;
if (self.writing) {
// 正在 写入/消费 数据,所以先存到缓存中
self.buffers.push({
chunk,
encoding,
callback,
});
} else {
// 相当于去发ajax,这里先来一个loading
self.writing = true;
self._write(chunk, encoding, function () {
callback();
// 当写入完成时,清除缓存中的内容
self.clearBuffer();
});
}
// 每次调用write时,到会返回当前缓存区是否满了
return ret;
};
FsWriteStream.prototype._write = function (chunk, encoding, callback) {
const self = this;
if (typeof self.fd !== 'number') {
self.once('open', function () {
self._write(chunk, encoding, callback);
});
return;
}
fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) {
if (err) {
self.emit('error', err);
self.writing = null;
if (self.autoClose) {
self.destroy();
}
return false;
}
// 长度 减掉已经消费成功的数据长度
self.length = self.length - writtenBytes;
// 更新下一次写入的开始位置
self.pos = self.pos + writtenBytes;
callback();
});
};
FsWriteStream.prototype.clearBuffer = function () {
const self = this;
const buffer = self.buffers.shift();
if (buffer) {
// 如果缓存区仍有数据,则继续消费数据
self._write(buffer.chunk, buffer.encoding, function () {
buffer.callback();
self.clearBuffer();
});
} else {
// 如果缓存区空了,则重置写入状态
self.writing = false;
if (self.needDrain) {
// 发送 drain 事件,告知缓存区已经消耗完了,可以进行下一波数据写入了
self.needDrain = false;
self.emit('drain');
}
}
};
复制代码
我们会发现,当数据流来的时候,可写流会直接去消费数据。当 消费/写入文件 速度过于缓慢的时候,数据流会被送入缓存区缓存起来。
当生产者传来的数据速度过快,把缓存塞满了之后,就会出现「背压」(fs.write(..)
返回的结果),这个时候是需要告诉生产者暂停生产的,当缓存区被消耗完之后,可写流会给生产者发送一个 drain
消息,这样就可以恢复生产了。
总结
以上 fs Readable Stream
和fs Writable Stream
分别是流的基本类型Readable Stream
和Writable Stream
的上层API。fs源码中,实际上是分别继承这两个基本流类型再加上一些fs的文件操作,最后扩展成一个文件流的。
所以流就是基于事件和状态机去实现'生产者/消费者'这样的一个模型。
更多关于流的更多使用,可参考官网
参考
- 深入理解 Node.js Stream 内部机制
- [译] Node.js 流: 你需要知道的一切
- Node中的stream
- Node.js官网
- Node.js源码