Netty實戰六之ChannelHandler和ChannelPipeline


1、Channel的生命周期

Interface Channel定義了一組和ChannelInboundHandler API密切相關的簡單但功能強大的狀態模型,以下列出Channel的4個狀態。

ChannelUnregistered:Channel已經被創建,但還未注冊到EventLoop

ChannelRegistered:Channel已經被注冊到了EventLoop

ChannelActive:Channel處於活動狀態(已經連接到它的遠程節點)。它現在可以接收和發送數據了

ChannelInactive:Channel沒有連接到遠程節點

Channel的正常生命周期如下圖所示,當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其可以隨后對它們做出響應。 輸入圖片說明

2、ChannelHandler的生命周期

下表列出了interface ChannelHandler定義的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被從ChannelPipeline中移除時會調用這些操作,這些方法中的每一個都接受一個ChannelHandlerContext參數。

handlerAdded:當把ChannelHandler添加到ChannelPipeline中時被調用

handlerRemoved:當從ChannelPipeline中移除ChannelHandler時被調用

exceptionCaught:當處理過程中在ChannelPipeline中有錯誤產生時被調用

Netty定義了下面兩個重要的ChannelHandler子接口:

·ChannelInboundHandler——處理入站數據以及各種狀態變化

·ChannelOutboundHandler——處理出站數據並且允許攔截所有的操作

3、ChannelInboundHandler接口

當某個ChannelInboundHandler的實現重寫channelRead()方法時,它將負責顯式地釋放與池化ByteBuf實例相關的內存,Netty為此提供了一個實用方法ReferenceCountUtil.release()。

@ChannelHandler.Sharable //擴展了ChannelInboundHandlerAdapter public class DiscardHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //丟棄已接收的消息 ReferenceCountUtil.release(msg); } } 

Netty將使用WARN級別的日志消息記錄未釋放的資源,使得可以非常簡單地在代碼中發現違規的實例,但是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用SimpleChannelInboundHandler。

@ChannelHandler.Sharable //擴展了SimpleChannelInboundHandler public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object>{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { //不需要任何顯式的資源釋放 //No need to do anything special } } 

由於SimpleChannelInboundHandler會自動釋放資源,所以你不應該存儲指向任何消息的引用供將來使用,因為這些引用都將會失效。

4、ChannelOutboundHandler接口

出站操作和數據將由ChannelOutboundHandler處理,它的方法將被Channel、ChannelPipeline以及ChannelHandlerContext調用。

ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些復雜的方法來處理請求。例如,如果到遠程節點的寫入被暫停了,那么你可以推遲沖刷並在稍后繼續。

ChannelPromise與ChannelFuture : ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。

5、ChannelHandler適配器

你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的ChannelHandler的起始點。這兩個適配器分別提供了ChannelInboundHandler和ChannelOutboundHandler的基本實現,通過擴展抽象類ChannelHandlerAdapter,他們獲得了他們共同的超接口ChannelHandler的方法。生成的類的層次結構如下圖。 輸入圖片說明ChannelHandlerAdapter還提供了使用方法isSharable(),如果其對應的實現被標注為Sharable,那么這個方法都將返回true,表示它可以被添加到多個ChannelPipeline中。

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體調用了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。

6、資源管理

每當通過調用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()方法來處理數據時,你都需要確保沒有任何的資源泄露。你可能還記得前面的章節中所提到的,Netty使用引用技術來處理池化的ByteBuf。所以在完全使用完某個ByteBuf后,調整其引用計數是很重要的。

為了幫助你診斷潛在的(資源泄露)問題,Netty提供了class ResourceLeakDetector,它將對你應用程序的緩沖區分配做大約1%的采樣來檢測內存泄露。相關的開銷是非常小的。

Netty定義了4種泄露檢測級別。

DISABLED——禁用泄露檢測

SIMPLE——使用1%的默認采樣率檢測並報告任何發現的泄露

ADVANCED——使用默認的采樣率,報告所發現的任何的泄露以及對應的消息被訪問的位置

PARANOID——類似於ADVANCED,但是其將會對每次訪問都進行采樣,這對性能將會有很大的影響,應該只在調試階段使用

泄露檢測級別可以通過將下面的Java系統屬性設置為表中的一個值來定義:

java -Dio.netty.leakDetectionLevel = ADVANCED

如果帶着該JVM選項重新啟動你的應用程序,你將看到自己的應用程序最近被泄露的緩沖區被訪問的位置。

實現ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法時,應該如何使用這個診斷工具來防止泄露呢?讓我們看看你的channelRead()操作直接消費入站消息的情況,也就是說,他不會通過調用ChannelHandlerContext.fireChannelRead()方法將入站消息轉發給下一個ChannelInboundHandler。

消費入站消息的簡單方式: 由於消費入站數據是一項常規任務,所以Netty提供了一個特殊的被稱為SimpleChannelInboundHandler的ChannelInboundHandler實現,這個實現會在消息被channelRead0()方法消費之后自動釋放消息。

在出站方向這邊,如果你處理了write()操作並丟棄了一個消息,那么你也應該負責釋放它。以下代碼展示了一個丟棄所有的寫入數據的實現。

@ChannelHandler.Sharable public class DiscardoutBoundHandler extends ChannelOutboundHandlerAdapter{ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //釋放資源 ReferenceCountUtil.release(msg); //通知ChannelPromise數據已經被處理了 promise.setSuccess(); } } 

重要的是,不僅要釋放資源,還要通知ChannelPromise。否則可能會出現ChannelFutureListener收不到某個消息已經被處理了的通知的消息。

總之,如果一個消息被消費或者丟棄了,並且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那么用戶就有責任調用ReferenceCountUtil.release()。如果消息到達了實際的傳輸層,那么當它被寫入時或者Channel關閉時,都將被自動釋放。

7、ChannelPipeline接口

如果你認為ChannelPipeline是一個攔截流經Channel的入站和出站事件的ChannelHandler實例鏈,那么就很容易看出這些ChannelHandler之間的交互式如何組成一個應用程序數據和時間處理邏輯的核心的。

每一個新創建的Channel都將會被分配一個新的ChannelPipeline。這項關聯時永久的,Channel即不能附加另外一個ChannelPipeline,也不能分離其當前的,在Netty組件的生命周期中,這是一項固定的操作,不需要開發人員的任何干預。

根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理,隨后,通過調用ChannelHandlerContext實現,它將被轉發給同一個超類型的下一個ChannelHandler。

ChannelHandlerContext:ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler交互,ChannelHandler可以通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline。ChannelHandlerContext具有豐富的用於處理事件和執行I/O操作的API。

下圖展示了一個典型的同時具有入站和出站ChannelHandler的ChannelPipeline的布局,並且印證了我們之前的關於ChannelPipeline主要由一系列的ChannelHandler所組成的說法,ChannelPipeline還提供了通過ChannelPipeline本身傳播事件的方法。如果一個入站事件被觸發,它將被從ChannelPipeline的頭部開始一直被傳播到ChannelPipeline的尾端。如圖所示,一個出站I/O事件將從ChannelPipeline的最右邊開始,然后向左傳播。 輸入圖片說明在ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個ChannelHandler的類型是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline將跳過該ChannelHandler並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。

8、修改ChannelPipeline

通過調用ChannelPipeline上的相關方法,ChannelHandler可以添加、刪除或者替換其他的ChannelHandler,從而實時地修改ChannelPipeline的布局。

ChannelPipeline pipeline = ...;
        FirstHandler firstHandler = new FirstHandler(); //將該實例作為“handler1”添加到ChannelPipeline中 pipeline.addLast("handler1",firstHandler); //將一個SecondHandler的實例作為“handler2”添加到ChannelPipeline的第一個槽中,這意味着它將被放置在已有的“handler1”之前 pipeline.addLast("handler2",new SecondHandler()); //將一個ThirdHandler的實例作為“handler3”添加到ChannelPipeline的最后一個槽中 pipeline.addLast("handler3",new ThirdHandler()); ... //通過名稱移除“handler3” pipeline.remove("handler3"); //通過引用移除FirstHandler pipeline.remove(firstHandler); //將SecondHandler(“handler2”)替換為FourthHandler:"handler4" pipeline.replace("handler2","handler4",new ForthHandler()); 

ChannelHandler的執行和阻塞:通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的I/O處理產生負面的影響。但有時可能需要與那些使用阻塞API的遺留代碼進行交互,對於這個情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法,如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除,對於這種用例,Netty提供了一種叫DefaultEventExecutorGroup的默認實現。

——ChannelPipeline保存了與Channel相關聯的ChannelHandler

——ChannelPipeline可以根據需要、通過添加或者刪除ChannelHandler來動態修改

——ChannelPipeline有着豐富的API用以被調用、以響應入站和出站事件

——ChannelHandlerContext和ChannelHandler之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的

9、使用ChannelHandlerContext

以下代碼,將通過ChannelHandlerContext獲取到Channel的引用,調用Channel上的write()方法將會導致寫入事件從尾端到頭部地流經ChannelPipeline。 輸入圖片說明以下代碼展示了一個類似的例子,但是這一次是寫入ChannelPipeline。我們再次看到,(到ChannelPipeline的)引用是通過ChannelHandlerContext獲取的。

ChannelHandlerContext ctx = ..;
        //獲取到與ChannelHandlerContext相關聯的Channel的引用 Channel channel = ctx.channel(); //通過Channel寫入緩沖區 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); 

為什么會想要從ChannelPipeline中的某個特定點開始傳播事件呢?

——為了減少將事件傳經對它不感興趣的ChannelHandler所帶來的開銷

——為了避免將事件傳經那些可能會對它感興趣的ChannelHandler。

10、ChannelHandler和ChannelHandlerContext的高級用法

可以通過將ChannelHandler添加到ChannelPipeline中來實現動態的協議切換,緩存到ChannelHandlerContext的引用以供稍后使用,這可能會發生在任何的ChannelHandler方法之外,甚至來自於不同的線程。

以下代碼,緩存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter{ private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //存儲到ChannelHandlerContext的引用以供稍后使用 this.ctx = ctx; } public void send(String msg){ //使用之前存儲的到ChannelHandlerContext的引用來發送消息 ctx.writeAndFlush(msg); } } 

因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以綁定到多個ChannelHandlerContext實例,對於這種用法(指在多個ChannelPipeline中共享同一個ChannelHandler),對應的ChannelHandler必須要使用@Sharable注解標注;否則,試圖將它添加到多個ChannelPipeline時將會觸發異常,顯而易見,為了安全地被用於多個並發的Channel(連接),這樣的ChannelHandler必須是線程安全的。

以下代碼,展示這種模式。

@ChannelHandler.Sharable //使用注解@Sharable標注 public class SharableHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Channel read message: " + msg); //記錄方法調用,並轉發給下一個ChannelHandler ctx.fireChannelRead(msg); } } 

前面的ChannelHandler實現了符合所有的將其加入到多個ChannelPipeline的需求,即它使用了注解@Sharable標注,並且也不持有任何的狀態。

以下代碼,演示@Sharable的錯誤用法

@ChannelHandler.Sharable public class UnSharableHandler extends ChannelInboundHandlerAdapter{ private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將count字段值加1 count++; System.out.println("channelRead(...) called the " + count + " time"); //記錄方法調用,並轉發給下一個ChannelHandler ctx.fireChannelRead(msg); } } 

這段代碼的問題在於它擁有狀態,即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個並發Channel訪問時導致問題。(可以將ChannelRead()方法變為同步方法)

總之,只應該在確定了你的ChannelHandler是線程安全的時才使用@Sharable注解。

為何要共享同一個ChannelHandler:在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的原因是用於收集跨越多個Channel的統計信息。

11、處理入站異常

異常處理是任何真實應用程序的重要組成部分,它也可以通過多種方式來實現,因此,Netty提供了幾種方式用於處理入站或者出站處理過程中所拋出的異常。

如果在處理入站事件的過程中有異常被拋出,那么它將從它在ChannelInboundHandler里被觸發的那一點開始流經ChannelPipeline。要想處理這種類型的入站異常,你需要在你的ChannelInboundHandler實現exceptionCaught方法。

以下代碼,展示了其關閉Channel並打印了異常的棧跟蹤信息

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 

因為異常將會繼續按照入站方向流動(就像所有入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler通常位於ChannelPipeline的最后,這確保了所有的入站異常都總是會被處理,無論他們可能會發生在ChannelPipeline中的什么位置。

你應該如何響應異常,可能很大程序上取決於你的應用程序,你可能想要關閉Channel(和連接),也可能會嘗試進行恢復。如果你不實現任何處理入站異常的邏輯,那么Netty將會記錄該異常沒有被處理的事實。

——ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler

——如果異常到達了ChannelPipeline的尾端,它將會被記錄為未處理

——要想定義自定義的處理邏輯,你需要重寫exceptionCaught方法,然后你需要決定是否需要將該異常傳播出去

12、處理出站異常

——每個出站操作都將返回一個ChannelFuture。注冊到ChannelFuture的ChannelFutureListener將在操作完成時被通知該操作是成功了還是出錯了

——幾乎所有的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的實例,作為ChannelFuture的子類,ChannelPromise也可以被分配用於異步通知的監聽器,但是,ChannelPromise還具有提供立即通知的可寫方法。

以下代碼,添加channelFutureListener,它將打印棧跟蹤信息,並且隨后關閉Channel

ChannelFuture future = channel.wirte(someMessage); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()){ channelFuture.cause().printStackTrace(); channelFuture.channel().close(); } } }); 

第二種方式是將ChannelFutrueListener添加到即將作為參數傳遞給ChannelOutboundHandler的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter{ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()){ f.cause().printStackTrace(); f.channel().close(); } } }); } } 

ChannelPromise的可寫方法:通過調用ChannelPromise上的setSuccess()和setFailure()方法,可以使一個操作的狀態在ChannelHandler的方法返回給其調用者時便即刻被感知到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM