摘要:在學習Node的過程中,Stream流是常用的東東,在了解怎么使用它的同時,我們應該要深入了解它的具體實現。今天的主要帶大家來寫一寫可讀流的具體實現,就過來,就過來,上碼啦!
碼前准備
在寫代碼之前我們首先要整理下思路,我們要做什么,以及怎么來做。本篇文章以文件可讀流為例,一個可讀流大體分為四步:
- 初始化參數
- 打開文件
- 讀取文件
- 結束,關閉文件
一、先來一波調用
- 1.先引入一個readStream模塊
- 2.實例化並傳入參數
var readStream=require('readStream.js');
var rs=new readStream('test.txt',{
flags:'r', //打開文件的模式
autoClose:true, //結束是否自動關閉
encoding:'utf8', //字符編碼
highWaterMark:3, //每次讀取的字節數
start:0, //從下標為多少的位置開始讀取,默認以0開始
end:3, //結束下標位置
});
- 3.監聽data事件,接收讀到的值
對於有讀取數據時,會觸發data事件,我們在此先監聽data事件。關於事件的監聽和觸發,在node中用的是‘events’模塊,如果不太了解的盆友,可以關注我哈,后續的文章會介紹到哦!本篇的重點是流,我們就先直接用了。
rs.on('data',function(data){
console.log(data);
})
二、接下來定義readStream這個模塊
1.因為我們以文件的可讀流來做的,在此我們要引入一個文件模塊。還有一個事件模塊,並且要繼承它,每一個可讀流都是‘events’的一個實例。
首先我們先初始化參數:
var fs=require('fs');
var EventEmitter=require('events');
class readStream extends EventEmitter{
constructor(path,options){
super();
this.path=path;
this.flags=opitons.flags||'r';
this.autoClose=opitons.autoClose||true;
this.encoding=options.encoding||null;
this.highWaterMark=options.highWaterMark||64*1024;
this.start=options.start||0;
this.end=options.end;
this.pos=this.start;
this.buffer=Buffer.alloc(this.highWaterMark);
this.flowing=null;
this.open();
}
}
以上除了初始化傳遞進來的參數,還加了幾個pos,buffer,open(),flowing,為什么要加這些呢?這些值是來做什么用的?我們在此做出解答:
- pos:是用在存儲每一次讀取文件時,讀取的位置。比如當highWater<end時,data有可能會觸發多次,每次的位置應該是上次讀取位置的下一個,pos就是用來記錄這個位置的下標,所以初始值為start。
- buffer:分配一個長度為this.highWaterMark的Buffer。
- flowing:是指當前狀態是否是流動的,有三個值,初始為null。當開始監聽data事件時,值為true,則開始讀取文件。當值為false時,暫停讀取文件。為什么剛剛我說data可能會多次觸發,因為當flowing被設為false時,data事件將停止觸發。想要改變flowing的值,node提供了兩個方法暫停pause()和恢復resume()。
2.讀取一個文件應該先打開文件,我們來定義該方法:
open(){
fs.open(this.path,this.flags,(err,fd)=>{
if(err){
this.emit('err');
}
this.fd=fd;
this.emit('open');
});
}
2.1在打開文件的時候,如果文件打開報錯,我們除了要觸發錯誤事件外,還要注意一個參數。autoClose是指在文件讀取完畢或拋出錯誤后,自己關閉文件。
於是我們根據這個參數值,在現有的open方法中對拋錯的情況做出優化。
open(){
fs.open(this.path,this.flags,(err,fd)=>{
if(err){
if(autoClose){
if(typeof this.fd === 'number'){
fs.close(this.fd,()=>{
this.emit('close');
});
}
this.emit('close');
}
this.emit('err');
}
this.fd=fd;
this.emit('open');
})
}
3.打開文件后,並不是立馬讀取,而是要檢查是否有data事件綁定監聽!
對此,我們要在構造函數內檢查如果添加了data的事件監聽
class readStream extends EventEmitter{
constructor(path,options){
super();
...
this.on('newListener',(eventName,callback)=>{
if(eventName=='data'){
this.flowing=true;
this.read();
}
})
}
}
完成以上步驟后,我們要做的就是讀取文件內容啦,下面來自定義一個read方法:
- 先判斷是否有讀取到內容,若有讀取到內容
- --改變下次讀取的起點位置
- --獲取到相同長度的Buffer空間
- --若設了字符編碼,要將data作相應的轉換
- --判斷此時this.pos的位置是否已超出了結束位置
- --如果folwing為true,則再次調用read方法
- 讀取不到內容則拋出一個錯誤,並關閉文件
代碼如下
read(){
let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark;
fs.read(this.fd,this.buffer,0,howToLength,this.pos,(err,bytesBase)=>{
if(bytesBase>0){
this.pos+=bytesBase;
this.buf=this.buffer.slice(0,bytesBase);
let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString();
this.emit('data',data);
if(this.end>this.pos){
this.emit('end');
if(autoClose){
if(typeof this.fd === 'number'){
fs.close(this.fd,()=>{
this.emit('close');
});
}
this.emit('close');
}
}
if(flowing){
this.read();
}
}else{
this.emit('err');
if(typeof this.fd === 'number'){
if(autoClose){
fs.close(this.fd,()=>{
this.emit('close');
});
}
this.emit('close');
}
}
})
}
到此,一個read方法就寫的差不多了,但是有個問題是要注意的,open方法是異步的,有可能出現調用read方法時,this.fd還沒有值。為了避免這個錯誤,我們改寫一下read方法。
read(){
if(typeof this.fd !== 'number'){
this.once('open',()=>this.read());
}
...
}
這樣的話,一個基礎的readStream類才算寫完整。我們是不是要考慮下,有沒有什么可以優化的地方?細心的伙伴是不是發現有重復的代碼?
對,就是文件的關閉,我們提出一個destory方法,用作關閉文件。
destory(){
if(typeof this.fd==='number'){
if(autoClose){
fs.close(this.fd,()=>{
this.emit('close');
});
return ;
}
this.emit('close');
}
}
三、擴展
方法的調用介紹變量flowing時,我們有提到'暫停'方法pause(),'重啟'方法resume()來改變flowing的值。我們加入到代碼中。
- 首先加入調用,我們在第一次讀取數據后暫停讀取,在3秒后繼續讀取。
rs.on('data',(data)=>{
console.log(data);
this.pause();
});
setTimeout(()=>{
this.resume();
},3000)
- 這兩個方法的調用也是一樣簡單:
pause(){
this.flowing=false;
}
resume(){
this.flowing=true;
this.read();
}
OK,大功告成了,下面整理出完整代碼
var fs=require('fs');
var EventEmitter=require('events');
class readStream extends EventEmitter{
constructor(path,options){
super();
this.path=path;
this.flages=options.flages||'r';
this.autoClose=options.autoClose||true;
this.encoding=options.encoding||null;
this.highWaterMark=options.highWaterMark||64*1024;
this.end=options.end;
this.start=opitons.start||0;
this.pos=this.start;
this.flowing=false;
this.buffer=Buffer.alloc(this.highWaterMark);
this.open();
this.on('newListener',(eventName,callback){
if(eventName=='data'){
this.flowing=true;
fs.read();
}
});
open(){
fs.open(this.path,this.flags,(err,fd){
if(err){
if(this.autoClose){
this.destory();
}
this.emit('err',err);
return ;
}
this.fd=fd;
this.emit('open');
});
}
destory(){
if(typeof this.fd ='number'){
fs.close(this.fd,()=>{
this.emit('close');
});
return ;
}
this.emit('close');
}
read(){
if(typeof this.fd !== 'number'){
return this.once('open',()=>this.read());
}
let howToLength=this.end ? Math.min((this.end-this.pos),this.highWaterMark) : this.highWaterMark;
fs.read(this.fd,this.buffer,0,howToLenghth,this.pos,(err,bytesBase)=>{
if(bytesBase>0){
this.pos+=bytesBase;
let buf=this.buffer.slice(0,bytesBase);
let data=this.encoding ? this.buffer.toString(this.encoding) : this.buffer.toString();
this.emit('data',data);
if(this.pos>this.end){
this.emit('end');
this.destory();
}
if(flowing){
this.read()
}
}else{
this.emit('err');
this.destory();
}
})
}
pause(){
this.flowing=false;
}
resume(){
this.flowing=true;
this.read();
}
}
}