【Netty】ChannelHandler和ChannelPipeline


一、前言

  前面學習了Netty的ByteBuf,接着學習ChannelHandler和ChannelPipeline。

二、ChannelHandler和ChannelPipeline

  2.1 ChannelHandler

  在ChannelPipeline中,ChannelHandler可以被鏈在一起處理用戶邏輯。

  1. Channel生命周期

  Channel接口定義了一個簡單但是強大的狀態模型,該模型與ChannelInboundHandler API緊密聯系,Channel有如下四種狀態。

  

  Channel的生命周期如下圖所示。

  

  當狀態發生變化時,就會產生相應的事件。

  2. ChannelHandler的生命周期

  ChannelHandler定義的生命周期如下圖所示。

  

  Netty定義了ChannelHandler的兩個重要的子類

    · ChannelInboundHandler,處理各種入站的數據和狀態的變化。

    · ChannelOutboundHandler,處理出站數據並允許攔截的所有操作。

  3. ChannelInboundHandler接口

  下圖展示了ChannelInboundHandler接口生命周期中的方法,當接受到數據或者其對應的Channel的狀態發生變化則會調用方法

  

  當ChannelInboundHandler的實現覆蓋channelRead()方法時,它負責顯式釋放與池的ByteBuf實例相關聯的內存,可以使用ReferenceCountUtil.release() 方法進行釋放。如下代碼展示了該方法的使用。

public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg);
    }
}

  上述顯式的釋放內存空間會顯得比較麻煩,而如下代碼則無需顯式釋放內存空間。  

@Sharable
public class SimpleDiscardHandler
    extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        Object msg) {
            // No need to do anything special
    }
}

  上述代碼中,SimpleChannelInboundHandler會自動釋放資源,因此無需顯式釋放。

  4. ChannelOutboundHandler接口

  ChannelOutboundHandler處理出站操作和數據,它的方法會被Channel、ChannelPipeline、ChannelHandlerContext觸發。ChannelOutboundHandler可按需推遲操作或事件。例如對遠程主機的寫入被暫停,你可以延遲刷新操作並在稍后重啟。

  5. ChannelHandler適配器

  可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的ChannelHandler程序的起點,這些適配器提供了ChannelInboundHandler和ChannelOutboundHandler的簡單實現,它們繼承了共同父類接口ChannelHandler的方法,其繼承結構如下圖所示

  

  ChannelHandlerAdapter還提供了isSharable方法,如果有Sharable注釋則會返回true,這也表明它可以被添加至多個ChannelPipiline中。ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter的方法體中會調用ChannelHandlerContext對應的方法,因此可以將事件傳遞到管道的下個ChannelHandler中。

  6. 資源管理

  無論何時調用ChannelInboundHandler.channelRead()和ChannelOutboundHandler.write()方法,都需要保證沒有資源泄露,由於Netty使用引用計數來管理ByteBuf,因此當使用完ByteBuf后需要調整引用計數。

  為了診斷可能出現的問題,Netty提供了ResourceLeakDetector,它將抽取應用程序大約1%的緩沖區分配來檢查內存泄漏,其額外的開銷很小,內存檢測有如下四種級別

  

  可以通過java -Dio.netty.leakDetectionLevel=ADVANCED 設置內存檢測級別。

  當讀取數據時,可以在readChannel方法中調用ReferenceCountUtil.release(msg)方法釋放資源,或者實現SimpleChannelInboundHandler(會自動釋放資源);而當寫入數據時,可以在write方法中調用ReferenceCountUtil.release(msg)釋放資源,具體代碼如下 

@Sharable
public class DiscardOutboundHandler
    extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx,
            Object msg, ChannelPromise promise) {
            ReferenceCountUtil.release(msg);
            promise.setSuccess();
        }
}

  不僅需要釋放資源,並且需要通知ChannelPromise,否則ChannelFutureListener可能收不到事件已經被處理的通知。如果消息到達實際的傳輸層,就可以在寫操作完成或者關閉通道時會自動釋放資源。

  2.2 ChannelPipeline接口

  如果將ChannelPipeline視為ChannelHandler實例鏈,可攔截流經通道的入站和出站事件,即可明白ChannelHandler之間的交互是如何構成應用程序數據和事件處理邏輯的核心的。當創建一個新的Channel時,都會分配了一個新的ChannelPipeline,該關聯是永久的,該通道既不能附加另一個ChannelPipeline也不能分離當前的ChannelPipeline。

  一個事件要么被ChannelInboundHander處理,要么被ChannelOutboundHandler處理,隨后,它將通過調用ChannelHandlerContext的實現來將事件轉發至同一超類型的下一個處理器。ChannelHandlerContext允許ChannelHandler與其ChannelPipeline和其他ChannelHandler進行交互,一個處理器可以通知ChannelPipeline中的下一個處理器,甚至可以修改器隸屬於的ChannelPipeline。

  下圖展示了ChannelHandlerPipeline、ChannelInboundHandler和ChannelOutboundHandler之間的關系

  

  可以看到ChannelPipeline是由一系列ChannelHandlers組成,其還提供了通過自身傳播事件的方法,當進站事件觸發時,其從ChannelPipeline的頭部傳遞到尾部,而出站事件會從右邊傳遞到左邊。

  當管道傳播事件時,其會確定下一個ChannelHandler的類型是否與移動方向匹配,若不匹配,則會跳過並尋找下一個,直至找到相匹配的ChannelHandler(一個處理器可以會同時實現ChannelInboundHandler和ChannelOutboundHandler)。

  1. 修改ChannelPipeline

  ChannelHandler可實時修改ChannelPipeline的布局,如添加、刪除、替換其他ChannelHandler(其可從ChannelPipeline中移除自身),如如下圖所示的方法。

  

  通常,ChannelPipeline中的每個ChannelHandler通過其EventLoop(I / O線程)處理傳遞給它的事件,不要阻塞該線程,因為它會對I/O的整體處理產生負面影響。

  2.3 ChannelHandlerContext接口

  ChannelHandlerContext代表了ChannelHandler與ChannelPipeline之間的關聯,當ChannelHandler被添加至ChannelPipeline中時其被創建,ChannelHandlerContext的主要功能是管理相關ChannelHandler與同一ChannelPipeline中的其他ChannelHandler的交互。

  ChannelHandlerContext中存在很多方法,其中一些也存在於ChannelHandler和ChannelPipeline中,但是差別很大。如果在ChannelHandler或者ChannelPipeline中調用該方法,它們將在整個管道中傳播,而如果在ChannelHandlerContext中調用方法,那么會僅僅傳遞至下個能處理該事件的ChannelHandler。

  1. 使用ChannelHandlerContext

  ChannelHandlerContext、ChannelHandler、ChannelHandlerContext、Channel之間的關系如下圖所示

  

  可以通過ChannelHandlerContext來訪問Channel,並且當調用Channel的write方法時,寫事件會在管道中傳遞,代碼如下 

ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

  除了使用Channel的write方法寫入數據外,還可以使用ChannelPipeline的write方法寫入數據,代碼如下 

ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

  上述兩段代碼在管道中產生的效果相同,如下圖所示。

  

  其中兩種方法的寫事件都會通過ChannelHandlerContext在管道中傳播。

  若想從指定的ChannelHandler開始傳遞事件,那么需要引用到指定ChannelHandler之前的一個ChannelHandlerContext,該ChannelHandlerContext將調用其關聯的ChannelHandler。

  如下圖所示,展示了從指定ChannelHandler開始處理事件。

  

  2. ChannelHandler和ChannelHandlerContext的高級用法

  可以通過調用ChannelHandlerContext的pipeline方法獲得其對應的ChannelPipeline引用,這可以在運行中管理ChannelHandler,如添加一個ChannelHandler。

  另一種高級用法是緩存ChannelHandlerContext的引用,以供之后使用。如下代碼展示了用法 

public class WriteHandler extends ChannelHandlerAdapter {
    private ChannelHandlerContext ctx;
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public void send(String msg) {
        ctx.writeAndFlush(msg);
    }
}        

  因為ChannelHandler可以屬於多個ChannelPipeline,所以它可以綁定到多個ChannelHandlerContext實例,當使用時必須使用@Sharable注釋,否則,當將其添加至多個ChannelPipeline時會拋出異常。如下代碼所示  

@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Channel read message: " + msg);
        ctx.fireChannelRead(msg);
    }
}

  @Sharable注釋沒有任何狀態,而如下代碼會出現錯誤。  

@Sharable
public class UnsharableHandler extends           
    ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        count++;
        System.out.println("channelRead(...) called the "
            + count + " time");
        ctx.fireChannelRead(msg();
    }
}

  由於上述代碼包含了狀態,即count計數,將此類的實例添加到ChannelPipeline時,在並發訪問通道時很可能會產生錯誤。可通過在channelRead方法中進行同步來避免錯誤。

  2.4 異常處理

  在出站和進站時,可能會發生異常,Netty提供了多種方法處理異常。

  1. 處理進站異常

  當處理進站事件時發生異常,它將從ChannelInboundHandler中被觸發的位置開始流過ChannelPipeline,為處理異常,需要在實現ChannelInboundHandler接口是重寫exceptionCaught方法。如下示例所示。  

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

  由於異常會隨着進站事件在管道中傳遞,包含異常處理的ChannelHandler通常放在了管道的尾部,因此可以保證無論異常發生在哪個ChannelHandler中,其最終都會被處理。

  2. 處理出站異常

  在出站操作中處理的正常完成和處理異常都基於以下通知機制。

    · 每個出站操作返回一個ChannelFuture,在ChannelFuture注冊的ChannelFutureListeners在操作完成時通知成功或錯誤。

    · ChannelOutboundHandler的幾乎所有方法都會傳遞ChannelPromise實例,ChannelPromise是ChannelFuture的子類,其也可以為異步通知分配監聽器,並ChannelPromise還提供可寫的方法來提供即時通知。可通過調用ChannelFuture實例的addListener(ChannelFutureListener)方法添加一個ChannelFutureListener,最常用的是調用出站操作所返回的ChannelFuture的addListener方法,如write方法,具體代碼如下所示。  

ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
        if (!f.isSuccess()) {
            f.cause().printStackTrace();
            f.channel().close();
        }
    }
});

  第二種方法是將ChannelFutureListener添加到ChannelPromise中,並將其作為參數傳遞給ChannelOutboundHandler的方法,具體代碼如下所示  

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
        ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

三、總結

  本篇博文講解了ChannelHandler,以及其與ChannelPipeline、ChannelHandlerContext之間的關系,及其之間的交互,同時還了解了如何處理進站與出站時所拋出的異常。謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM