node - 流 淺析


概念

流(stream)是 Node.js 中處理流式數據的抽象接口。 stream 模塊用於構建實現了流接口的對象。

Node.js 提供了多種流對象。 例如,HTTP 服務器的請求和 process.stdout 都是流的實例。

流可以是可讀的、可寫的、或者可讀可寫的。 所有的流都是 EventEmitter 的實例。

訪問 stream 模塊:

const stream = require('stream');

盡管理解流的工作方式很重要,但是 stream 模塊主要用於開發者創建新類型的流實例。 對於以消費流對象為主的開發者,極少需要直接使用 stream 模塊。

Node.js 中有四種基本的流類型:

  • Writable - 可寫入數據的流(例如 fs.createWriteStream())。
  • Readable - 可讀取數據的流(例如 fs.createReadStream())。
  • Duplex - 可讀又可寫的流(例如 net.Socket)。
  • Transform - 在讀寫過程中可以修改或轉換數據的 Duplex 流(例如 zlib.createDeflate())。

此外,該模塊還包括實用函數 stream.pipeline()、stream.finished() 和 stream.Readable.from()。

盜圖

如何獲取內存中的流

可寫流和可讀流都會在內部的緩沖器中存儲數據,可以分別使用的 writable.writableBuffer 或 readable.readableBuffer 來獲取。
細節

可讀流

兩種模式

  • 流動模式(不用打,自己動):數據自動從底層系統讀取,並通過EventEmitter接口的事件盡可能快的提供剛給應用程序
  • 暫停模式(打一下,動一下):必須顯示調用stream.read()讀取數據塊

其實,所有可讀流初始的時候都處於暫停模式,不過可以通過以下方法切換到流動模式

  • 添加 data 事件句柄。
  • 調用 stream.resume() 方法。
  • 調用 stream.pipe() 方法將數據發送到可寫流。

當然,能切到暫停模式,肯定也能切到流動模式

  • 如果沒有管道目標,則調用 stream.pause()
  • 如果有管道目標,則移除所有的管道目標。調用stream.unpipe()可以移除多個管道目標。

為了向后兼容,移除 'data' 事件句柄不會自動地暫停流。

如果有管道目標,一旦目標變為 drain 狀態並請求接收數據時,則調用 stream.pause() 也不能保證流會保持暫停模式。

如果可讀流切換到流動模式,且沒有可用的消費者來處理數據,則數據將會丟失。 例如,當調用 readable.resume() 時,沒有監聽 'data' 事件或 'data' 事件句柄已移除。

添加 readable 事件句柄會使流自動停止流動,並通過 readable.read() 消費數據。 如果 readable 事件句柄被移除,且存在 data 事件句柄,則流會再次開始流動

demo

流動模式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('data', (data) => {
    console.log(data)
})

暫停模式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('readable', () => {
    let d = rs.read(1) // 要讀取的數據的字節數。
    console.log(d)
})

read方法: 參數 可選 [size]

如果沒有指定 size 參數,則返回內部緩沖中的所有數據。

使用 readable.read() 處理數據時, while 循環是必需的。
read方法消耗的是內存中的數據

當read方法返回的是null的時候,會觸發 流監聽的 end 事件
不使用read消耗內存中的流數據,則不會觸發end

預覽一波

遇到的問題

process.stdin.setEncoding('utf8');

process.stdin.on('readable', () => {
    let chunk;
    while ((chunk = process.stdin.read()) !== null) {
        process.stdout.write(`數據: ${chunk}長度${chunk.length}\n`);
    }
});

process.stdin.on('end', () => {
    process.stdout.write('結束\n');
    process.exit(1);
});

上面的代碼作用是讀取用戶在terminal上的輸出,然后輸出內容和長度,但是執行的時候,總是無法執行到end,不僅如此,就算內容為空,返回的長度也不為0,這讓我很疑惑。

最后發現是堅挺的end事件,只有當read方法返回為null的時候才會觸發,所以沒有觸發,而問題就在於返回的內容是換行符,不同系統下換行符不一樣,mac下是\n,所以我們需要處理一下read返回的內容,然后手動end
查看字符串中回車符可以使用:

console.log(JSON.stringify(chunk));

修正后的代碼為:

process.stdin.setEncoding('utf8');

process.stdin.on('readable', () => {
    let chunk;
    while ((chunk = process.stdin.read()) !== null) {
        chunk = chunk.replace(/\n/g, '');
        if (!chunk.length) {
            console.log('輸入為空');
            return process.stdin.emit('end');
        }
        process.stdout.write(`數據: ${chunk}長度${chunk.length}\n`);
    }
});

process.stdin.on('end', () => {
    process.stdout.write('結束\n');
    process.exit(1);
});

執行結果:

中文文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM