本節主要討論了 Netty 的數據處理組件 ChannelHandler。
一、Channel 生命周期
Channel 有個簡單但強大的狀態模型,下面是 Channel 的四個狀態:
Channel 的正常生命周期如下圖,當這些狀態發生變化時,對應的事件將會生成。
二、ChannelHandler 生命周期
ChannelHandler 定義的生命周期如下,當 ChannelHandler 添加到 ChannelPipeline,或者從 ChannelPipeline 移除后,這些將會調用。
三、ChannelInbounderHandler
ChannelInboundHandler 的生命周期方法如下,當接收到數據或者與之關聯的 Channel 狀態改變時調用。
可以看到,這些方法與 Channel 的生命周期接近。
其中 channelRead 和 channelReadComplete 方法在讀操作開始和完成時調用。
另外,channelWritabilityChanged 方法在 channel 寫狀態發生變化時調用。
四、ChannelOutboundHandler
ChannelOutboundHandler 提供了出站操作時調用的方法。
另外,它具有在請求時演示操作或者事件的能力。比如,當你在寫數據到遠程的過程中被意外暫停,你可以延時進行刷新操作,然后在遲些時候繼續。
下面是提供的方法(繼承自 ChannelHandler 未列出來):
五、資源管理
Netty 使用引用計數器來處理池化的 ByteBuf。所以當 ByteBuf 完全處理后,要確保引用計數器被調整。
當你覆蓋了 channelRead 操作,在處理完消息之后,需要釋放它,如下:
1 public class DiscardServerHandler extends ChannelInboundHandlerAdapter { 2 /** 3 * 收到數據時調用 4 */ 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 // 丟棄收到的數據 8 ((ByteBuf) msg).release(); 9 } 10 }
Netty 提供了一個特殊的稱為 SimpleChannelInboundHander 的實現,該實現將在用戶通過 channelRead0() 方法處理完數據之后,自動釋放該消息。
當你在處理寫操作,並丟棄消息時,你需要通知 ChannelPromise 數據已經被處理,如下:
1 public class DiscardOutbounderHandler extends ChannelOutboundHandlerAdapter { 2 @Override 3 public void write(ChannelHandlerContext ctx, 4 Object msg, ChannelPromise promise) throws Exception { 5 ReferenceCountUtil.release(msg); // 釋放資源 6 promise.setSuccess(); // 通知 ChannelPromise 數據已經被處理 7 } 8 }
六、使用 ChannelHandler
下圖展示了 ChannelPipeline,Channel,ChannelHandler 和 ChanelHandlerContext 的關系:
- Channel 綁定到 ChannelPipeline
- ChannelPipeline 綁定到包含 ChannelHandler 的 Channel
- ChannelHandler
- 當添加 ChannelHandler 到 ChannelPipeline 時,ChannelHandlerContext 被創建
如果要完成一個寫操作,有以下兩種方式:
第一種就是從 ChannelHandlerContext 獲取到 Channel 的引用,執行 Channel 上的 write() 方法,如下:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 // 丟棄收到的數據 3 ((ByteBuf) msg).release(); 4 5 Channel channel = ctx.channel(); // 獲取channel引用 6 // 通過channel寫緩存 7 channel.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); 8 }
第二種是從 ChannelHandlerContext 獲取到 ChannelPipeline 的引用,執行 ChannelPipeline 上的 write() 方法,如下:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 ChannelPipeline pipeline = ctx.pipeline(); // 獲取pipeline引用 3 // 通過pipeline寫緩存 4 pipeline.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); 5 }
寫操作流程如下:
- 事件傳遞給 ChannelPipeline 的第一個 ChannelHandler
- ChannelHandler 通過關聯的 ChannelHandlerContext 傳遞事件給 ChannelPipeline 中的下一個 ChannelHandler
- 與 2 類似
但是有些時候不希望總是從 ChannelPipeline 的第一個 ChannelHandler 開始事件,我們希望從一個特定的 ChannelHandler 開始處理。你必須引用於此 ChannelHandler 的前一個 ChannelHandler 關聯的 ChannelHandlerContext,利用它調用與自身關聯的 ChannelHandler 的下一個 ChannelHandler。如下:
1 ChannelHandlerContext ctx = context; // 獲得 ChannelHandlerContext引用 2 // write()將會把緩沖區發送到下一個ChannelHandler 3 ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
流程如下:
- 直接從特定的 ChannelHandler 開始執行
- 事件發送到下一個 ChannelHandler
- 經過最后一個 ChannelHandler 后,事件從 ChannelPipeline 移除