Netty 系列四(ChannelHandler 和 ChannelPipeline).


一、概念

    先來整體的介紹一下這篇博文要介紹的幾個概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、ChannelPromise):

Channel:Netty 中傳入或傳出數據的載體;
ChannelHandler:Netty 中處理入站和出站數據的應用程序邏輯的容器;
ChannelPipeline:ChannelHandler鏈 的容器;
ChannelHandlerContext:代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有ChannelHandler 添加到 ChannelPipeline 中時,都會創建 ChannelHandlerContext;
ChannelPromise:ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 從而使ChannelFuture不可變。

    我們來舉一個例子描述這些概念之間的邏輯關系:服務端接收到客戶端的連接請求,創建一個Channel同客戶端進行綁定,新創建的 Channel 會都將會被分配一個新的ChannelPipeline(這項關聯是永久性的,Channel 既不會附加另外一個ChannelPipeline,也不能分離當前的)。而 ChannelPipeline 作為 ChannelHandler鏈 的容器,當Channel 生命周期中狀態發生改變時,將會生成對應的事件,這些事件將會被 ChannelPipeline 中 ChannelHandler 所響應,響應方法的參數一般都有一個 ChannelHandlerContext ,一個 ChannelHandler 對應一個 ChannelHandlerContext,ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,這項關聯也是永遠不會改變的。

二、ChannelHandler

    Netty提供了大量預定義的可以開箱即用的ChannelHandler實現,包括用於各種協議的ChannelHandler。因此,我們在自定義ChannelHandler實現用於處理我們的程序邏輯時,只需要繼承Netty 的一些默認實現即可,主要有兩種:

1、繼承 ChannelHandlerAdapter (在4.0 中 處理入站事件繼承 ChannelInboundHandlerAdapter,處理出站事件繼承 ChannelOutboundHandlerAdapter ;在5.0 推薦直接繼承 ChannelHandlerAdapter)

2、繼承 SimpleChannelInboundHandler

    這兩種方式有什么區別呢?  當我們處理 入站數據 和 出站數據時,都需要確保沒有任何的資源泄露。在入站方向,繼承 SimpleChannelInboundHandler 的實現類會在消息被處理之后自動處理消息,而繼承 ChannelHandlerAdapter 的實現類需要手動的釋放消息(ReferenceCountUtil.release(msg));在出站方向,不管繼承的是哪一種的實現類,當你處理了 write() 操作並丟棄了一個消息,那么你就應該釋放它,不僅如此,還要通知 ChannelPromise。否則可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的情況。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ReferenceCountUtil.release(msg);
    //ChannelPromise 是ChannelFuture的一個子類,設置成 true 通知 ChannelFutureListener 消息已經被處理了
    //當一個 Promise 被完成之后,其對應的 Future 的值便不能再進行任何修改了
    promise.setSuccess();
}

    tips:總之,如果一個消息被消費或者丟棄了, 並且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那么用戶就有責任調用 ReferenceCountUtil.release()。

    來看看 ChannelHandler 的 API:

isSharable :如果其對應的實現被標注為 Sharable, 那么這個方法將返回 true, 表示它可以被添加到多個 ChannelPipeline中

-- ChannelHandler 生命周期方法 --
handlerAdded :當把 ChannelHandler 添加到 ChannelPipeline 中時被調用
handlerRemoved :當從 ChannelPipeline 中移除 ChannelHandler 時被調用
exceptionCaught : 當處理過程中在 ChannelPipeline 中有錯誤產生時被調用

-- 處理入站數據以及各種狀態變化 --
channelRegistered : 當 Channel 已經注冊到它的 EventLoop 並且能夠處理 I/O 時被調用
channelUnregistered : 當 Channel 從它的 EventLoop 注銷並且無法處理任何 I/O 時被調用
channelActive : 當 Channel 處於活動狀態時被調用;Channel 已經連接/綁定並且已經就緒
channelInactive : 當 Channel 離開活動狀態並且不再連接它的遠程節點時被調用
channelReadComplete : 當Channel上的一個讀操作完成時被調用
channelRead : 當從 Channel 讀取數據時被調用
ChannelWritabilityChanged :當 Channel 的可寫狀態發生改變時被調用。
userEventTriggered : 當 ChannelnboundHandler.fireUserEventTriggered()方法被調用時被調用,因為一個 POJO 被傳經了 ChannelPipeline

-- 處理出站數據並且允許攔截所有的操作 --
bind : 當請求將 Channel 綁定到本地地址時被調用
connect : 當請求將 Channel 連接到遠程節點時被調用
disconnect : 當請求將 Channel 從遠程節點斷開時被調用
close : 當請求關閉 Channel 時被調用
deregister(5.0中被廢棄) : 當請求將 Channel 從它的 EventLoop 注銷時被調用
read : 當請求從 Channel 讀取更多的數據時被調用
flush : 當請求通過 Channel 將入隊數據沖刷到遠程節點時被調用
write :當請求通過 Channel 將數據寫到遠程節點時被調用

三、ChannelPipeline

    ChannelPipeline 是一個攔截流經 Channel 的入站和出站事件的ChannelHandler 實例鏈,它和 ChannelHandler 之間的交互組成了應用程序數據和事件處理邏輯的核心,而它們之間的關聯交互就是通過 ChannelHandlerContext。

    如果一個入站事件被觸發,它將被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。如圖,Netty 總是將 ChannelPipeline 的入站口作為頭部,而將出站口作為尾端,如圖,第一個被入站事件看到的 ChannelHandler 將是1,而第一個被出站事件看到的是 ChannelHandler 將是 5。稍微總結下這句拗口的話,入站事件順序執行(1—>2—>3—>4—>5)、出站事件逆序執行(5—>4—>3—>2—>1)。

    既然 ChannelPipeline 是 ChannelHandler鏈 的容器,讓我們來看看ChannelPipeline 是如何管理 ChannelHandler的吧!

addFirst : 將一個 ChannelHandler 添加到 ChannelPipeline 最開始位置中
addBefore :將一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler前
addAfter:將一個 ChannelHandler 添加到 ChannelPipeline 某個ChannelHandler后
addLast : 將一個 ChannelHandler 添加到 ChannelPipeline 最末尾位置
remove :將一個 ChannelHandler 從 ChannelPipeline 中移除
replace :將 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler
get :通過類型或者名稱返回 ChannelHandler
context :返回和 ChannelHandler 綁定的 ChannelHandlerContext
names :返回 ChannelPipeline 中所有 ChannelHandler 的名稱

    ChannelPipeline 的API 用於調用入站操作的附加方法:

fireChannelRegistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法
fireChannelActive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法
fireChannelInactive: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法
fireExceptionCaught: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法
fireUserEventTriggered: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法
fireChannelRead: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法
fireChannelReadComplete: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged: 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法

    ChannelPipeline 的API 用於調用出站操作的附加方法:

bind: 將 Channel 綁定到一個本地地址,這將調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 bind方法
connect: 將 Channel 連接到一個遠程地址,這將調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 connect方法
disconnect: 將 Channel 斷開連接。這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 disconnect方法
close: 將 Channel 關閉。這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 close方法
deregister(5.0中被廢棄): 將 Channel 從它先前所分配的 EventExecutor(即 EventLoop)中注銷。這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 deregister方法
flush: 沖刷Channel所有掛起的寫入。這將調用ChannelPipeline中的下一個ChannelOutboundHandler 的 flush方法
write: 將消息寫入 Channel。這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler的write方法。注意:這並不會將消息寫入底層的 Socket,而只會將它放入隊列中。要將它寫入 Socket,需要調用 flush()或者 writeAndFlush()方法
writeAndFlush: 這是一個先調用 write()方法再接着調用 flush()方法的便利方法
read: 請求從 Channel 中讀取更多的數據。這將調用 ChannelPipeline 中的下一個ChannelOutboundHandler 的 read(ChannelHandlerContext)方法

四、ChannelHandlerContext

    ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯。

    ChannelHandlerContext 有很多的方法,其中一些方法也存在於 Channel 和 ChannelPipeline 本身上,但是有一點重要的不同。如果調用 Channel 或者 ChannelPipeline 上的這些方法,它們將沿着整個 ChannelPipeline 進行傳播。而調用位於 ChannelHandlerContext上的相同方法,則將從當前所關聯的 ChannelHandler 開始,並且只會傳播給位於該ChannelPipeline 中的下一個能夠處理該事件的 ChannelHandler。因此,盡量使用 ChannelHandlerContext 的同名方法來處理邏輯,因為它將產生更短的事件流, 應該盡可能地利用這個特性來獲得最大的性能。

五、異常處理

    入站異常處理:

1、ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;
2、如果異常到達了 ChannelPipeline 的尾端,它將會被記錄為未被處理;
3、要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。一般將這個Channelhandler放在 ChannelPipeline 的最后,確保所有的入站異常都總會被處理。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.channel().close();
}

    出站異常處理:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    //出站異常處理
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    });
}

六、寄語

    有時候也會迷茫,身邊的人理論基礎差一些的的,代碼不一樣敲的好好的?而我花大量時間細細的去研究這么理論真的值得嗎?仔細想想,人生很多事情本來就是徒勞無功的啊,沒必要急功近利,欲速則不達。要堅信,一切的付出總會在人生的某個時刻回報在我們身上。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM