概念
流(stream)是 Node.js 中處理流式數據的抽象接口。 stream 模塊用於構建實現了流接口的對象。
Node.js 提供了多種流對象。 例如,HTTP 服務器的請求和 process.stdout 都是流的實例。
流可以是可讀的、可寫的、或者可讀可寫的。 所有的流都是 EventEmitter 的實例。
訪問 stream 模塊:
const stream = require('stream');
盡管理解流的工作方式很重要,但是 stream 模塊主要用於開發者創建新類型的流實例。 對於以消費流對象為主的開發者,極少需要直接使用 stream 模塊。
Node.js 中有四種基本的流類型:
- Writable - 可寫入數據的流(例如 fs.createWriteStream())。
- Readable - 可讀取數據的流(例如 fs.createReadStream())。
- Duplex - 可讀又可寫的流(例如 net.Socket)。
- Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。
此外,該模塊還包括實用函數 stream.pipeline()、stream.finished() 和 stream.Readable.from()。
盜圖
如何獲取內存中的流
可寫流和可讀流都會在內部的緩沖器中存儲數據,可以分別使用的 writable.writableBuffer 或 readable.readableBuffer 來獲取。
細節
可讀流
兩種模式
- 流動模式(不用打,自己動):數據
自動
從底層系統讀取,並通過EventEmitter接口的事件盡可能快的提供剛給應用程序 - 暫停模式(打一下,動一下):必須顯示調用
stream.read()
讀取數據塊
其實,所有可讀流初始
的時候都處於暫停模式
,不過可以通過以下方法切換到流動模式
- 添加
data
事件句柄。 - 調用
stream.resume()
方法。 - 調用
stream.pipe()
方法將數據發送到可寫流。
當然,能切到暫停模式
,肯定也能切到流動模式
- 如果沒有管道目標,則調用
stream.pause()
- 如果有管道目標,則移除所有的管道目標。調用
stream.unpipe()
可以移除多個管道目標。
為了向后兼容,移除 'data' 事件句柄不會自動地暫停流。
如果有管道目標,一旦目標變為 drain 狀態並請求接收數據時,則調用 stream.pause() 也不能保證流會保持暫停模式。
如果可讀流切換到流動模式,且沒有可用的消費者來處理數據,則數據將會丟失。 例如,當調用 readable.resume() 時,沒有監聽 'data' 事件或 'data' 事件句柄已移除。
添加 readable
事件句柄會使流自動停止流動,並通過 readable.read()
消費數據。 如果 readable
事件句柄被移除,且存在 data
事件句柄,則流會再次開始流動
demo
流動模式
const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))
rs.setEncoding('utf8')
rs.on('data', (data) => {
console.log(data)
})
暫停模式
const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))
rs.setEncoding('utf8')
rs.on('readable', () => {
let d = rs.read(1) // 要讀取的數據的字節數。
console.log(d)
})
read方法: 參數 可選 [size]
如果沒有指定 size 參數,則返回內部緩沖中的所有數據。
使用 readable.read()
處理數據時, while 循環是必需的。
read方法消耗的是內存中的數據
當read方法返回的是null
的時候,會觸發 流監聽的 end
事件
不使用read消耗內存中的流數據,則不會觸發end
預覽一波
遇到的問題
process.stdin.setEncoding('utf8');
process.stdin.on('readable', () => {
let chunk;
while ((chunk = process.stdin.read()) !== null) {
process.stdout.write(`數據: ${chunk}長度${chunk.length}\n`);
}
});
process.stdin.on('end', () => {
process.stdout.write('結束\n');
process.exit(1);
});
上面的代碼作用是讀取用戶在terminal上的輸出,然后輸出內容和長度,但是執行的時候,總是無法執行到end,不僅如此,就算內容為空,返回的長度也不為0,這讓我很疑惑。
最后發現是堅挺的end
事件,只有當read方法返回為null的時候才會觸發,所以沒有觸發,而問題就在於返回的內容是換行符,不同系統下換行符不一樣,mac下是\n
,所以我們需要處理一下read返回的內容,然后手動end
查看字符串中回車符可以使用:
console.log(JSON.stringify(chunk));
修正后的代碼為:
process.stdin.setEncoding('utf8');
process.stdin.on('readable', () => {
let chunk;
while ((chunk = process.stdin.read()) !== null) {
chunk = chunk.replace(/\n/g, '');
if (!chunk.length) {
console.log('輸入為空');
return process.stdin.emit('end');
}
process.stdout.write(`數據: ${chunk}長度${chunk.length}\n`);
}
});
process.stdin.on('end', () => {
process.stdout.write('結束\n');
process.exit(1);
});
執行結果: