理解nodejs中的stream(流)


閱讀目錄

一:nodeJS中的stream(流)的概念及作用?

什么是流呢?日常生活中有水流,我們很容易想得到的就是水龍頭,那么水龍頭流出的水是有序且有方向的(從高處往低處流)。我們在nodejs中的流也是一樣的,他們也是有序且有方向的。nodejs中的流是可讀的、或可寫的、或可讀可寫的。
並且流繼承了EventEmitter。因此所有的流都是EventEmitter的實列。

Node.js中有四種基本的流類型,如下:

1. Readable--可讀的流(比如 fs.createReadStream()).
2. Writable--可寫的流(比如 fs.createWriteStream()).
3. Duplex--可讀寫的流
4. Transform---在讀寫過程中可以修改和變換數據的Duplex流。

nodeJS中的流最大的作用是:讀取大文件的過程中,不會一次性的讀入到內存中。每次只會讀取數據源的一個數據塊。
然后后續過程中可以立即處理該數據塊(數據處理完成后會進入垃圾回收機制)。而不用等待所有的數據。

我們先來看一個簡單的流的實列來理解下:

1. 首先我們來創建一個大文件,如下代碼:

const fs = require('fs');
const file = fs.createWriteStream('./big.txt');
// 循環500萬次
for (let i = 0; i <= 5000000; i++) {
  file.write('我是空智,我來測試一個大文件, 你看看我會有多大?');
}

file.end();

我在我項目文件里面新建一個app.js文件,然后把上面的代碼放入到 app.js 里面去,可以看到循環了500萬次后,寫入500萬次數據到 big.txt中去,因此會在文件目錄下生成一個 big.txt文件,如下:

該文件在我磁盤中顯示345兆。

readFile讀取該文件:

下面我們使用 readFile 來讀取該文件看看(readFile會一次性讀入到內存中)。

我們把app.js代碼改成如下:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  fs.readFile('./big.txt', (err, data) => {
    if (err) {
      throw err;
    } else {
      res.end(data);
    }
  })
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

當我們運行node app.js 后,我們查看下該代碼占用的內存(12MB)如下:

但是當我們運行 http://localhost:3001/ 后,發現占用的內存(有338MB了)如下:

readFile 它會把 big.txt的文件內容整個的讀進以Buffer格式存入到內存中,然后再寫進返回對象,那么這樣的效率非常低的,並且如果該文件如果是1G或2G以上的文件,那么內存會直接被卡死掉的。或者服務器直接會奔潰掉。

下面我們使用 Node中的createReadStream方法就可以避免占用內存多的情況發生。我們把app.js 代碼改成如下所示:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  const file = fs.createReadStream('./big.txt');
  file.pipe(res);
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

然后我們繼續查看內存的使用情況,如下所示:

可以看到我們的占用的內存只有12.8兆。也就是說:createReadStream 在讀取大文件的過程中,不會一次性的讀入到內存中。
每次只會讀取數據源的一個數據塊。這就是流的優點。下面我們來分別看下流吧。

二:fs.createReadStream() 可讀流

其基本使用方法如下:

const fs = require('fs');
const rs = fs.createReadStream('./big.txt', {
  flags: 'r', // 文件的操作方式,同readFile中的配置一樣,這里默認是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操作系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

fs.createReadStream有以下監聽事件:
具體有哪些事件可以查看官網(http://nodejs.cn/api/stream.html#stream_class_stream_readable) 這邊先截圖出來簡單看看,如下所示:

有了上面這些監聽方法,我們可以先看一個完整的實列,如下代碼:

const fs = require('fs');
const file = fs.createReadStream('./msg.txt', {
  flags: 'r', // 文件的操作方式,同readFile中的配置一樣,這里默認是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操作系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

file.on('open', () => {
  console.log('開始讀取文件');
});

file.on('data', (data) => {
  console.log('讀取到的數據:');
  console.log(data);
});

file.on('end', () => {
  console.log('文件全部讀取完畢');
});

file.on('close', () => {
  console.log('文件被關閉');
});

file.on('error', (err) => {
  console.log('讀取文件失敗');
});

執行如下圖所示:

從上圖我們可以看到,先打開文件,執行open事件,然后就是不斷的觸發data事件,等data事情讀取結束后會觸發end事件,然后會將文件關閉,觸發close事件。

注意:msg.txt文件內容如下:hello world; 但是上面為什么只讀了 hello了,那是因為我們上面限制了從開始讀取位置讀取,然后到結束位置結束(5). 並且限定了 highWaterMark: 1,每次讀取的個數為1。當然如果我們改成每次讀取的個數為2的話,那么每次會讀2個字符。

pause() 方法:

如果我們在讀取的過程中,想暫停事件的讀取,我們可以使用 ReadStream對象的pause方法暫停data事件的觸發。 如下代碼:

file.on('data', (data) => {
  console.log('讀取到的數據:');
  console.log(data);
  file.pause();
});

然后如下圖所示:

上面暫停了使用 pause()方法,如果我們現在想重新讀取,需要使用 resume()方法,如下所示:

setTimeout(() => {
  file.resume();
}, 100);

執行結果如下:

其他的一些事件,比如 readable事件等,可以看官方文檔 (http://nodejs.cn/api/stream.html#stream_event_readable). 這里就不多分析了。

三:fs.createWriteStream() 可寫流

 如下代碼演示:

const fs = require('fs');
const file = fs.createWriteStream('./1.txt', {
  flags: 'w', // 文件的操作方式,同writeFile中的配置一樣,這里默認是可讀的是 w
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操作系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  highWaterMark: 1 // 每次寫入的個數
});

let f1 = file.write('1', 'utf-8', () => {
  console.log('寫入成功1111');
});

f1 = file.write('2', 'utf-8', () => {
  console.log('寫入成功2222');
});

f1 = file.write('3', 'utf-8', () => {
  console.log('寫入成功3333');
});

// 標記文件末尾
file.end();

// 處理事件
file.on('finish', () => {
  console.log('寫入完成');
});

file.on('error', (err) => {
  console.log(err);
});

在我項目的根目錄下會生成一個 1.txt文件,里面有123內容。

詳細請看官網(http://nodejs.cn/api/fs.html#fs_fs_writefile_file_data_options_callback

管道流(pipe)

我們需要把我們上面可讀流讀到的數據需要放到可寫流中去寫入到文件里面去。我們可以如下操作代碼:

const fs = require('fs');

// 讀取msg.txt中的字符串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

// 監聽讀取的數據過程,把讀取的數據寫入到我們的1.txt文件里面去
msg.on('data', (chunk) => {
  f1.write(chunk, 'utf-8', () => {
    console.log('寫入成功');
  });
});

但是實現如上的機制,我們可以使用管道機制,管道提供了一個輸出流到輸入流的機制。通常我們用於從一個流中獲取數據並將數據傳遞到另外一個流中。如下圖所示:

如上代碼,我們可以改成如下代碼:

const fs = require('fs');

// 讀取msg.txt中的字符串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

const res = msg.pipe(f1);
console.log(res);

如上打印 res后,我們在命令行中查看下基本信息如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM