Nodejs的Stream对象

原创 Lin_Grady 教程 nodejs 153阅读 2018-07-26 10:22:23 举报

前言

算是对之前的博文的补充,纯属于API讲解,看起来会很枯燥的。
Nodejs内存控制
Nodejs的Buffer对象(中)—— 基本原理与使用要点

流(stream)

流(stream)是一种可读写的在 Node.js 中处理流式数据的抽象接口,所有的流都是 EventEmitter 的实例。

之所以出现这个模块是因为之前在Nodejs内存控制有提到过,Nodejs 提供了 Stream模块 用于处理大文件,它继承自 EventEmitter ,所以具备基本的自定义事件功能,同时抽象出标准的事件和方法, Nodejs 大多数模块都会用到 Stream模块,得益于它实现方式不受V8内存限制,能够有效提高代码健壮性

stream 模块提供了一些基础的 API,用于构建实现了流接口的对象,尽管理解流的工作方式很重要,但是 stream 模块本身主要用于开发者创建新类型的流实例。 对于以消耗流对象为主的开发者,极少需要直接使用 stream 模块。

流的类型

  • Writable - 可写入数据的流(例如 fs.createWriteStream())。
  • Readable - 可读取数据的流(例如 fs.createReadStream())。
  • Duplex - 可读又可写的流(例如 net.Socket)。
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())。

对象模式

所有 Node.js API 创建的流都是专门运作在字符串Buffer(或 Uint8Array)对象上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null,因为它在流中有特殊用途)。 这些流会以“对象模式”进行操作。

当创建流时,可以使用 objectMode 选项把流实例切换到对象模式,试图将已经存在的流切换到对象模式是不安全的

缓冲

可写流和可读流都会在一个内部的缓冲器中存储数据,可以分别使用 writable.writableBuffer 或 readable.readableBuffer 来获取。

可缓冲的数据的数量取决于传入流构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 选项指定了字节的总数量。 对于以对象模式运作的流,highWaterMark 指定了对象的总数量。

当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消耗程序没有调用 stream.read(),则这些数据会停留在内部队列中,直到被消耗。

一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消耗 (也就是说,流会停止调用内部的用于填充可读缓冲的 readable._read() 方法)。

当反复地调用 writable.write(chunk) 方法时,数据会被缓冲在可写流中。 当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false。

stream API 的主要目标,特别是 stream.pipe() 方法,是为了将数据缓冲限制在可接受的级别,以便不同速度的源和目的地不会超过可用内存。

因为 Duplex 和 Transform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。 例如,net.Socket 实例是 Duplex 流,它的可读端可以消耗从 socket 接收的数据,而可写端则可以将数据写入到 socket。 因为数据写入到 socket 的速度可能比接收数据的速度快或者慢,所以在读写两端独立地进行操作(或缓冲)就显得很重要了。

可写流(stream.Writable 类)

可写流是对数据要被写入的目的地的一种抽象,所有可写流都实现了 stream.Writable类 定义的接口。

我们先以 fs模块fs.WriteStream 类 为例运行一次看看效果,然后再针对stream.Writable 类的方法事件做进一步详解。
常规事件:

  • close:当 WriteStream 底层的文件描述符被关闭时触发。
  • open:当 WriteStream 的文件被打开时触发。
  • ready:当 fs.WriteStream 已准备好被使用时触发,在 'open' 事件之后立即触发。
  • finish:当 fs.WriteStream 写入完成时触发。
  • error:当 fs.WriteStream 错误时触发。

(完整代码可以执行stream-demo的 lesson1 查看效果)

stream.Writable 类事件

'close' 事件

当流或其底层资源(比如文件描述符)被关闭时,触发 'close' 事件。 该事件表明不会再触发其他事件,且不会再发生运算,不是所有可写流都会触发 'close' 事件。

'drain' 事件

如果调用 stream.write(chunk) 方法返回 false,则在适合恢复写入数据到流时触发 'drain' 事件。

'error' 事件

当写入数据出错或使用管道出错时,触发 'error' 事件。 监听器回调函数被调用时会传入一个 Error 参数,但触发 'error' 事件时,流还未被关闭

'finish' 事件

调用 stream.end() 方法且缓冲数据都已经传给底层系统之后,触发 'finish' 事件。

'pipe' 事件

当在可读流上调用 stream.pipe() 方法添加可写流到目标流向时,触发 'pipe' 事件。

'unpipe' 事件

当在可读流上调用 stream.unpipe() 方法从目标流向中移除当前可写流时,触发 'unpipe' 事件。
当可读流通过管道流向可写流发生错误时,也会触发 'unpipe' 事件。

stream.Writable 类方法

writable.write(chunk[, encoding][, callback])

参数描述
chunk [string,Buffer,Uint8Array,any]要写入的数据。可选的。 对于非对象模式下的流, chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
encoding [string]如果 chunk 是字符串,这里指定字符编码,对象模式的写入流将忽略 encoding 参数。
callback [Function]缓冲数据输出时的回调函数

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定以这个错误作为第一个参数并被调用,要确保可靠地检测到写入错误,应该监听 'error' 事件。

在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发。 我们建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。 然而,当流不处在 'drain' 状态时, 调用 write() 是被允许的, Node.js 会缓存所有已经写入的数据块, 直到达到最大内存占用, 这时它会无条件中止。 甚至在它中止之前, 高内存占用将会导致糟糕的垃圾回收器的性能和高的系统相对敏感性 (即使内存不再需要,也通常不会被释放回系统)。 如果远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain , 所以写入到一个不会drain的socket可能会导致远程可利用的漏洞。

对于一个 Transform, 写入数据到一个不会drain的流尤其成问题, 因为 Transform 流默认被暂停, 直到它们被pipe或者被添加了 'data' 或 'readable' 事件处理函数。

如果想根据需要生成或者取得将要被写入的数据,我们建议将逻辑封装为一个可读流并且使用 stream.pipe()。 然而如果调用 write() 优先, 那么可以使用 'drain' 事件来防止回压并且避免内存问题:

(完整代码可以执行stream-demo的 lesson2 查看效果)

writable.end([chunk][, encoding][, callback])

参数描述
chunk(string,Buffer,Uint8Array,any)可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null。
encoding(string)如果 chunk 是字符串,这里指定字符编码。
callback(Function)可选的,流结束时的回调函数。

调用 writable.end() 方法表明接下来没有数据要被写入可写流。通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

(完整代码可以执行stream-demo的 lesson3 查看效果)

writable.destroy([error])

摧毁这个流,并发出传过来的错误。当这个函数被调用后,这个写入流就结束了。 使用者不应该重写这个函数,而是重写 writable._destroy。

(完整代码可以执行stream-demo的 lesson4 查看效果)

writable.cork()

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。

在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

writable.uncork()

将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

如果使用 writable.cork() 和 writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。

如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据。

(完整代码可以执行stream-demo的 lesson5 查看效果)

writable.setDefaultEncoding(encoding)

用于为可写流设置 encoding。

writable属性

writableHighWaterMark: 返回构造该可写流时传入的 highWaterMark 参数值。
writableLength: 这个属性包含了写入就绪队列的字节(或者对象)数,这个值提供了关于highWaterMark状态的内省数据。

我们同目录下随便新建一个文本,然后直接输出流信息看

(完整代码可以执行stream-demo的 lesson6 查看效果)

实例

客户端上的 HTTP 请求
服务器上的 HTTP 响应
fs 写入的流
zlib 流
crypto 流
TCP socket
子进程 stdin
process.stdoutprocess.stderr

可读流(stream.Readable 类)

可读流是对提供数据的来源的一种抽象,所有的可读流都实现了 stream.Readable类 上定义的接口。

我们依然以 fs模块 的 fs.ReadStream 类 为例运行一次看看效果,然后再针对stream.Readable 类的方法事件做进一步详解。

(完整代码可以执行stream-demo的 lesson8 查看效果)

两种模式

可读流实质上运作于流动中(flowing)已暂停(paused)两种模式之一。

在 flowing 模式中,数据自动地从底层的系统被读取,并通过 EventEmitter 接口的事件尽可能快地被提供给应用程序。

在 paused 模式中,必须显式调用 stream.read() 方法来从流中读取数据片段。

所有可读流都开始于 paused 模式,可以通过以下方式切换到 flowing 模式:

  • 新增一个 'data' 事件处理函数。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法发送数据到可写流。

可读流可以通过以下方式切换回 paused 模式:

  • 如果没有管道目标,调用 stream.pause() 方法。
  • 如果有管道目标,移除所有管道目标。调用 stream.unpipe() 方法可以移除多个管道目标。

需要记住的重要概念是,只有提供了消耗或忽略数据的机制后,可读流才会产生数据。 如果消耗的机制被禁用或移除,则可读流会停止产生数据。

为了向后兼容,移除 'data' 事件处理函数不会自动地暂停流。 如果存在管道目标,一旦目标变为 drain 状态并请求接收数据时,则调用 stream.pause() 也不能保证流会保持暂停状态。

如果可读流切换到 flowing 模式,且没有可用的消耗函数处理数据,则这些数据将会丢失。 例如,当调用 readable.resume() 方法时,没有监听 'data' 事件或 'data' 事件的处理函数从流中被移除了。

三种状态

可读流运作的两种模式是对发生在可读流中更加复杂的内部状态管理的一种简化的抽象。

在任意时刻,任一可读流会处于以下三种状态之一:

  • readable.readableFlowing = null
  • readable.readableFlowing = false
  • readable.readableFlowing = true

当 readable.readableFlowing 为 null 时,没有提供消耗流数据的机制,所以流不会产生数据。 在这个状态下,监听 'data' 事件、调用 readable.pipe() 方法、或调用 readable.resume() 方法, 则 readable.readableFlowing 会变成 true ,可读流开始主动地产生数据触发事件。

调用 readable.pause()、readable.unpipe()、或接收反压,则 readable.readableFlowing 会被设为 false,暂时停止事件流动但不会停止数据的生成。 在这个状态下,为 'data' 事件设置监听器不会使 readable.readableFlowing 变成 true。

(完整代码可以执行stream-demo的 lesson7 查看效果)

当 readable.readableFlowing 为 false 时,数据可能会堆积在流的内部缓冲中。

API选择

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消耗流数据。通常开发者应该选择其中一种来消耗数据,而不应该在单个流使用多种方法来消耗数据。

对于大多数用户,建议使用 readable.pipe() 方法来消耗流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitter 和 readable.pause()/readable.resume() 提供的 API 。

stream.Readable 类事件

'error' 事件

'error' 事件可以在任何时候在可读流实现(Readable implementation)上触发。 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

回调函数将接收到一个 Error 对象。

'close' 事件

将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件,不是所有 [Readable][] 都会触发 'close' 事件。

'data' 事件

chunk [Buffer,string,any] :数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

'data' 事件会在流将数据传递给消耗者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe(), readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

在没有明确暂停的流上添加 'data' 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。

如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer 实例。

'readable' 事件

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。 例如,下面的例子中的 foo.txt 是一个空文件:

上面脚本的输出如下:

注意: 通常情况下,readable.pipe() 方法和 'data' 事件机制比 'readable' 事件更容易理解。然而处理 'readable'事件可能造成吞吐量升高。

'end' 事件

'end' 事件将在流中再没有数据可供消耗时触发。

注意: 'end' 事件只有在数据被完全消耗后才会触发 。 可以在数据被完全消耗后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

stream.Readable 类方法

readable.setEncoding(encoding)

默认返回Buffer对象。设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。例如,调用readable.setEncoding('utf-8')会使得输出数据作为UTF-8数据解析,并作为字符串返回。调用readable.setEncoding('hex')使得数据被编码成16进制字符串格式。

可读流会妥善处理多字节字符,如果仅仅直接从流中取出Buffer对象,很可能会导致错误解码。

readable.destroy([error])

销毁流,并且触发error事件。然后,可读流将释放所有的内部资源。

(完整代码可以执行stream-demo的 lesson9 查看效果)
开发者不应该覆盖这个方法,应该覆盖readable._destroy方法。

readable.resume()

readable.resume() 方法会重新触发data事件, 将暂停模式切换到流动模式,也可以用来充分使用流中的数据,而不用实际处理任何数据,如以下示例所示:

readable.pause()

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

readable.isPaused()

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在 readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

readable.pipe(destination[, options])

参数描述
destination (stream.Writable)数据写入目标
options (Object)Pipe 选项,end (Object):在 reader 结束时结束 writer 。默认为 true。

readable.pipe() 绑定一个 [Writable][] 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 [Writable][]。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)

下面例子将 readable 中的所有数据通过管道传递给名为 file.txt 的文件:

可以在单个可读流上绑定多个可写流。

readable.pipe() 方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:

默认情况下,当源可读流(the source Readable stream)触发 'end' 事件时,目标流也会调用 stream.end() 方法从而结束写入。要禁用这一默认行为, end 选项应该指定为 false, 这将使目标流保持打开, 如下面例子所示:

这里有一点要警惕,如果可读流在处理时发生错误,目标可写流不会自动关闭。 如果发生错误,需要手动关闭所有流以避免内存泄漏。

注意:不管对 process.stderr 和 process.stdout 指定什么选项,它们都是直到 Node.js 进程退出才关闭。

readable.unpipe([destination])

readable.unpipe() 方法将之前通过stream.pipe()方法绑定的流分离

如果 destination 没有传入, 则所有绑定的流都会被分离.

如果传入 destination, 但它没有被pipe()绑定过,则该方法不作为

readable.read([size])

从内部缓冲区中抽出并返回一些数据。如果没有可读的数据,返回null。readable.read()方法默认数据将作为“Buffer”对象返回 ,除非已经使用readable.setEncoding()方法设置编码或流运行在对象模式。

可选的size参数指定要读取的特定数量的字节。如果size字节不可读,将返回null除非流已经结束,在这种情况下剩余所有保留在内部缓冲区的数据将被返回。如果没有指定size参数,则内部缓冲区包含的所有数据将返回。

readable.read()方法只应该在暂停模式下的可读流上运行。在流模式下,readable.read()自动调用直到内部缓冲区的数据完全耗尽。

一般来说,建议开发人员避免使用'readable'事件和readable.read()方法,使用readable.pipe()或'data'事件代替。

无论size参数的值是什么,对象模式中的可读流将始终返回调用readable.read(size)的单个项目。

注意:如果readable.read()方法返回一个数据块,那么一个'data'事件也将被发送。

注意:在已经被发出的'end'事件后调用stream.read([size])事件将返回null。不会抛出运行时错误。

readable.unshift(chunk)

chunk (Buffer,Uint8Array,string,any) 数据块移动到可读队列底部。对于不以对象模式运行的流,chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象流, chunk 任何非null的值。

The readable.unshift() method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by code that needs to "un-consume" some amount of data that it has optimistically pulled out of the source, so that the data can be passed on to some other party.

readable.unshift() 方法会将数据块推回内部缓冲区。 这在如下特定情形下有用:代码正在消耗一个数据流,已经"乐观地"拉取了数据。 又需要"反悔-消耗"一些数据,以便这些数据可以传给其他人用。
(不太懂,附上原文)

注意: 'end' 事件已经触发或者运行时错误抛出后,stream.unshift(chunk) 方法不能被调用。

使用 stream.unshift() 的开发者一般需要换一下思路,考虑用一个[Transform][] 流替代. 更多信息请查看API for Stream Implementers部分。

注意: 不像 stream.push(chunk),stream.unshift(chunk)在重置流的内部读取状态时是不会结束读取过程。 如果在读取过程中调用 readable.unshift() 则会导致异常 (例如:即来自自定义流上的 stream._read()内部方法上的实现)。 应该在调用 readable.unshift()方法之后适当调用 stream.push('') 来重置读取状态,执行读取的过程中最好避免调用 readable.unshift()方法。

readable[Symbol.asyncIterator]()(新增于: v10.0.0)

Returns: < AsyncIterator > 去完全消耗流.

(这还是一个新的特性,了解即可)
如果循环以中断或抛错而终止,流会被销毁。换句话说迭代流会完全消耗完流,流会被读取为和highWaterMark选项相同大小的块,在上面代码示例,如果文件的数据小于64KB会在一个单独的块因为没有highWaterMark可选项提供给fs.createReadStream().

readable属性

readableHighWaterMark: 返回构造该可读流时传入的 'highWaterMark' 属性。
readableLength: 该属性包含队列中准备读取的字节数(或对象)。该值提供关于highWaterMark状态的内部数据。

我们直接输出流信息看

(完整代码可以执行stream-demo的 lesson10 查看效果)

实例

客户端上的 HTTP 请求
服务器上的 HTTP 响应
fs 写入的流
zlib 流
crypto 流
TCP socket
子进程 stdout 与 stderr
process.stdin

Duplex 流与 Transform 流

stream.Duplex 类

Duplex 流是同时实现了 [Readable][] 和 [Writable][] 接口的流。

实例

TCP sockets
zlib streams
crypto streams

stream.Transform 类

变换流(Transform streams) 是一种 [Duplex][] 流。它的输出与输入是通过某种方式关联的。和所有 [Duplex][] 流一样,变换流同时实现了 [Readable][] 和 [Writable][] 接口。

实例

zlib streams
crypto streams

新增于 v10.0.0

stream.finished(stream, callback)

参数描述
stream (stream一个可读或可写的流
callback (Function)一个回调函数,可以带有一个错误信息参数,也可没有

使用此函数,以在一个流不再可读、可写或发生了错误、提前关闭等事件时获得通知。

(完整代码可以执行stream-demo的 lesson11 查看效果)
在处理流的提前销毁(如被抛弃的HTTP请求)等错误事件时特别有用,此时流不会触发 'end' 或 'finish' 事件
finished API 也可做成Promise:

stream.pipeline(...streams[, callback])

参数描述
...streams (stream两个或多个要用管道连接的流
callback (Function)一个回调函数,可以带有一个错误信息参数,也可没有

该模块方法用于在多个流之间架设管道,可以自动传递错误和完成扫尾工作,并且可在管道架设完成时提供一个回调函数:

(完整代码可以执行stream-demo的 lesson12 查看效果)
pipeline API 也可做成Promise:

注意事项

兼容旧版本

为了向后兼容较旧的Node.js程序,当添加data事件处理程序或调用stream.resume()方法时,可读流将切换到“流动模式”。 结果是,即使不使用新的stream.read()方法和'readable'事件,也不必担心丢失'data'块。

虽然大多数应用程序将继续正常工作,但在以下情况下会引入极端情况:

  • 未添加'data' 事件监听。
  • 未调用stream.resume()方法。
  • 通过管道没有传送到任何可写的目的地。

例如,请考虑以下代码:

在Node.js v0.10及更高版本中,套接字仍然存在永远停顿。在这种情况下的解决方法是调用stream.resume()开始读取数据:

除了新的可读流切换到流动模式之外,pre-v0.10风格的流可以使用包装在Readable类中readable.wrap() 方法。

readable.read(0)

在某些情况下,需要有机制来触发刷新基础可读流, 而没有实际消耗任何数据。在这种情况下,可以调用readable.read(0),返回null。

如果内部读取缓冲区低于highWaterMark,并且该流目前未读取,则调用stream.read(0)将触发调用底层 stream._read()方法。

虽然大多数应用程序几乎都不需要这样做,但Node.js中会出现这种情况,尤其是在可读流类内部。

readable.push('')

不推荐使用readable.push('')。

向一个不处在object mode的流压入一个Buffer或Uint8Array0字节字符串,会产生有趣的副作用。 因为调用了readable.push(),所以会停止读进程。 然而,因为参数是一个空字符串,没有可读的数据添加到可读buffer, 所以没有可以供用户消耗的数据。

在调用readable.setEncoding()之后highWaterMark会产生偏差

调用 readable.setEncoding() 会改变 highWaterMark 属性在非对象模式中的作用。

一般而言,我们直接将缓冲器存储的 字节数 同 highWaterMark 相比较。然而在调用 setEncoding() 之后,程序会将缓冲器中存储的 字符数 与 highWaterMark 相比较。

在通常情况下,如使用 latin1 或 ascii 时,这不成问题。但在处理可能含有多字节字符的字符串时,此行为需要当心。

实现流的 API

stream模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。

首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个(stream.Writeable,stream.Readable,stream.Duplex,或者stream.Transform),确保他们调用合适的父类构造函数:

新的流类必须实现一个或多个特定的方法,根据所创建的流类型,如下所示:

用例实现的方法
只读流Readable_read
只写流writable_write ,_writev,_final
可读可写流Duplex_read ,_write ,_writev,_final
操作写数据,然后读结果Transform_transform,_flush,_final

注意:实现流的代码里面不应该出现调用“public”方法的地方因为这些方法是给使用者使用的。这样做可能会导致使用流的应用程序代码产生不利的副作用。

实现一个可写流

简化构造

通过流基础类stream.Writable,stream.Readable,stream.Duplex,或者stream.Transform传入对象完成,对象包含合适的方法作为构造函数选项。例如:

Constructor: new stream.Writable([options])

  • options <Object>
    • highWaterMark <number> 缓冲大小当开始调用stream.write() 返回 false。默认16384 (16kb), 对于 objectMode 流为默认为16。
    • decodeStrings <boolean> 是否解码字符串在调用 stream._write() 传递到缓冲区之前。默认为 true
    • objectMode <boolean> stream.write(anyObj) 是否是一个有效的操作. 一旦设置,可以写字符串以外的值,例如Buffer 或者 Uint8Array 只要流支持。默认为false。
    • write <Function> 实现stream._write() 方法。
    • writev <Function> 实现stream._writev() 方法。
    • destroy <Function> 实现stream._destroy() 方法。
    • final <Function> 实现stream._final() 方法。

ES6之前的语法

简化的构造函数方法

(因为毕竟实际开发中很少需要自己去实现流的代码,所以我简单说说这章就算跳过了,有兴趣的自行研究吧)

完整demo

stream-demo
里面demo都是自己写的,没有依赖可以直接跑。懒得写就去上面搬走看,懒得搬就直接看文章,大部分代码连输出信息都给你们了。

参考资料

Node.js v10.6.0 文档
Node.js v10.7.0 Documentation

评论 ( 0 )
最新评论
暂无评论

赶紧努力消灭 0 回复