巧妙復制一個流


場景

實際業務中可能出現重復消費一個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關權限及身份認證;認證通過后框架或者后置過濾器再次解析請求體傳遞給業務上下文。因此,重復消費同一個流的需求並不奇葩,這類似於js上下文中通過 deep clone一個對象來操作這個對象副本,防止源數據被污染。

const Koa = require('koa');
const app = new Koa();

let parse = function(ctx){
    return new Promise((res)=>{
        let chunks = [],len  = 0, body = null;
        ctx.req.on('data',(chunk)=>{
            chunks.push(chunk)
            len += chunk.length
        });
        ctx.req.on('end',()=>{
            body = (Buffer.concat(chunks,len)).toString();
            res(body);
        });
    })
}
// 認證
app.use(async (ctx,next) => {
    let body = JSON.parse(decodeURIComponent(await parse(ctx)));
    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})
// 解析body體,傳遞給業務層
app.use(async (ctx,next) => {
    let body = await parse(ctx);
    ctx.postBody = body;
    await next();
})
app.use(async ctx => {
  ctx.body = 'Hello World\n';
  ctx.body += `post body: ${ctx.postBody}`;
});

app.listen(3000);

上述代碼片段無法正常運行,請求無法得到響應。這是因為在前置過濾器的認證邏輯中消費了請求體,在第二級過濾器中就無法再次消費請求體,因此請求會阻塞。實際業務中,認證邏輯往往是與每個公司規范相關的,是一個“二方庫”;而示例中的第二季過濾器則通常作為一個三方庫存在,因此為了不影響第三方包消費請求體,必須在認證的二方包中保存 ctx.req 這個可讀流的數據仍然存在,這就涉及到本文的主旨了。

實現

復制流並不像復制一個對象一樣簡單與直接,流的使用是一次性的,一旦一個可讀流被消費(寫入一個Writeable對象中),那么這個可讀流就是不可再生的,無法再使用。可是通過一些簡單的技巧可以再次復原一個可讀流,不過這個復原出來的流雖然內容和之前的流相同,但卻不是同一個對象了,因此這兩個對象的屬性及原型都不同,這往往會影響后續的使用,不過辦法總是有的,且看下文。

實現一:可讀流的“影分身之術”

可讀流的“影分身之術”和鳴人的差不多,不過僅限於被克隆對象的 這一特性,即保證克隆出的流有着相同的數據。但是克隆出來的流卻無法擁有原對象的其他屬性,但我們可通過原型鏈繼承的方式實現屬性及方法的繼承。

let Readable = require('stream').Readable;
let fs = require('fs');
let path = require('path');

class NewReadable extends Readable{
    constructor(originReadable){
        super();
        this.originReadable = originReadable;
        this.start();
    }

    start() {
        this.originReadable.on('data',(chunck)=>{
            this.push(chunck);
        });

        this.originReadable.on('end',()=>{
            this.push(null);
        });
        
        this.originReadable.on('error',(e)=>{
            this.push(e);
        });
    }

    // 作為Readable的實現類,必須實現_read函數,否則會throw Error
    _read(){
    }
}

app.use(async (ctx,next) => {
    let cloneReq = new NewReadable(ctx.req);
    let cloneReq2 = new NewReadable(ctx.req);
    // 此時,ctx.req已被消費完(沒有內容),所有的數據都完全在克隆出的兩個流上

    // 消費cloneReq,獲取認證數據
    let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));

    // 將克隆出的cloneReq2重新設置原型鏈,繼承ctx.req原有屬性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新給ctx.req復制,留給后續過濾器消費
    ctx.req = cloneReq2;

    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})

點評: 這種影分身之術可以同時復制出多個可讀流,同時需要針對原來的流重新進行賦值,並繼承原有屬性,這樣才能不影響后續的重復消費。

實現二:懶人實現

stream模塊有一個特殊的類,即 Transform。關於Transfrom的特性,我曾在 深入node之Transform 一文中詳細介紹過,他擁有可讀可寫流雙重特性,那么利用Transfrom可以快速簡單的實現克隆。

首先,通過 pipe 函數將可讀流導向兩個 Transform流(之所以是兩個,是因為需要在前置過濾器消費一個流,后續的過濾器消費第二個)。

let cloneReq = new Transform({
    highWaterMark: 10*1024*1024,
    transform: (chunk,encode,next)=>{
        next(null,chunk);
    }
});
let cloneReq2 = new Transform({
    highWaterMark: 10*1024*1024,
    transform: (chunk,encode,next)=>{
        next(null,chunk);
    }
});
ctx.req.pipe(cloneReq)
ctx.req.pipe(cloneReq2)

上述代碼中,看似 ctx.req 流被消費(pipe)了兩次,實際上 pipe 函數則可以看成 Readable和Writeable實現backpressure的一種“語法糖”實現,具體可通過 node中的Stream-Readable和Writeable解讀 了解,因此得到的結果就是“ctx.req被消費了一次,可是數據卻復制在cloneReq和cloneReq2這兩個Transfrom對象的讀緩沖區里,實現了clone”

其實pipe針對Readable和Writeable做了限流,首先針對Readable的data事件進行偵聽,並執行Writeable的write函數,當Writeable的寫緩沖區大於一個臨界值(highWaterMark),導致write函數返回false(此時意味着Writeable無法匹配Readable的速度,Writeable的寫緩沖區已經滿了),此時,pipe修改了Readable模式,執行pause方法,進入paused模式,停止讀取讀緩沖區。而同時Writeable開始刷新寫緩沖區,刷新完畢后異步觸發drain事件,在該事件處理函數中,設置Readable為flowing狀態,並繼續執行flow函數不停的刷新讀緩沖區,這樣就完成了pipe限流。需要注意的是,Readable和Writeable各自維護了一個緩沖區,在實現的上有區別:Readable的緩沖區是一個數組,存放Buffer、String和Object類型;而Writeable則是一個有向鏈表,依次存放需要寫入的數據。

最后,在數據復制的同時,再給其中一個對象復制額外的屬性即可:

// 將克隆出的cloneReq2重新設置原型鏈,繼承ctx.req原有屬性
cloneReq2.__proto__ = ctx.req;
// 此后重新給ctx.req復制,留給后續過濾器消費
ctx.req = cloneReq2;

至此,通過Transform實現clone已完成。完整的代碼如下(最前置過濾器):

// 認證
app.use(async (ctx,next) => {
    // let cloneReq = new NewReadable(ctx.req);
    // let cloneReq2 = new NewReadable(ctx.req);
    let cloneReq = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    let cloneReq2 = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    ctx.req.pipe(cloneReq)
    ctx.req.pipe(cloneReq2)
    // 此時,ctx.req已被消費完(沒有內容),所有的數據都完全在克隆出的兩個流上

    // 消費cloneReq,獲取認證數據
    let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));

    // 將克隆出的cloneReq2重新設置原型鏈,繼承ctx.req原有屬性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新給ctx.req復制,留給后續過濾器消費
    ctx.req = cloneReq2;

    if(body.name != 'admin'){
        return ctx.body = 'permission denied!'
    }
    await next();
})

說明

  1. ctx.req執行兩次pipe到對應cloneReq和cloneReq2,然后立即消費cloneReq對象,這樣合理嗎?如果源數據夠大,pipe還未結束就在消費cloneReq,會不會有什么問題?

    其實 pipe函數里面大多是異步操作,即針對 源和目的流做的一些流控措施。目的流使用的是cloneReq對象,該對象在實例化的過程中 transform函數直接通過調用next函數將接受到的數據傳入到Transform對象的可讀流緩存中,同時觸發‘readable和data事件’。這樣,我們在下文消費cloneReq對象也是通過“偵聽data事件”實現的,因此即使ctx.req的數據仍沒有被消費完,下文仍可以正常消費cloneReq對象。數據流仍然可以看做是從ctx.req --> cloneReq --> 消費。

  2. 使用Transform流實現clone 可讀流的弊端:

    上例中,Transfrom流的實例化傳入了一個參數 highWaterMark,該參數在Transfrom中的作用 在 上文 深入node之Transform 中有過詳解,即當Transfrom流的讀緩沖大小 < highWaterMark時,Transfrom流就會將接收到的數據存儲在讀緩沖里,等待消費,同時執行 transfrom函數;否則什么都不做。

    因此,當要clone的源內容大於highWaterMark時,就無法正常使用這種方式進行clone了,因為由於源內容>highWaterMark,在沒有后續消費Transfrom流的情況下就不執行transfrom方法(當Transfrom流被消費時,Transfrom流的讀緩沖就會變小,當其大小<highWaterMark時,又可以執行transfrom方法繼續存儲源數據),無法存儲源文件內容。

    所以設置一個合理的highWaterMark大小很重要,默認的highWaterMark為 16kB。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM