流(Stream)主要用於順序的數據處理,比如文件讀寫,從第一行開始,直到最后一行,按照文件的書寫順序依次讀寫,和我們讀書時,用手指指着字讀書是一個道理。先用手指指第一個字,讀完第一個字,再把手指指第二個字,再讀第二字,手指按照書本的書寫順序,依次向后,我們依次讀取,直到最后一個字,讀取完畢。再比如網絡數據傳輸, 或任何端到端的數據交換,都是有先后順序的。
流在處理數據時,又與傳統方式有所不同,它不是把數據作為一個整體進行處理(傳統方式),而是把數據分割成一塊一塊的進行處理。文件讀取時,流並不是把文件的所有內容都讀取到內存中,而是只讀取一塊數據,等待這塊數據處理完畢,比如把這塊數據寫入到另外一個文件中,再讀取另一塊數據,循環往復,直到文件讀取完畢。讀取一塊數據,處理一塊數據,流不會讓數據一直在內存中,因此使用流處理數據,可以高效的使用內存,更有可能來處理大文件。再以網絡數據傳輸(網上看視頻)為例,並不是把整個電影都從服務器上下載下來才開始播放,而是一塊一塊地下載,下載一塊,播放一塊。服務器一塊一塊地寫數據,瀏覽器一塊一塊的讀數據。用流處理數據,時間上也比較高效。在Node.js中,有以下4種流:
可讀流(readable stream): 可以從里面讀取數據的流。它負責從數據源里讀取數據(到內存),我們負責從它里面讀取數據,可以把它看作數據源的抽象。
可寫流(writable stream): 可以向其里面寫入數據的流,我們向可寫流里寫入數據(把內存中的數據寫入到可寫流中),它負責向目的地寫入數據,它是目的地的抽象。
雙向流或雙工流(duplex stream): 既可以從它里面讀取數據,也可以向它里面寫入數據。
轉換流(transform stream):它接受一個流,把流里面的內容進行轉換,然后再把流輸出,流的性質沒變,只是流的內容發生了變化,通常轉化的是可讀流。
使用流的方式有兩種,事件(event) 和管道(pipeline)。
流是一個事件觸發器(EventEmitter),完成某項任務后,它就會發出事件,比如,可讀流讀取到數據后,就會發出data事件,我們只需監聽事件,然后在事件處理函數中,做我們想做的事情。流具體發出哪些事件,都是事先實現好的,我們只能監聽這些特定的事件。
先說可讀流,它比較復雜,因為有兩種模式:pause和flow,並且在每種模式下發出的事件也不相同。pause模式是默認模式,就是創建可讀流后,可讀流並不會自動地從數據源中讀取數據,需要我們手動觸發它的讀取操作。可讀流從數據源中讀取數據,完全是由程序(消費數據的一端)驅動的。每一次的讀取操作都是由消費端發起,數據的讀取速度絕對不會比消費數據速度快,用另一句話說,數據是消費端從可讀流中拉取出來的。
這會有一個問題,可讀流從文件中讀取數據的時候,程序就要等待,程序在處理數據的時候,可讀流不會讀取數據。為了提高效率,可讀流內部使用了緩沖技術和提前讀取方式。可讀流不是一次讀取一塊數據,而是一次讀取多塊數據,程序每次從可讀流中拉取一塊數據,可讀流就會再進行讀取操作,填充空余的緩沖區。因此程序需要觸發第一次讀取操作,以后每一次的拉取(消費)數據都會觸發一次讀取操作。
監聽可讀流的readable事件,可以觸發初始讀取操作,並且在數據可讀時,可讀流發出readable事件。read()方法拉取或消費數據,從而觸發另一次讀取操作,所以要在readable事件處理函數中,調用read()方法,read() 方法,如果讀取不到數據,就會返回null,
const fs = require('fs'); const stream = fs.createReadStream('/path/to/file', { highWaterMark: 64 * 1024, // 內部緩沖大小 }); stream.once('readable', consume); // 觸發第一次可讀流讀取操作 async function consume() { let chunk; // 消費數據,從而觸發另一次可讀流的讀取操作 while ((chunk = stream.read()) !== null) { await asyncHandle(chunk); } stream.once('readable', consume); } async function asyncHandle(chunk) { console.log(chunk); }
但這種處理方法有點麻煩,Node.js10 以后,可以使用異步迭代器(for await ... of )來監聽readable 事件,讀取數據,因為輸入流是異步可迭代對象
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk.toString()); } } logChunks(readableStream);
除了在異步迭代器中,直接處理數據,也可以把流中的數據暫時存儲起來,以便日后消費,如果要處理異常,可以用try/catch 把for await 的處理包起來
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); readableStream.setEncoding('utf-8'); async function readableToString(readable) { let result = ''; try { for await (const chunk of readable) { result += chunk; } } catch (error) { } return result; } readableToString(readableStream).then(console.log);
說完pause模式,再說flow模式。flow模式,就是可讀流自動從數據源中讀取數據,讀到數據后,推送數據到程序中(消費端),消費端只管處理數據。怎么從pause模式切換到flow模式呢?監聽可讀流的data事件,流就自動轉化成flow模式,監聽可讀流的data事件后,可讀流就會被驅動,到數據源中讀取數據,當讀取到一塊數據后,可讀流就會把數據推送到程序中。推送完成,它就再讀取另一塊數據。在flow模式下,數據的讀取是可讀流自己控制的,程序只管處理就可以。
看起來,使用flow模式非常簡單,監聽data事件,並處理數據就可以了
const fs = require('fs'); const readable = fs.createReadStream('./data.txt'); readable.on('data', data => { console.log(data); })
但有一個問題,消費端並沒有方法來控制數據產生的速度(可讀流推送數據的速度),如果消費端消費數據很慢,那就要更多消費端,或者在消費端把沒有消費的數據進行緩存,
這就導致無限緩存和無法控制內存使用。在flow模式下,可讀流控制着讀數據的過程,一有數據,就立即推送到消費端(data listener),沒有緩存,霜要用backpressure mechanism
const fs = require('fs'); const stream = fs.createReadStream('/path/to/file'); // subscribing to `data` event moves stream to flowing mode stream.on('data', chunk => { queue.push(chunk); if (queue.length > MAX_INFLIGH_CHUNKS) { stream.pause(); // poor man's backpressure queue.on('drain', () => stream.resume()); } }); // ... separate consuming process while (queue.size()) { await process(queue.deque()); } queue.emit('drain');
說完了可讀流,再說可寫流,我們向可寫流里面寫數據,可寫流再向目的地寫數據。
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); readableStream.on('data', data => { writeStream.write(data); // 調用write方法向可寫流里面寫數據 })
這里面存在一個問題,程序向可寫流里寫入數據的速度和可寫流向目的地的寫入數據的速度不一致?寫入數據稱為生產者,可寫流稱為消費者。產生數據太快,來不及消費?產生數據太慢,又無法消費?那就在生產者和消費者之間創建一個buffer(緩沖區)。生產者寫入數據到緩沖區,消費者從緩沖區讀取數據,寫入到目的地。如果程序寫入還是太快,緩沖區就會溢出,緩沖區滿了,就要告訴生產都,不能再寫數據了。如果寫入的速度慢,緩沖區是空的,那么可讀流就要停止讀取。
可讀流,就是內置了buffer(緩沖區),它是一個FIFO隊列,程序向可寫流里寫入數據,就是寫到buffer中,可寫流向目的地寫入數據,就是從隊列(緩沖區)中讀取數據。在寫的過程中,可寫流還會告訴你,它內部的buffer狀況,buffer是否滿了,還是buffer中仍有空間。buffer 的大小是由highWaterMark 決定的。當調用write()向可寫流寫入數據的進候,它會返回true or false 來表示內部buffer的狀態,當寫入成功后,buffer仍有空閑,它就會返回true,表示仍然可以寫入。如果寫入后,buffer滿了,返回false,表示需要backpressure。然而,這個返回值只是一個建議,如果向寫入流寫入數據時不遵循這個建議,可寫流會繼續緩沖數據,從而導致過多的內存消耗。如果write()返回false,就要停止向可寫流寫入數據。然而什么候才能繼續寫入呢?只要可寫流內置的buffer有空閑,或每一次可寫流從滿的buffer中取了數據,它就會發出“emit”事件,收到drain事件,就可以繼續向可寫流中寫入數據了。
const fs = require('fs'); const util = require('util'); const stream = require('stream'); const { once } = require('events'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); const finished = util.promisify(stream.finished); async function writeIterableToFile(readable, writable) { for await (const chunk of readable) { if (!writable.write(chunk)) { await once(writable, 'drain'); } } writable.end(); // 程序不會再向可寫流中寫數據了 // finished:可寫流把把所有的數據都寫入到的目的地中。 await finished(writable); } writeIterableToFile(readableStream, writeStream)
當可讀流的目的地是可寫流時,以上處理方式就有點麻煩了,有一種更好的方式,那就是pipe管道。所有流都實現了pipeline模式,pipeline模式就是描述了數據流過不同的階段,像下圖所示
源數據經過可讀流,最終流經可寫流,到達目的地,中間可能經過0個,1個或多個轉換流,對數據進行轉換。數據被可讀流讀取到后,可以流向(piped to)其它流, 可讀流讀取數據,由可寫流消費時,就是把可讀流導入到可寫流,
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); readableStream.pipe(writeStream);
pipe() 操作,前一個的輸出變成后面一個的輸入。readableStream可讀流的輸出,就是讀取到的數據,它正好是可寫流的輸入,因此,就可以用pipe把這兩個鏈接起來。pipe()的操作就相當於以下幾個步驟
可讀流監聽data事件,驅使可讀流去讀取數據,同時在事件監聽函數中,調用可寫流的write()方法寫入數據。如果write()返 回false, 可讀流就就要暫停讀取。同時,可寫流要監聽'drain'事件,在事件中調用可讀流的resume()方法。一旦在可讀流的buffer中有空間,可寫流發出'drain'事件,繼續可讀流的讀取數據。
const fs = require('fs'); const readable = fs.createReadStream('./biji.txt'); const writeable = fs.createWriteStream('anotehr.txt'); readable.on('data', (data) => { if (!writeable.write(data)) { readable.pause(); writeable.on('drain', () => { readable.resume(); }) } })
pipe() 把可讀流的模式轉換成了flow模式,並且也處理了backpressure,隱藏了內部的detail,使用簡單。但怎么知道文件都寫完了呢?使用pipe()方法,並不會關閉流發出事件。事件依然有效,它會告訴你流發生了什么事情。在pipe()方法之前,監聽可寫流的end事件。
const fs = require('fs'); const readStream = fs.createReadStream('test.txt'); const writeStream = fs.createWriteStream('output.txt'); writeStream.on('end', () => { console.log('Done'); }); readStream.pipe(writeStream);
除了使用Node.js提供的流,還可以在Node.js提供的流的基礎上自定義流。有兩種方式來自定義流,簡單構造和繼承。簡單構造,就是創建stream.Readable和Stream.Write 的實例對象。比如創建可讀流,就是創建一個Stream.Readable的實例,因為可讀流是從數據源中讀取數據,所以它有一個push方法,向它里面push數據,push進去數據,就相當於可讀流從數據源中讀取了數據,push(null)就表示不push數據了。
const Stream = require('stream'); const readableStream = new Stream.Readable(); readableStream.push('ping'); readableStream.push('pong!'); readableStream.push(null);
Readable.from() 從一個可迭代對象中創建一個可讀流
const Stream = require('stream');
async function* gen() { yield 'hello'; yield 'stream'; } const readableFromIterator = Stream.Readable.from(gen());
創建可寫流,就是創建Stream.Writable 的實例,然后實現_write方法,_write就是可寫流向目的地寫入數據。不要寫write方法搞混了,write()方法,是向可寫流中寫入數據
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()); // 寫到控制台上 next(); // 表示寫入成功 } writableStream.write('hello, '); writableStream.end('world!');
使用繼承實現可讀流,就是繼承Readable,然后實現_read方法,_read就是從數據源中讀取數據,也是使用push方法
const stream = require('stream'); const data = require('./result'); // json 數據 class JsonDataReadable extends stream.Readable { constructor(options) { super(options); this.readIndex = 0; } _read(size) { // 讀取的默認大小(字節為單位) let okToSend = true; while (okToSend) { // push 返回true或false,表示還能不能向可讀流中push數據 okToSend = this.push(data.text.substr(this.readIndex, size)); this.readIndex += size; if (this.readIndex >data.text.length) { this.push(null); okToSend = false; } } } }
在pause模式下,當程序中調有read()方法時,_read()方法就會被調用,從源數據中讀取數據,push到buffer中,buffer滿了,它就不要push了,所以push返回true or false,表示要不要push數據到buffer。如果是flow模式下,push給消費者,消費者受不了,調pause()方法,push也要暫停。
消費這個readable, 可以在這個js文件同級目錄下,建立一個result.json 方件。
{ "text": "The return value of the pipe() method is the destination stream, which is a very convenient thing that lets us chain multiple pipe() calls, like this:" }
然后在這個js 文件寫
const fs = require('fs'); const Reader = new JsonDataReadable(); const fileWriter = fs.createWriteStream('output.txt'); Reader.pipe(fileWriter);
使用繼承實現可寫流,就是繼承Wrtable,然后重寫_write方法,比如接收到數據后向服務器發送數據
const request = require('request'); const stream = require('stream'); class SlackWritable extends stream.Writable { constructor(options) { options = options || {}; super(options); this.webUrl = options.webUrl; } _write(chunk, encoding, callback) { if (Buffer.isBuffer(chunk)) { chunk = chunk.toString('utf8'); } request.post({ url: this.webUrl, json: true, body: {text: chunk} }, callback); } }
_write()方法接受三個參數 chunk(數據), encoding(字符編碼), 和callback(回調函數). chunk(數據) 就是從上游發送過來的的數據,它可能有多種形式,buffer,字符串,對象。默認情況下是buffer。 由於向服務器發送數據,需要字符串,所以如果數據是buffer的話,要轉化成字符串。你可能會問,為什么不使用encoding參數?如果chunk是字符串,encoding參數才有效,如果chunk是buffer,encoding參數要被忽略。如果buffer轉化成字符串,它就會post 到webUrl。post完成后,調用回調函數,告訴上游,發送過來數據已經通過_write方法處理完了,可以再發送數據了。寫動作,一旦開始,就會調用_write方法,直到寫完,_write不會再調。使用的話,
const slackWriter = new SlackWritable({ webUrl: “https://www.example.com”});
使用繼承實現轉化流,就是繼承Transform, 重寫_transfrom方法,和可選的_flush方法。 _transform的三個參數和_write()的三個參數一致,都是 chunk(數據), encoding(字符編碼), 和callback(回調函數),它的作用就是讀取上游傳遞過來的數據,然后把轉換后的數據再push回去。_flush則是,如果上游發送的數據完了,但在 transform流中還有數據,那就在_flush中把剩余的數據push出去,它接受一個回調函數,也是通知的作用
const { Transform } = require('stream'); class MyTransform extends Transform { _transform(chunk, encoding, callback) { var upperChunk = chunk.toString().toUpperCase(); this.push(upperChunk); // 調用push方法,向流中push數據。 callback(); } }
使用方法就是,在可讀流和可寫流的中間
const fs = require('fs'); const reader = fs.createReadStream('./abc.txt'); const writer = fs.createWriteStream('./def.txt'); const trans = new MyTransform(); reader.pipe(trans).pipe(writer);