node.js中stream流中可讀流和可寫流的使用


node.js中的流 stream 是處理流式數據的抽象接口。node.js 提供了很多流對象,像http中的request和response,和 process.stdout 都是流的實例。

流可以是 可讀的,可寫的,或是可讀可寫的。所有流都是 events 的實例。

 

一、流的類型

node.js中有四種基本流類型:

1、Writable 可寫流 (例:fs.createWriteStream() )

2、Readable 可讀流 (例:fs.createReadStream() )

3、Duplex 可讀又可寫流 (例:net.Socket ) 

4、Transform 讀寫過程中可修改或轉換數據的 Duplex 流 (例:zlib.createDeflate() )

 

二、流中的數據有兩種模式

1、二進制模式,都是 string字符串  和 Buffer。

2、對象模式,流內部處理的是一系統普通對象。

 

三、可讀流的兩種模式

1、流動模式 ( flowing ) ,數據自動從系統底層讀取,並通過事件,盡可能快地提供給應用程序。

2、暫停模式 ( paused ),必須顯式的調用 read() 讀取數據。

可讀流 都開始於暫停模式,可以通過如下方法切換到流動模式:

1、添加 'data' 事件回調。

2、調用 resume()。

3、調用 pipe()。

可讀流通過如下方法切換回暫停模式:

1、如果沒有管道目標,調用 pause()。

2、如果有管道目標,移除所有管道目標,調用 unpipe() 移除多個管道目標。

 

四、創建可讀流,並監聽事件

const fs = require('fs');

//創建一個文件可讀流
let rs = fs.createReadStream('./1.txt', {
    //文件系統標志
    flags: 'r',
    //數據編碼,如果調置了該參數,則讀取的數據會自動解析
    //如果沒調置,則讀取的數據會是 Buffer
    //也可以通過 rs.setEncoding() 進行設置
    encoding: 'utf8',
    //文件描述符,默認為null
    fd: null,
    //文件權限
    mode: 0o666,
    //文件讀取的開始位置
    start: 0,
    //文件讀取的結束位置(包括結束位置)
    end: Infinity,
    //讀取緩沖區的大小,默認64K
    highWaterMark: 3
});

//文件被打開時觸發
rs.on('open', function () {
    console.log('文件打開');
});

//監聽data事件,會讓當前流切換到流動模式
//當流中將數據傳給消費者后觸發
//由於我們在上面配置了 highWaterMark 為 3字節,所以下面會打印多次。
rs.on('data', function (data) {
    console.log(data);
});

//流中沒有數據可供消費者時觸發
rs.on('end', function () {
    console.log('數據讀取完畢');
});

//讀取數據出錯時觸發
rs.on('error', function () {
    console.log('讀取錯誤');
});

//當文件被關閉時觸發
rs.on('close', function () {
    console.log('文件關閉');
});

注意,'open' 和 'close' 事件並不是所有流都會觸發。

當們監聽'data'事件后,系統會盡可能快的讀取出數據。但有時候,我們需要暫停一下流的讀取,操作其他事情。

這時候就需要用到 pause() 和 resume() 方法。

const fs = require('fs');

//創建一個文件可讀流
let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 3
});

rs.on('data', function (data) {
    console.log(`讀取了 ${data.length} 字節數據 : ${data.toString()}`);

    //使流動模式的流停止觸發'data'事件,切換出流動模式,數據都會保留在內部緩存中。
    rs.pause();

    //等待3秒后,再恢復觸發'data'事件,將流切換回流動模式。
    setTimeout(function () {
        rs.resume();
    }, 3000);
});

可讀流的 'readable' 事件,當流中有數據可供讀取時就觸發。

注意當監聽 'readable' 事件后,會導致流停止流動,需調用 read() 方法讀取數據。

注意 on('data'),on('readable'),pipe() 不要混合使用,會導致不明確的行為。

const fs = require('fs');

let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 1
});

//當流中有數據可供讀取時就觸發
rs.on('readable', function () {
    let data;
    //循環讀取數據
    //參數表示要讀取的字節數
    //如果可讀的數據不足字節數,則返回緩沖區剩余數據
    //如是沒有指定字節數,則返回緩沖區中所有數據
    while (data = rs.read()) {
        console.log(`讀取到 ${data.length} 字節數據`);
        console.log(data.toString());
    }
});

 

五、創建可寫流,並監聽事件

const fs = require('fs');

//創建一個文件可寫流
let ws = fs.createWriteStream('./1.txt', {
    highWaterMark: 3
});

//往流中寫入數據
//參數一表示要寫入的數據
//參數二表示編碼方式
//參數三表示寫入成功的回調
//緩沖區滿時返回false,未滿時返回true。
//由於上面我們設置的緩沖區大小為 3字節,所以到寫入第3個時,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));

function writeData() {
    let cnt = 9;
    return function () {
        let flag = true;
        while (cnt && flag) {
            flag = ws.write(`${cnt}`);
            console.log('緩沖區中寫入的字節數', ws.writableLength);
            cnt--;
        }
    };
}

let wd = writeData();
wd();

//當緩沖區中的數據滿的時候,應停止寫入數據,
//一旦緩沖區中的數據寫入文件了,並清空了,則會觸發 'drain' 事件,告訴生產者可以繼續寫數據了。
ws.on('drain', function () {
    console.log('可以繼續寫數據了');
    console.log('緩沖區中寫入的字節數', ws.writableLength);
    wd();
});

//當流或底層資源關閉時觸發
ws.on('close', function () {
    console.log('文件被關閉');
});

//當寫入數據出錯時觸發
ws.on('error', function () {
    console.log('寫入數據錯誤');
});

寫入流的 end() 方法 和 'finish' 事件監聽

const fs = require('fs');

//創建一個文件可寫流
let ws = fs.createWriteStream('./1.txt', {
    highWaterMark: 3
});

//往流中寫入數據
//參數一表示要寫入的數據
//參數二表示編碼方式
//參數三表示寫入成功的回調
//緩沖區滿時返回false,未滿時返回true。
//由於上面我們設置的緩沖區大小為 3字節,所以到寫入第3個時,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));

//調用end()表明已經沒有數據要被寫入,在關閉流之前再寫一塊數據。
//如果傳入了回調函數,則將作為 'finish' 事件的回調函數
ws.end('最后一點數據', 'utf8');

//調用 end() 且緩沖區數據都已傳給底層系統時觸發
ws.on('finish', function () {
    console.log('寫入完成');
});

寫入流的 cork() 和 uncork() 方法,主要是為了解決大量小塊數據寫入時,內部緩沖可能失效,導致的性能下降。

const fs = require('fs');

let ws = fs.createWriteStream('./1.txt', {
    highWaterMark: 1
});

//調用 cork() 后,會強制把所有寫入的數據緩沖到內存中。
//不會因為寫入的數據超過了 highWaterMark 的設置而寫入到文件中。
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);

//將調用 cork() 后的緩沖數據都輸出到目標,也就是寫入文件中。
ws.uncork();

注意 cork() 的調用次數要與 uncork() 一致。

const fs = require('fs');

let ws = fs.createWriteStream('./1.txt', {
    highWaterMark: 1
});

//調用一次 cork() 就應該寫一次 uncork(),兩者要一一對應。
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');

process.nextTick(function () {
    //注意這里只調用了一次 uncork()
    ws.uncork();
    //只有調用同樣次數的 uncork() 數據才會被輸出。
    ws.uncork();
});

 

 六、可讀流的 pipe() 方法

pipe() 方法類似下面的代碼,在可讀流與可寫流之前架起一座橋梁。

const fs = require('fs');

//創建一個可讀流
let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 3
});

//創建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
    highWaterMark: 3
});

rs.on('data', function (data) {
    let flag = ws.write(data);
    console.log(`往可寫流中寫入 ${data.length} 字節數據`);
    //如果寫入緩沖區已滿,則暫停可讀流的讀取
    if (!flag) {
        rs.pause();
        console.log('暫停可讀流');
    }
});

//監控可讀流數據是否讀完
rs.on('end', function () {
    console.log('數據已讀完');
    //如果可讀流讀完了,則調用 end() 表示可寫流已寫入完成
    ws.end();
});

//如果可寫流緩沖區已清空,可以再次寫入,則重新打開可讀流
ws.on('drain', function () {
    rs.resume();
    console.log('重新開啟可讀流');
});

我們用 pipe() 方法完成上面的功能。

const fs = require('fs');

//創建一個可讀流
let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 3
});

//創建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
    highWaterMark: 3
});

let ws2 = fs.createWriteStream('./3.txt', {
    highWaterMark: 3
});

//綁定可寫流到可讀流,自動將可讀流切換到流動模式,將可讀流的所有數據推送到可寫流。
rs.pipe(ws);

//可以綁定多個可寫流
rs.pipe(ws2);

我們也可以用 unpipe() 手動的解綁可寫流。

const fs = require('fs');

//創建一個可讀流
let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 3
});

//創建一個可寫流
let ws = fs.createWriteStream('./2.txt', {
    highWaterMark: 3
});

let ws2 = fs.createWriteStream('./3.txt', {
    highWaterMark: 3
});

rs.pipe(ws);
rs.pipe(ws2);

//解綁可寫流,如果參數沒寫,則解綁所有管道
setTimeout(function () {
    rs.unpipe(ws2);
}, 0);

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM