ChannelHandler
1. Channel 生命周期
Channel 的生命周期狀態如下:
狀態 | 描述 |
---|---|
ChannelUnregistered | Channel 已經被創建,但還未注冊到 EventLoop |
ChannelRegistered | Channel 已經被注冊到 EventLoop |
ChannelActive | Channel 處於活動狀態(已經連接到它的遠程節點),可以接收和發送數據 |
ChannelInactive | Channel 沒有連接到遠程節點 |
Channel 的生命周期按上表從上到下所示,當狀態發生改變時,將會生成對應的事件。這些事件將會被轉發到 ChannelPipeline 中的 ChannelHandler,隨后其可以做出響應
2. ChannelHandler 生命周期
下表列出了 ChannelHandler 定義的生命周期操作,在 ChannelHandler 被添加到 ChannelPipeline 或者被從 ChannelPipeline 移除時會調用這些操作,這些方法中的每一個都接受一個 ChannelHandlerContext 參數
類型 | 描述 |
---|---|
handlerAdded | 當把 ChannelHandler 添加到 ChannelPipeline 中時被調用 |
handlerRemoved | 當從 ChannelPipeline 中移除 ChannelHandler 時被調用 |
exceptionCaught | 當處理過程中在 ChannelPipeline 中有錯誤產生時被調用 |
3. ChannelInboundHandler 接口
ChannelInboundHandler 是 ChannelHandler 接口的子接口,處理入站數據以及各種狀態變化。下表列出 ChannelInboundHandler 的生命周期方法,這些方法將會在數據被接收時或者與其對應的 Channel 狀態發生改變時被調用
類型 | 描述 |
---|---|
channelRegistered | 當 Channel 已經注冊到它的 EventLoop 並且能夠處理 IO 時被調用 |
channelUnregistered | 當 Channel 從它的 EventLoop 注銷並且無法處理任何 IO 時被調用 |
channelActive | 當 Channel 處於活動狀態時被調用,Channel 已經連接/綁定並且就緒 |
channelInactive | 當 Channel 離開活動狀態並且不再連接它的遠程節點時被調用 |
channelReadComplete | 當 Channel 上的一個讀操作完成時被調用 |
channelRead | 當從 Channel 讀取數據時被調用 |
ChannelWritabilityChanged | 當 Channel 的可寫狀態發生改變時被調用 |
useEventTriggered | 當 ChannelInboundHandler.fireUserEventTriggered() 方法被調用時被調用,因為一個 POJO 流經 ChannelPipeline |
當某個 ChannelInboundHandler 的實現重寫 channelRead() 方法時,它將負責顯式地釋放與池化 ByteBuf 實例相關的內存。Netty 為此提供了一個實用方法 ReferenceCountUtil.release()
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
這種方式可能會很煩瑣,另一種更加簡單的方式是使用 SimpleChannelInboundHandler
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandlerAdapter<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
// 不需要顯式進行資源釋放
}
}
由於 SimpleChannelInboundHandler 會自動釋放資源,所以你不應該存儲任何指向消息的引用,因為這些引用都將會失效
4. ChannelOutboundHandler 接口
出站操作和數據由 ChannelOutboundHandler 處理,它的方法將被 Channel、ChannelPipeline 以及 ChannelHandlerContext 調用
ChannelOutboundHandler 的一個強大的功能是可以按需推遲操作或者事件,使得可以通過一些復雜的方法來處理請求,例如,如果到遠程節點的寫入被暫停了,那么你可以推遲沖刷操作並在稍后繼續
下標顯示了所有由 ChannelOutboundHandler 本身所定義的方法
類型 | 描述 |
---|---|
bind(ChannelHandlerContext, SocketAddress, ChannelPromise) | 當請求將 Channel 綁定到本地地址時被調用 |
connect(ChannelHandlerContext, SocketAddress, ChannelPromise) | 當請求將 Channel 連接到遠程節點時被調用 |
disconnect(ChannelHandlerContext, ChannelPromise) | 當請求將 Channel 從遠程節點斷開時被調用 |
close(ChannelHandlerContext, ChannelPromise) | 當請求關閉 Channel 時被調用 |
deregister(ChannelHandlerContext, ChannelPromise) | 當請求將 Channel 從它的 EventLoop 注銷時被調用 |
read(ChannelHandlerContext) | 當請求從 Channel 讀取更多的數據時被調用 |
flush(ChannelHandlerContext) | 當請求通過 Channel 將入隊數據沖刷到遠程節點時被調用 |
write(ChannelHandlerContext, Object, ChannelPromise) | 當請求通過 Channel 將數據寫到遠程節點時被調用 |
ChannelOutboundHandler 中的大部分方法都需要一個 ChannelPromise 參數,以便在操作完成時得到通知。CHannelPromise 是 ChannelFuture 的一個子類,定義了一些可寫方法
5. ChannelHandler 適配器
可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 類作為自己的 Channel 的起始點,這兩個適配器分別提供了 ChannelInboundHandler 和 CHannelOutboundHandler 的基本實現,並擴展抽象類 ChannelHandlerAdapter
ChannelHandlerAdapter 還提供了實用方法 isSharable(),如果其對應的實現被標記為 Sharable,那么這個方法將返回 true,表示它可以被添加到多個 ChannelPipeline 中
如果想在自己的 ChannelHandler 中使用這些適配器類,只需要簡單地擴展它們,並重寫自定義方法
ChannelPipeline 接口
可以把 ChannelPipeline 理解成一個攔截流經 Channel 的入站和出站事件的 ChannelHandler 實例鏈,每一個新創建的 Channel 都將會被分配一個新的 ChannelPipeline,Channel 不能附加到另一個 ChannelPipeline,也不能從當前分離。根據事件的起源,事件將會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理,隨后,通過調用 ChannelHandlerContext 實現,它將被轉發給同一超類型的下一個 ChannelHandler
一個典型的同時具有入站和出站 ChannelHandler 的 ChannelPipeline 布局如圖所示
在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配,如果不匹配,ChannelPipeline 將跳過該 ChannelHandler 並前進到下一個,直到找到和該事件所期望的方向相匹配
1. 修改 ChannelPipeline
通過調用 ChannelPipeline 上的相關方法,ChannelHandler 可以添加、刪除或者替換其他的 ChannelHandler,當然也包括它自己
下表列出了由 ChannelHandler 修改 ChannelPipeline 的相關方法
名稱 | 描述 |
---|---|
addFirst addBefore addAfter addLast |
將一個 ChannelHandler 添加到 ChannelPipeline |
remove | 將一個 ChannelHandler 從 ChannelPipeline 中移除 |
replace | 將 ChannelPipeline 中的一個 ChannelHandler 替換為另一個 ChannelHandler |
ChannelPipeline 的用於訪問 ChannelHandler 的操作
名稱 | 描述 |
---|---|
get | 通過類型或者名稱返回 ChannelHandler |
context | 返回和 ChannelHandler 綁定的 ChannelHandlerContext |
names | 返回 ChannelPipeline 中所有 ChannelHandler 的名稱 |
2. 觸發事件
ChannelPipeline 的 API 公開了用於調用入站和出站操作的附加方法
ChannelPipeline 的入站操作如表
方法名稱 | 描述 |
---|---|
fireChannelRegistered | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext) 方法 |
fireChannelUnregistered | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelUnregistered(ChannelHandlerContext) 方法 |
fireChannelActive | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelActive(ChannelHandlerContext) 方法 |
fireChannelInactive | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelInactive(ChannelHandlerContext) 方法 |
fireExceptionCaught | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 exceptionCaught(ChannelHandlerContext, Throwable) 方法 |
fireUserEventTriggered | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 userEventTriggered(ChannelHandlerContext, Object) 方法 |
fireChannelRead | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelRead(ChannelHandlerContext) 方法 |
fireChannelReadComplete | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelReadComplete(ChannelHandlerContext) 方法 |
fireChannelWritabilityChanged | 調用 ChannelPipeline 中下一個 ChannelInboundHandler 的 channelWritabilityChanged(ChannelHandlerContext) 方法 |
ChannelPipeline 的出站操作如表
方法名稱 | 描述 |
---|---|
bind | 將 Channel 綁定到一個本地地址,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法 |
connect | 將 Channel 綁定到一個遠程地址,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 connect(ChannelHandlerContext, SocketAddress, ChannelPromise) 方法 |
disconnect | 將 Channel 斷開連接,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 disconnect(ChannelHandlerContext, ChannelPromise) 方法 |
close | 將 Channel 關閉,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 close(ChannelHandlerContext, ChannelPromise) 方法 |
deregister | 將 Channel 從它先前所分配的 EventExecutor(即 EventLoop)中注銷,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 deregister(ChannelHandlerContext, ChannelPromise) 方法 |
flush | 沖刷 Channel 所有掛起的寫入,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 flush(ChannelHandlerContext) 方法 |
write | 將消息寫入 Channel,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 write(ChannelHandlerContext, Object, ChannelPromise) 方法 |
writeAndFlush | 這是一個先調用 write() 方法再接着調用 flush() 方法的便利方法 |
read | 請求從 Channel 中讀取更多的數據,這將調用 ChannelPipeline 中的下一個 ChannelOutboundHandler 的 read(ChannelHandlerContext) 方法 |
總結一下:
- ChannelPipeline 保存了與 Channel 相關聯的 ChannelHandler
- ChannelPipeline 可以根據需要,動態修改 ChannelHandler
- ChannelPipeline 有豐富的 API 用以響應入站和出站事件
ChannelContext 接口
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有 ChannelHandler 添加到 ChannelPipeline 中時,都會創建 ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler 之間的交互
ChannelHandlerContext 有很多方法,其中一些方法也存在於 Channel 和 ChannelPipeline 上,不同的是,如果調用 Channel 或者 ChannelHandlerPipeline 上的這些方法,它們將沿着整個 ChannelPipeline 進行傳播。而調用位於 ChannelHandlerContext 上的相同方法,則將從當前所關聯的 ChannelHandler 開始,並且只會傳播給位於該 ChannelPipeline 中的下一個能處理該事件的 ChannelHandler
1. 使用 ChannelHandlerContext
Channel、ChannelPipeline、ChannelHandler 以及 ChannelHandlerContext 之間的關系如圖所示
通過 ChannelHandler 或 ChannelPipeline 操作,事件將傳播整個 ChannelPipeline,但在 ChannelHandler 的級別上,事件從上一個 ChannelHandler 到下一個 ChannelHandler 的移動是由 ChannelHandlerContext 上的調用完成的
為什么想要從 ChannelPipeline 中的某個特定點開始傳播事件?
- 減少將事件傳經對它不感興趣的 ChannelHandler 所帶來的開銷
- 避免將事件傳經那些可能會對它感興趣的 ChannelHandler
要想調用從某個特定的 ChannelHandler 開始的處理過程,必須獲取該 ChannelHandler 之前的 ChannelHandler 所關聯的 ChannelHandlerContext,這個 ChannelHandlerContext 將調用和它所關聯的 ChannelHandler 之后的 ChannelHandler
我們還可以通過調用 ChannelHandlerContext 上的 pipeline() 方法來獲得 ChannelPipeline 的引用,進而得以在運行時操作 ChannelHandler
異常處理
1. 處理入站異常
如果在處理入站事件的過程中有異常拋出,那么它將從它在 ChannelInboundHandler 里被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你需要在你的 ChannelInboundHandler 實現重寫以下方法,否則默認將異常轉發給下一個 ChannelHandler:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
因為異常會按照入站方向繼續流動,因此實現了處理異常邏輯的 ChannelInboundHandler 通常位於 ChannelPipeline 的最后,確保異常總能被處理
2. 處理出站異常
用於處理出站操作的正常完成以及異常的選項,都基於以下通知機制:
- 每個出站操作都將返回一個 ChannelFuture,注冊到 ChannelFuture 的 ChannelFutureListener 將在操作完成時通知結果
- 幾乎所有的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise 實例,作為 ChannelFuture 的子類,ChannelPromise 也可以被分配用於異步通知的監聽器
添加 ChannelFutureListener 只需要調用 ChannelFuture實例上的 addListener(ChannelFutureListener) 方法,並且有兩種方式可以做到,第一種是調用出站操作(如 write 方法)所返回的 ChannelFuture 上的 addListener() 方法;第二種是 ChannelFutureListener 添加到作為參數傳遞的 ChannelOutboundHandler 的方法的 ChannelPromise
所以,如果你的 ChannelOutboundHandler 拋出異常,Netty 本身會通知任何已經注冊到對應 ChannelPromise 的注冊器