ChannelPipeline


Netty的ChannelPipeline和ChannelHandler機制類似於Servlet和Filter過濾器,這類攔截器實際上是職責鏈模式的一種變形,主要是為了方便事件的攔截和用戶業務邏輯的定制。Netty的Channel過濾器實現原理與Servlet Filter機制一致,它將Channel的數據管道抽象為ChannelPipeline,消息在ChannelPipeline中流動和傳遞。ChannelPipeline持有I/O事件攔截器ChannelHandler的鏈表,由ChannelHandler對I/O事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler來實現不同的業務邏輯定制,不需要對已有的ChannelHandler進行修改,能夠實現對修改封閉和對擴展的支持。

ChannelPipeline功能說明

ChannelPipeline是ChannelHandler的容器,它負責ChannelHandler的管理和事件攔截與調度。

ChannelPipeline的事件處理

一個消息被ChannelPipeline的ChannelHandler鏈攔截和處理的全過程:

(1)底層的SocketChannel read()方法讀取ByteBuf,觸發ChannelRead事件,由I/O線程NioEventLoop調用ChannelPipeline的fireChannelRead(Object msg)方法,將消息(ByteBuf)傳輸到ChannelPipeline中;

(2)消息依次被HeadHandler、ChannelHandler1、ChannelHandler2……TailHandler攔截和處理,在這個過程中,任何ChannelHandler都可以中斷當前的流程,結束消息的傳遞;

(3)調用ChannelHandlerContext的write方法發送消息,消息從TailHandler開始,途經ChannelHandlerN……ChannelHandler1、HeadHandler,最終被添加到消息發送緩沖區中等待刷新和發送,在此過程中也可以中斷消息的傳遞,例如當編碼失敗時,就需要中斷流程,構造異常的Future返回。

Netty中的事件分為inbound事件和outbound事件。inbound事件通常由I/O線程觸發,例如TCP鏈路建立事件、鏈路關閉事件、讀事件、異常通知事件等。

觸發inbound事件的方法如下。

(1)ChannelHandlerContext.fireChannelRegistered():Channel注冊事件;

(2)ChannelHandlerContext.fireChannelActive():TCP鏈路建立成功,Channel激活事件;

(3)ChannelHandlerContext.fireChannelRead(Object):讀事件;

(4)ChannelHandlerContext.fireChannelReadComplete():讀操作完成通知事件;

(5)ChannelHandlerContext.fireExceptionCaught(Throwable):異常通知事件;

(6)ChannelHandlerContext.fireUserEventTriggered(Object):用戶自定義事件;

(7)ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可寫狀態變化通知事件;

(8)ChannelHandlerContext.fireChannelInactive():TCP連接關閉,鏈路不可用通知事件。

Outbound事件通常是由用戶主動發起的網絡I/O操作,例如用戶發起的連接操作、綁定操作、消息發送等操作,它對應圖17-1的右半部分。

觸發outbound事件的方法如下:

(1)ChannelHandlerContext.bind(SocketAddress, ChannelPromise):綁定本地地址事件;

(2)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):連接服務端事件;

(3)ChannelHandlerContext.write(Object, ChannelPromise):發送事件;

(4)ChannelHandlerContext.flush():刷新事件;

(5)ChannelHandlerContext.read():讀事件;

(6)ChannelHandlerContext.disconnect(ChannelPromise):斷開連接事件;

(7)ChannelHandlerContext.close(ChannelPromise):關閉當前Channel事件。

自定義攔截器

ChannelPipeline通過ChannelHandler接口來實現事件的攔截和處理,由於ChannelHandler中的事件種類繁多,不同的ChannelHandler可能只需要關心其中的某一個或者幾個事件,所以,通常ChannelHandler只需要繼承ChannelHandlerAdapter類覆蓋自己關心的方法即可。

例如,下面的例子展示了攔截Channel Active事件,打印TCP鏈路建立成功日志,代碼如下:

public class MyInboundHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("TCP connected!");
        ctx.fireChannelActive();
    }
}

構建pipeline

事實上,用戶不需要自己創建pipeline,因為使用ServerBootstrap或者Bootstrap啟動服務端或者客戶端時,Netty會為每個Channel連接創建一個獨立的pipeline。對於使用者而言,只需要將自定義的攔截器加入到pipeline中即可。

pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());

對於類似編解碼這樣的ChannelHandler,它存在先后順序,例如MessageToMessageDecoder,在它之前往往需要有ByteToMessageDecoder將ByteBuf解碼為對象,然后對對象做二次解碼得到最終的POJO對象。

ChannelPipeline的主要特性

ChannelPipeline支持運行態動態的添加或者刪除ChannelHandler,在某些場景下這個特性非常實用。例如當業務高峰期需要對系統做擁塞保護時,就可以根據當前的系統時間進行判斷,如果處於業務高峰期,則動態地將系統擁塞保護ChannelHandler添加到當前的ChannelPipeline中,當高峰期過去之后,就可以動態刪除擁塞保護ChannelHandler了。

ChannelPipeline是線程安全的,這意味着N個業務線程可以並發地操作ChannelPipeline而不存在多線程並發問題。但是,ChannelHandler卻不是線程安全的,這意味着盡管ChannelPipeline是線程安全的,但是用戶仍然需要自己保證ChannelHandler的線程安全。

ChannelPipeline源碼分析

ChannelPipeline的代碼相對比較簡單,它實際上是一個ChannelHandler的容器,內部維護了一個ChannelHandler的鏈表和迭代器,可以方便地實現ChannelHandler查找、添加、替換和刪除。

ChannelPipeline對ChannelHandler的管理

ChannelPipeline是ChannelHandler的管理容器,負責ChannelHandler的查詢、添加、替換和刪除,它與Map等容器的實現非常類似。

由於ChannelPipeline支持運行期動態修改,在調用類似addBefore(ChannelHandlerInvoker invoker, String baseName, final String name, ChannelHandler handler)方法時,存在兩種潛在的多線程並發訪問場景。

  1. I/O線程和用戶業務線程的並發訪問;
  2. 用戶多個線程之間的並發訪問。

為了保證ChannelPipeline的線程安全性,需要通過線程安全容器或者鎖來保證並發訪問的安全,此處Netty直接使用了synchronized關鍵字,保證同步塊內的所有操作的原子性。首先根據baseName獲取它對應的DefaultChannelHandlerContext,ChannelPipeline維護了ChannelHandler名和ChannelHandlerContext實例的映射關系。  新增的ChannelHandler名進行重復性校驗,如果已經有同名的ChannelHandler存在,則不允許覆蓋,拋出IllegalArgumentException("Duplicate handler name: " + name)異常。校驗通過之后,使用新增的ChannelHandler等參數構造一個新的DefaultChannelHandlerContext實例。將新創建的DefaultChannelHandlerContext添加到當前的pipeline中(首先需要對添加的ChannelHandlerContext做重復性校驗,如果ChannelHandler不是可以在多個ChannelPipeline中共享的,且已經被添加到ChannelPipeline中,則拋出ChannelPipelineException異常。),加入成功之后,緩存ChannelHandlerContext,發送新增ChannelHandlerContext通知消息

ChannelPipeline的inbound事件

當發生某個I/O事件的時候,例如鏈路建立、鏈路關閉、讀取操作完成等,都會產生一個事件,事件在pipeline中得到傳播和處理,它是事件處理的總入口。由於網絡I/O相關的事件有限,因此Netty對這些事件進行了統一抽象,Netty自身和用戶的ChannelHandler會對感興趣的事件進行攔截和處理。

pipeline中以fireXXX命名的方法都是從IO線程流向用戶業務Handler的inbound事件,它們的實現因功能而異,但是處理步驟類似,總結如下。

(1)調用HeadHandler對應的fireXXX方法;

(2)執行事件相關的邏輯操作。

以fireChannelActive方法為例,調用head.fireChannelActive()之后,判斷當前的Channel配置是否自動讀取,如果為真則調用Channel的read方法

    DefaultChannelPipeline

    @Override
    public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();

        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }

ChannelPipeline的outbound事件

由用戶線程或者代碼發起的I/O操作被稱為outbound事件,事實上inbound和outbound是Netty自身根據事件在pipeline中的流向抽象出來的術語,在其他NIO框架中並沒有這個概念。

Pipeline本身並不直接進行I/O操作,最終都是由Unsafe和Channel來實現真正的I/O操作的。Pipeline負責將I/O事件通過HeadHandler進行調度和傳播,最終調用Unsafe的I/O方法進行I/O操作。最終由TailHandler調用Unsafe的connect方法發起真正的連接,pipeline僅僅負責事件的調度。

DefaultChannelPipeline

    @Override
    public ChannelPipeline fireChannelRegistered() {
        head.fireChannelRegistered();
        return this;
    }

    /**
     * Removes all handlers from the pipeline one by one from tail (exclusive) to head (inclusive) to trigger
     * handlerRemoved().  Note that the tail handler is excluded because it's neither an outbound handler nor it
     * does anything in handlerRemoved().
     */
    private void teardownAll() {
        tail.prev.teardown();
    }

    @Override
    public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();

        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }

    @Override
    public ChannelPipeline fireChannelInactive() {
        head.fireChannelInactive();
        teardownAll();
        return this;
    }

    @Override
    public ChannelPipeline fireExceptionCaught(Throwable cause) {
        head.fireExceptionCaught(cause);
        return this;
    }

    @Override
    public ChannelPipeline fireUserEventTriggered(Object event) {
        head.fireUserEventTriggered(event);
        return this;
    }

    @Override
    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }

    @Override
    public ChannelPipeline fireChannelReadComplete() {
        head.fireChannelReadComplete();
        if (channel.config().isAutoRead()) {
            read();
        }
        return this;
    }

    @Override
    public ChannelPipeline fireChannelWritabilityChanged() {
        head.fireChannelWritabilityChanged();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return tail.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return tail.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return tail.close();
    }

    @Override
    public ChannelPipeline flush() {
        tail.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public ChannelFuture disconnect(ChannelPromise promise) {
        return tail.disconnect(promise);
    }

    @Override
    public ChannelFuture close(ChannelPromise promise) {
        return tail.close(promise);
    }

    @Override
    public ChannelPipeline read() {
        tail.read();
        return this;
    }

    @Override
    public ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM