一、前言
前面學習了Netty的ByteBuf,接着學習ChannelHandler和ChannelPipeline。
二、ChannelHandler和ChannelPipeline
2.1 ChannelHandler
在ChannelPipeline中,ChannelHandler可以被鏈在一起處理用戶邏輯。
1. Channel生命周期
Channel接口定義了一個簡單但是強大的狀態模型,該模型與ChannelInboundHandler API緊密聯系,Channel有如下四種狀態。
Channel的生命周期如下圖所示。
當狀態發生變化時,就會產生相應的事件。
2. ChannelHandler的生命周期
ChannelHandler定義的生命周期如下圖所示。
Netty定義了ChannelHandler的兩個重要的子類
· ChannelInboundHandler,處理各種入站的數據和狀態的變化。
· ChannelOutboundHandler,處理出站數據並允許攔截的所有操作。
3. ChannelInboundHandler接口
下圖展示了ChannelInboundHandler接口生命周期中的方法,當接受到數據或者其對應的Channel的狀態發生變化則會調用方法
當ChannelInboundHandler的實現覆蓋channelRead()方法時,它負責顯式釋放與池的ByteBuf實例相關聯的內存,可以使用ReferenceCountUtil.release() 方法進行釋放。如下代碼展示了該方法的使用。
public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } }
上述顯式的釋放內存空間會顯得比較麻煩,而如下代碼則無需顯式釋放內存空間。
@Sharable public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // No need to do anything special } }
上述代碼中,SimpleChannelInboundHandler會自動釋放資源,因此無需顯式釋放。
4. ChannelOutboundHandler接口
ChannelOutboundHandler處理出站操作和數據,它的方法會被Channel、ChannelPipeline、ChannelHandlerContext觸發。ChannelOutboundHandler可按需推遲操作或事件。例如對遠程主機的寫入被暫停,你可以延遲刷新操作並在稍后重啟。
5. ChannelHandler適配器
可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的ChannelHandler程序的起點,這些適配器提供了ChannelInboundHandler和ChannelOutboundHandler的簡單實現,它們繼承了共同父類接口ChannelHandler的方法,其繼承結構如下圖所示
ChannelHandlerAdapter還提供了isSharable方法,如果有Sharable注釋則會返回true,這也表明它可以被添加至多個ChannelPipiline中。ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter的方法體中會調用ChannelHandlerContext對應的方法,因此可以將事件傳遞到管道的下個ChannelHandler中。
6. 資源管理
無論何時調用ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法,都需要保證沒有資源泄露,由於Netty使用引用計數來管理ByteBuf,因此當使用完ByteBuf后需要調整引用計數。
為了診斷可能出現的問題,Netty提供了ResourceLeakDetector,它將抽取應用程序大約1%的緩沖區分配來檢查內存泄漏,其額外的開銷很小,內存檢測有如下四種級別
可以通過java -Dio.netty.leakDetectionLevel=ADVANCED 設置內存檢測級別。
當讀取數據時,可以在readChannel方法中調用ReferenceCountUtil.release(msg)方法釋放資源,或者實現SimpleChannelInboundHandler(會自動釋放資源);而當寫入數據時,可以在write方法中調用ReferenceCountUtil.release(msg)釋放資源,具體代碼如下
@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); promise.setSuccess(); } }
不僅需要釋放資源,並且需要通知ChannelPromise,否則ChannelFutureListener可能收不到事件已經被處理的通知。如果消息到達實際的傳輸層,就可以在寫操作完成或者關閉通道時會自動釋放資源。
2.2 ChannelPipeline接口
如果將ChannelPipeline視為ChannelHandler實例鏈,可攔截流經通道的入站和出站事件,即可明白ChannelHandler之間的交互是如何構成應用程序數據和事件處理邏輯的核心的。當創建一個新的Channel時,都會分配了一個新的ChannelPipeline,該關聯是永久的,該通道既不能附加另一個ChannelPipeline也不能分離當前的ChannelPipeline。
一個事件要么被ChannelInboundHander處理,要么被ChannelOutboundHandler處理,隨后,它將通過調用ChannelHandlerContext的實現來將事件轉發至同一超類型的下一個處理器。ChannelHandlerContext允許ChannelHandler與其ChannelPipeline和其他ChannelHandler進行交互,一個處理器可以通知ChannelPipeline中的下一個處理器,甚至可以修改器隸屬於的ChannelPipeline。
下圖展示了ChannelHandlerPipeline、ChannelInboundHandler和ChannelOutboundHandler之間的關系
可以看到ChannelPipeline是由一系列ChannelHandlers組成,其還提供了通過自身傳播事件的方法,當進站事件觸發時,其從ChannelPipeline的頭部傳遞到尾部,而出站事件會從右邊傳遞到左邊。
當管道傳播事件時,其會確定下一個ChannelHandler的類型是否與移動方向匹配,若不匹配,則會跳過並尋找下一個,直至找到相匹配的ChannelHandler(一個處理器可以會同時實現ChannelInboundHandler和ChannelOutboundHandler)。
1. 修改ChannelPipeline
ChannelHandler可實時修改ChannelPipeline的布局,如添加、刪除、替換其他ChannelHandler(其可從ChannelPipeline中移除自身),如如下圖所示的方法。
通常,ChannelPipeline中的每個ChannelHandler通過其EventLoop(I / O線程)處理傳遞給它的事件,不要阻塞該線程,因為它會對I/O的整體處理產生負面影響。
2.3 ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler與ChannelPipeline之間的關聯,當ChannelHandler被添加至ChannelPipeline中時其被創建,ChannelHandlerContext的主要功能是管理相關ChannelHandler與同一ChannelPipeline中的其他ChannelHandler的交互。
ChannelHandlerContext中存在很多方法,其中一些也存在於ChannelHandler和ChannelPipeline中,但是差別很大。如果在ChannelHandler或者ChannelPipeline中調用該方法,它們將在整個管道中傳播,而如果在ChannelHandlerContext中調用方法,那么會僅僅傳遞至下個能處理該事件的ChannelHandler。
1. 使用ChannelHandlerContext
ChannelHandlerContext、ChannelHandler、ChannelHandlerContext、Channel之間的關系如下圖所示
可以通過ChannelHandlerContext來訪問Channel,並且當調用Channel的write方法時,寫事件會在管道中傳遞,代碼如下
ChannelHandlerContext ctx = ..; Channel channel = ctx.channel(); channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
除了使用Channel的write方法寫入數據外,還可以使用ChannelPipeline的write方法寫入數據,代碼如下
ChannelHandlerContext ctx = ..; ChannelPipeline pipeline = ctx.pipeline(); pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
上述兩段代碼在管道中產生的效果相同,如下圖所示。
其中兩種方法的寫事件都會通過ChannelHandlerContext在管道中傳播。
若想從指定的ChannelHandler開始傳遞事件,那么需要引用到指定ChannelHandler之前的一個ChannelHandlerContext,該ChannelHandlerContext將調用其關聯的ChannelHandler。
如下圖所示,展示了從指定ChannelHandler開始處理事件。
2. ChannelHandler和ChannelHandlerContext的高級用法
可以通過調用ChannelHandlerContext的pipeline方法獲得其對應的ChannelPipeline引用,這可以在運行中管理ChannelHandler,如添加一個ChannelHandler。
另一種高級用法是緩存ChannelHandlerContext的引用,以供之后使用。如下代碼展示了用法
public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; } public void send(String msg) { ctx.writeAndFlush(msg); } }
因為ChannelHandler可以屬於多個ChannelPipeline,所以它可以綁定到多個ChannelHandlerContext實例,當使用時必須使用@Sharable注釋,否則,當將其添加至多個ChannelPipeline時會拋出異常。如下代碼所示
@Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Channel read message: " + msg); ctx.fireChannelRead(msg); } }
@Sharable注釋沒有任何狀態,而如下代碼會出現錯誤。
@Sharable public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { count++; System.out.println("channelRead(...) called the " + count + " time"); ctx.fireChannelRead(msg(); } }
由於上述代碼包含了狀態,即count計數,將此類的實例添加到ChannelPipeline時,在並發訪問通道時很可能會產生錯誤。可通過在channelRead方法中進行同步來避免錯誤。
2.4 異常處理
在出站和進站時,可能會發生異常,Netty提供了多種方法處理異常。
1. 處理進站異常
當處理進站事件時發生異常,它將從ChannelInboundHandler中被觸發的位置開始流過ChannelPipeline,為處理異常,需要在實現ChannelInboundHandler接口是重寫exceptionCaught方法。如下示例所示。
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
由於異常會隨着進站事件在管道中傳遞,包含異常處理的ChannelHandler通常放在了管道的尾部,因此可以保證無論異常發生在哪個ChannelHandler中,其最終都會被處理。
2. 處理出站異常
在出站操作中處理的正常完成和處理異常都基於以下通知機制。
· 每個出站操作返回一個ChannelFuture,在ChannelFuture注冊的ChannelFutureListeners在操作完成時通知成功或錯誤。
· ChannelOutboundHandler的幾乎所有方法都會傳遞ChannelPromise實例,ChannelPromise是ChannelFuture的子類,其也可以為異步通知分配監聽器,並ChannelPromise還提供可寫的方法來提供即時通知。可通過調用ChannelFuture實例的addListener(ChannelFutureListener)方法添加一個ChannelFutureListener,最常用的是調用出站操作所返回的ChannelFuture的addListener方法,如write方法,具體代碼如下所示。
ChannelFuture future = channel.write(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } });
第二種方法是將ChannelFutureListener添加到ChannelPromise中,並將其作為參數傳遞給ChannelOutboundHandler的方法,具體代碼如下所示
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } }); } }
三、總結
本篇博文講解了ChannelHandler,以及其與ChannelPipeline、ChannelHandlerContext之間的關系,及其之間的交互,同時還了解了如何處理進站與出站時所拋出的異常。謝謝各位園友的觀看~