鏈接:https://zhuanlan.zhihu.com/p/21681115
來源:知乎
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
上篇(基礎篇)主要介紹了Stream的基本概念和用法,本篇將深入剖析背后工作原理,重點是如何實現流式數據處理和back pressure機制。
目錄
本篇介紹stream是如何實現流式數據處理的。
數據生產和消耗的媒介
為什么使用流取數據
下面是一個讀取文件內容的例子:
const fs = require('fs')
fs.readFile(file, function (err, body) {
console.log(body)
console.log(body.toString())
})
但如果文件內容較大,譬如在440M時,執行上述代碼的輸出為:
<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... > buffer.js:382 throw new Error('toString failed'); ^ Error: toString failed at Buffer.toString (buffer.js:382:11)
報錯的原因是body這個Buffer對象的長度過大,導致toString方法失敗。
可見,這種一次獲取全部內容的做法,不適合操作大文件。
可以考慮使用流來讀取文件內容。
const fs = require('fs')
fs.createReadStream(file).pipe(process.stdout)
fs.createReadStream創建一個可讀流,連接了源頭(上游,文件)和消耗方(下游,標准輸出)。
執行上面代碼時,流會逐次調用fs.read,將文件中的內容分批取出傳給下游。
在文件看來,它的內容被分塊地連續取走了。
在下游看來,它收到的是一個先后到達的數據序列。
如果不需要一次操作全部內容,它可以處理完一個數據便丟掉。
在流看來,任一時刻它都只存儲了文件中的一部分數據,只是內容在變化而已。
這種情況就像是用水管去取池子中的水。
每當用掉一點水,水管便會從池子中再取出一點。
無論水池有多大,都只存儲了與水管容積等量的水。
如何通過流取到數據
用Readable創建對象readable后,便得到了一個可讀流。
如果實現_read方法,就將流連接到一個底層數據源。
流通過調用_read向底層請求數據,底層再調用流的push方法將需要的數據傳遞過來。
當readable連接了數據源后,下游便可以調用readable.read(n)向流請求數據,同時監聽readable的data事件來接收取到的數據。
這個流程可簡述為:
read
read方法中的邏輯可用下圖表示,后面幾節將對該圖中各環節加以說明。
push方法
消耗方調用read(n)促使流輸出數據,而流通過_read()使底層調用push方法將數據傳給流。
如果流在流動模式下(state.flowing為true)輸出數據,數據會自發地通過data事件輸出,不需要消耗方反復調用read(n)。
如果調用push方法時緩存為空,則當前數據即為下一個需要的數據。
這個數據可能先添加到緩存中,也可能直接輸出。
執行read方法時,在調用_read后,如果從緩存中取到了數據,就以data事件輸出。
所以,如果_read異步調用push時發現緩存為空,則意味着當前數據是下一個需要的數據,且不會被read方法輸出,應當在push方法中立即以data事件輸出。
因此,上圖中“立即輸出”的條件是:
state.flowing && state.length === 0 && !state.sync
end事件
由於流是分次向底層請求數據的,需要底層顯示地告訴流數據是否取完。
所以,當某次(執行_read())取數據時,調用了push(null),就意味着底層數據取完。
此時,流會設置state.ended。
state.length表示緩存中當前的數據量。
只有當state.length為0,且state.ended為true,才意味着所有的數據都被消耗了。
一旦在執行read(n)時檢測到這個條件,便會觸發end事件。
當然,這個事件只會觸發一次。
readable事件
在調用完_read()后,read(n)會試着從緩存中取數據。
如果_read()是異步調用push方法的,則此時緩存中的數據量不會增多,容易出現數據量不夠的現象。
如果read(n)的返回值為null,說明這次未能從緩存中取出所需量的數據。
此時,消耗方需要等待新的數據到達后再次嘗試調用read方法。
在數據到達后,流是通過readable事件來通知消耗方的。
在此種情況下,push方法如果立即輸出數據,接收方直接監聽data事件即可,否則數據被添加到緩存中,需要觸發readable事件。
消耗方必須監聽這個事件,再調用read方法取得數據。
doRead
流中維護了一個緩存,當緩存中的數據足夠多時,調用read()不會引起_read()的調用,即不需要向底層請求數據。
用doRead來表示read(n)是否需要向底層取數據,其邏輯為:
var doRead = state.needReadable if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true } if (state.ended || state.reading) { doRead = false } if (doRead) { state.reading = true state.sync = true if (state.length === 0) { state.needReadable = true } this._read(state.highWaterMark) state.sync = false }
state.reading標志上次從底層取數據的操作是否已完成。
一旦push方法被調用,就會設置為false,表示此次_read()結束。
state.highWaterMark是給緩存大小設置的一個上限閾值。
如果取走n個數據后,緩存中保有的數據不足這個量,便會從底層取一次數據。
howMuchToRead
調用read(n)去取n個數據時,m = howMuchToRead(n)是將從緩存中實際獲取的數據量。
根據以下幾種情況賦值,一旦確定則立即返回:
- state.length為0,state.ended為true。
數據源已枯竭,且緩存為空,無數據可取,m為0. - state.objectMode為true。
n為0,則m為0;
否則m為1,將緩存的第一個元素輸出。 - n是數字。
若n <= 0,則m為0;
若n > state.length,表示緩存中數據量不夠。
此時如果還有數據可讀(state.ended為false),則m為0,同時設置state.needReadable,下次執行read()時doRead會為true,將從底層再取數據。
如果已無數據可讀(state.ended為true),則m為state.length,將剩下的數據全部輸出。
若0 < n <= state.length,則緩存中數據夠用,m為n。 - 其它情況。
state.flowing為true(流動模式),則m為緩存中第一個元素(Buffer)的長度,實則還是將第一個元素輸出;
否則m為state.length,將緩存讀空。
上面的規則中:
- n通常是undefined或0,即不指定讀取的字節數。
- read(0)不會有數據輸出,但從前面對doRead的分析可以看出,是有可能從底層讀取數據的。
- 執行read()時,由於流動模式下數據會不斷輸出,所以每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空。
- objectMode為true時,m為0或1。此時,一次push()對應一次data事件。
綜上所述:
可讀流是獲取底層數據的工具,消耗方通過調用read方法向流請求數據,流再從緩存中將數據返回,或以data事件輸出。
如果緩存中數據不夠,便會調用_read方法去底層取數據。
該方法在拿到底層數據后,調用push方法將數據交由流處理(立即輸出或存入緩存)。
可以結合readable事件和read方法來將數據全部消耗,這是暫停模式的消耗方法。
但更常見的是在流動模式下消耗數據,具體見后面的章節。
數據的流式消耗
所謂“流式數據”,是指按時間先后到達的數據序列。
數據消耗模式
可以在兩種模式下消耗可讀流中的數據:暫停模式(paused mode)和流動模式(flowing mode)。
流動模式下,數據會源源不斷地生產出來,形成“流動”現象。
監聽流的data事件便可進入該模式。
暫停模式下,需要顯示地調用read(),觸發data事件。
可讀流對象readable中有一個維護狀態的對象,readable._readableState,這里簡稱為state。
其中有一個標記,state.flowing, 可用來判別流的模式。
它有三種可能值:
- true。流動模式。
- false。暫停模式。
- null。初始狀態。
調用readable.resume()可使流進入流動模式,state.flowing被設為true。
調用readable.pause()可使流進入暫停模式,state.flowing被設為false。
暫停模式
在初始狀態下,監聽data事件,會使流進入流動模式。
但如果在暫停模式下,監聽data事件並不會使它進入流動模式。
為了消耗流,需要顯示調用read()方法。
const Readable = require('stream').Readable
// 底層數據
const dataSource = ['a', 'b', 'c']
const readable = Readable()
readable._read = function () {
if (dataSource.length) {
this.push(dataSource.shift())
} else {
this.push(null)
}
}
// 進入暫停模式
readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))
var data = readable.read()
while (data !== null) {
process.stdout.write('\nread: ' + data)
data = readable.read()
}
執行上面的腳本,輸出如下:
data: a
read: a
data: b
read: b
data: c
read: c
可見,在暫停模式下,調用一次read方法便讀取一次數據。
執行read()時,如果緩存中數據不夠,會調用_read()去底層取。
_read方法中可以同步或異步地調用push(data)來將底層數據交給流處理。
在上面的例子中,由於是同步調用push方法,數據會添加到緩存中。
read方法在執行完_read方法后,便從緩存中取數據,再返回,且以data事件輸出。
如果改成異步調用push方法,則由於_read()執行完后,數據來不及放入緩存,
將出現read()返回null的現象。
見下面的示例:
const Readable = require('stream').Readable
// 底層數據
const dataSource = ['a', 'b', 'c']
const readable = Readable()
readable._read = function () {
process.nextTick(() => {
if (dataSource.length) {
this.push(dataSource.shift())
} else {
this.push(null)
}
})
}
readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))
while (null !== readable.read()) ;
執行上述腳本,可以發現沒有任何數據輸出。
此時,需要使用readable事件:
const Readable = require('stream').Readable
// 底層數據
const dataSource = ['a', 'b', 'c']
const readable = Readable()
readable._read = function () {
process.nextTick(() => {
if (dataSource.length) {
this.push(dataSource.shift())
} else {
this.push(null)
}
})
}
readable.pause()
readable.on('data', data => process.stdout.write('\ndata: ' + data))
readable.on('readable', function () {
while (null !== readable.read()) ;;
})
輸出:
data: a
data: b
data: c
當read()返回null時,意味着當前緩存數據不夠,而且底層數據還沒加進來(異步調用push())。
此種情況下state.needReadable會被設置為true。
push方法被調用時,由於是暫停模式,不會立即輸出數據,而是將數據放入緩存,並觸發一次readable事件。
所以,一旦read被調用,上面的例子中就會形成一個循環:readable事件導致read方法調用,read方法又觸發readable事件。
首次監聽readable事件時,還會觸發一次read(0)的調用,從而引起_read和push方法的調用,從而啟動循環。
總之,在暫停模式下需要使用readable事件和read方法來消耗流。
流動模式
流動模式使用起來更簡單一些。
一般創建流后,監聽data事件,或者通過pipe方法將數據導向另一個可寫流,即可進入流動模式開始消耗數據。
尤其是pipe方法中還提供了back pressure機制,所以使用pipe進入流動模式的情況非常普遍。
本節解釋data事件如何能觸發流動模式。
先看一下Readable是如何處理data事件的監聽的:
Readable.prototype.on = function (ev, fn) { var res = Stream.prototype.on.call(this, ev, fn) if (ev === 'data' && false !== this._readableState.flowing) { this.resume() } // 處理readable事件的監聽 // 省略 return res }
Stream繼承自EventEmitter,且是Readable的父類。
從上面的邏輯可以看出,在將fn加入事件隊列后,如果發現處於非暫停模式,則會調用this.resume(),開始流動模式。
resume()方法先將state.flowing設為true,
然后會在下一個tick中執行flow,試圖將緩存讀空:
if (state.flowing) do { var chunk = stream.read() } while (null !== chunk && state.flowing)
flow中每次read()都可能觸發push()的調用,
而push()中又可能觸發flow()或read()的調用,
這樣就形成了數據生生不息的流動。
其關系可簡述為:
下面再詳細看一下push()的兩個分支:
if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk) stream.read(0) } else { state.length += state.objectMode ? 1 : chunk.length state.buffer.push(chunk) if (state.needReadable) emitReadable(stream) }
稱第一個分支為立即輸出。
在立即輸出的情況下,輸出數據后,執行read(0),進一步引起_read()和push()的調用,從而使數據源源不斷地輸出。
在非立即輸出的情況下,數據先被添加到緩存中。
此時有兩種情況:
- state.length為0。
這時,在調用_read()前,state.needReadable就會被設為true。
因此,一定會調用emitReadable()。
這個方法會在下一個tick中觸發readable事件,同時再調用flow(),從而形成流動。 - state.length不為0。
由於流動模式下,每次都是從緩存中取第一個元素,所以這時read()返回值一定不為null。
故flow()中的循環還在繼續。
此外,從push()的兩個分支可以看出來,如果state.flowing設為false,第一個分支便不會再進去,也就不會再調用read(0)。
同時第二個分支中引發flow的調用后,也不會再調用read(),這就完全暫停了底層數據的讀取。
事實上,pause方法就是這樣使流從流動模式轉換到暫停模式的。
背壓反饋機制
考慮下面的例子:
const fs = require('fs')
fs.createReadStream(file).on('data', doSomething)
監聽data事件后文件中的內容便立即開始源源不斷地傳給doSomething()。
如果doSomething處理數據較慢,就需要緩存來不及處理的數據data,占用大量內存。
理想的情況是下游消耗一個數據,上游才生產一個新數據,這樣整體的內存使用就能保持在一個水平。
Readable提供pipe方法,用來實現這個功能。
pipe
用pipe方法連接上下游:
const fs = require('fs')
fs.createReadStream(file).pipe(writable)
writable是一個可寫流Writable對象,上游調用其write方法將數據寫入其中。
writable內部維護了一個寫隊列,當這個隊列長度達到某個閾值(state.highWaterMark)時,
執行write()時返回false,否則返回true。
於是上游可以根據write()的返回值在流動模式和暫停模式間切換:
readable.on('data', function (data) {
if (false === writable.write(data)) {
readable.pause()
}
})
writable.on('drain', function () {
readable.resume()
})
上面便是pipe方法的核心邏輯。
當write()返回false時,調用readable.pause()使上游進入暫停模式,不再觸發data事件。
但是當writable將緩存清空時,會觸發一個drain事件,再調用readable.resume()使上游進入流動模式,繼續觸發data事件。
看一個例子:
const stream = require('stream')
var c = 0
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
var data = c < 6 ? String.fromCharCode(c + 65) : null
console.log('push', ++c, data)
this.push(data)
})
}
})
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, enc, next) {
console.log('write', chunk)
}
})
readable.pipe(writable)
輸出:
push 1 A write <Buffer 41> push 2 B push 3 C push 4 D
雖然上游一共有6個數據(ABCDEF)可以生產,但實際只生產了4個(ABCD)。
這是因為第一個數據(A)遲遲未能寫完(未調用next()),所以后面通過write方法添加進來的數據便被緩存起來。
下游的緩存隊列到達2時,write返回false,上游切換至暫停模式。
此時下游保存了AB。
由於Readable總是緩存state.highWaterMark這么多的數據,所以上游保存了CD。
從而一共生產出來ABCD四個數據。
下面使用tick-node將Readable的debug信息按tick分組:
⌘ NODE_DEBUG=stream tick-node pipe.js STREAM 18930: pipe count=1 opts=undefined STREAM 18930: resume ---------- TICK 1 ---------- STREAM 18930: resume read 0 STREAM 18930: read 0 STREAM 18930: need readable false STREAM 18930: length less than watermark true STREAM 18930: do read STREAM 18930: flow true STREAM 18930: read undefined STREAM 18930: need readable true STREAM 18930: length less than watermark true STREAM 18930: reading or ended false ---------- TICK 2 ---------- push 1 A STREAM 18930: ondata write <Buffer 41> STREAM 18930: read 0 STREAM 18930: need readable true STREAM 18930: length less than watermark true STREAM 18930: do read ---------- TICK 3 ---------- push 2 B STREAM 18930: ondata STREAM 18930: call pause flowing=true STREAM 18930: pause STREAM 18930: read 0 STREAM 18930: need readable true STREAM 18930: length less than watermark true STREAM 18930: do read ---------- TICK 4 ---------- push 3 C STREAM 18930: emitReadable false STREAM 18930: emit readable STREAM 18930: flow false ---------- TICK 5 ---------- STREAM 18930: maybeReadMore read 0 STREAM 18930: read 0 STREAM 18930: need readable false STREAM 18930: length less than watermark true STREAM 18930: do read ---------- TICK 6 ---------- push 4 D ---------- TICK 7 ----------
- TICK 0: readable.resume()
- TICK 1: readable在流動模式下開始從底層讀取數據
- TICK 2: A被輸出,同時執行readable.read(0)。
- TICK 3: B被輸出,同時執行readable.read(0)。
writable.write('B')返回false。
執行readable.pause()切換至暫停模式。 - TICK 4: TICK 3中read(0)引起push('C')的調用,C被加到readable緩存中。
此時,writable中有A和B,readable中有C。
這時已在暫停模式,但在readable.push('C')結束前,發現緩存中只有1個數據,小於設定的highWaterMark(2),故准備在下一個tick再讀一次數據。 - TICK 5: 調用read(0)從底層取數據。
- TICK 6: push('D'),D被加到readable緩存中。
此時,writable中有A和B,readable中有C和D。
readable緩存中有2個數據,等於設定的highWaterMark(2),不再從底層讀取數據。
可以認為,隨着下游緩存隊列的增加,上游寫數據時受到的阻力變大。
這種back pressure大到一定程度時上游便停止寫,等到back pressure降低時再繼續。
消耗驅動的數據生產
使用pipe()時,數據的生產和消耗形成了一個閉環。
通過負反饋調節上游的數據生產節奏,事實上形成了一種所謂的拉式流(pull stream)。
用喝飲料來說明拉式流和普通流的區別的話,普通流就像是將杯子里的飲料往嘴里傾倒,動力來源於上游,數據是被推往下游的;拉式流則是用吸管去喝飲料,動力實際來源於下游,數據是被拉去下游的。
所以,使用拉式流時,是“按需生產”。
如果下游停止消耗,上游便會停止生產。
所有緩存的數據量便是兩者的閾值和。
當使用Transform作為下游時,尤其需要注意消耗。
const stream = require('stream')
var c = 0
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
var data = c < 26 ? String.fromCharCode(c++ + 97) : null
console.log('push', data)
this.push(data)
})
}
})
const transform = stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) {
console.log('transform', buf)
next(null, buf)
}
})
readable.pipe(transform)
以上代碼執行結果為:
push a transform <Buffer 61> push b transform <Buffer 62> push c push d push e push f
可見,並沒有將26個字母全生產出來。
Transform中有兩個緩存:可寫端的緩存和可讀端的緩存。
調用transform.write()時,如果可讀端緩存未滿,數據會經過變換后加入到可讀端的緩存中。
當可讀端緩存到達閾值后,再調用transform.write()則會將寫操作緩存到可寫端的緩存隊列。
當可寫端的緩存隊列也到達閾值時,transform.write()返回false,上游進入暫停模式,不再繼續transform.write()。
所以,上面的transform中實際存儲了4個數據,ab在可讀端(經過了_transform的處理),cd在可寫端(還未經過_transform處理)。
此時,由前面一節的分析可知,readable將緩存ef,之后便不再生產數據。
這三個緩存加起來的長度恰好為6,所以一共就生產了6個數據。
要想將26個數據全生產出來,有兩種做法。
第一種是消耗transform中可讀端的緩存,以拉動上游的生產:
readable.pipe(transform).pipe(process.stdout)
第二種是,不要將數據存入可讀端中,這樣可讀端的緩存便會一直處於數據不足狀態,上游便會源源不斷地生產數據:
const transform = stream.Transform({ highWaterMark: 2, transform: function (buf, enc, next) { next() } })
