Netty學習筆記(二) - ChannelPipeline和ChannelHandler


ChannelPipeline 和 ChannelHandler 是 Netty 重要的組件之一,通過這篇文章,重點了解這些組件是如何驅動數據流動和處理的。

一、ChannelHandler

上一篇的整體架構圖里可以看到,ChannelHandler 負責處理入站和出站的數據。對於入站和出站,ChannelHandler 由不同類型的 Handler 進行處理。下面通過一個示例來演示,將上一篇文章里的 Demo 做一些修改:
增加以下類:

// OneChannelInBoundHandler.java

package com.niklai.demo.handler.inbound;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OneChannelInBoundHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(OneChannelInBoundHandler.class.getSimpleName());

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("channel active.....");
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
        ctx.fireChannelReadComplete();
    }
}
// TwoChannelInBoundHandler.java

package com.niklai.demo.handler.inbound;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwoChannelInBoundHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(TwoChannelInBoundHandler.class.getSimpleName());

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("channel active.....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.write(Unpooled.copiedBuffer("TwoChannelInBoundHandler answer...", CharsetUtil.UTF_8));
        ctx.close().addListener(ChannelFutureListener.CLOSE);
    }
}
// OneChannelOutBoundHandler.java

package com.niklai.demo.handler.outbound;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OneChannelOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(OneChannelOutBoundHandler.class.getSimpleName());

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(msg, promise);
    }
}

修改 Server.java 類初始化的 childHandler 邏輯:

// Server.java

// 省略部分代碼

public static void init() {
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        serverBootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress("localhost", 9999))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 添加ChannelHandler
                        socketChannel.pipeline().addLast(new OneChannelOutBoundHandler());

                        socketChannel.pipeline().addLast(new OneChannelInBoundHandler());
                        socketChannel.pipeline().addLast(new TwoChannelInBoundHandler());
                    }
                });

        ChannelFuture future = serverBootstrap.bind().sync();
        future.channel().closeFuture().sync();
        group.shutdownGracefully().sync();
    } catch (InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}

// 省略部分代碼

在上面的例子里,我們聲明了 OneChannelInBoundHandler 和 TwoChannelInBoundHandler 兩個類繼承 ChannelInBoundHandlerAdapter 處理入站數據,一個 OneChannelOutBoundHandler 類繼承 ChannelOutBoundHandlerAdapter 處理出站數據,依次添加到 ChannelPipeline 里。兩個 ChannelInBoundHandler 類都重寫了 channelActive、channelRead 和 channelReadComplete 方法,OneChannelOutBoundHandler 類重寫了 write 方法。

運行單元測試,控制台得到如下結果:

通過日志輸出結果,我們可以看到 Client 發送消息后,OneChannelInBoundHandler 的 channelRead 方法被觸發先獲得消息內容,調用 ctx.fireChannelRead(msg)方法后 TwoChannelInBoundHandler 的 channelRead 方法被觸發再次獲得到消息內容,此時消息已經到達隊尾。在 channelReadComplete 方法里調用 ctx.write(obj)方法依次寫入應答消息后,消息將會反向出站,OneChannelOutBoundHandler 的 write 被觸發獲得應答消息內容,在這個方法里調用 ctx.writeAndFlush(msg, promise)將應答消息繼續發送出去。

注意兩個 ChannelInBoundHandler 獲取消息是有先后順序的,順序取決於添加到 ChannelPipeline 的先后,並且只有當前 ChannelInBoundHandler 的 channelRead 方法里調用了 ctx.fireChannelRead(msg)方法后,消息才能被傳遞到后面的 ChannelInBoundHandler 的 channelRead 方法,channelReadComplete 方法同理。而在出站時,ChannelOutBoundHandler 的 write 方法會獲取到將要寫出的消息,可以選擇是否對消息進行再次處理后發送出去。

ChannelHandler 相關的類關系圖如下,ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 分別實現了 ChannelInBoundHandler 和 ChannelOutBoundHandler。接口一般通過繼承 ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 來實現業務數據處理:

以下兩個接口部分事件方法,更多方法可以查閱官方文檔

ChannelInBoundHandler

方法 描述
channelActive Channel 已經連接就緒時被調用
channelRead 當從 Channel 讀取數據時被調用
channelReadComplete 當 Channel 的讀取操作完成時被調用
exceptionCaught 當入站事件處理過程中出現異常時被調用

ChannelOutBoundHandler

方法 描述
write 當通過 Channel 寫數據時被調用
read 當從 Channel 讀取數據時被調用

二、ChannelPipeline

從上面的例子可以看到,加入到 ChannelPipeline 的一系列 ChannelHandler 組成了一個有序的鏈。每一個新創建的 Channel 都將被分配一個 ChannelPipeline,Channel 不能自己附加另外一個 ChannelPipeline,也不能取消當前的,這個是由框架決定的,不需要開發人員干預。

從上圖可以看到,事件消息會從頭部傳遞到尾部,然后再從尾部傳遞到頭部。在傳遞過程中,將會識別 ChannelHandler 的類型,入站事件由 InBoundHandler 處理,出站事件由 OutBoundHandler 處理,如果傳遞到下一個 ChannelHandler 時發現類型與當前方向不匹配,將會直接跳過並前進到下一個。如果某個 ChannelHandler 同時實現了 ChannelInBoundHandler 和 ChannelOutBundHandler 接口,那么當前 ChannelHandler 將會同時處理入站和出站事件。
以下是 ChannelPipeline 的一些主要方法:

方法 說明
addFirst
addLast
添加 ChannelHander 到當前 ChannelPipeline 的頭/尾部
addBefore
addAfter
添加 ChannelHander 到當前 ChannelPipeline 里某個 ChannelHandler 的前/后面
remove 將某個 ChannelHandler 從當前 ChannelPipeline 里移除
replace 將當前 ChannelPipeline 里的某個 ChannelHandler 替換成另外一個 ChannelHandler

除此之外,ChannelPipeline 也有一些觸發事件的方法,以下列出跟當前演示例子相關的事件方法,更多方法可以查閱官方文檔

方法 描述
fireChannelActive 調用 ChannelPipeline 里下一個 ChannelInBoundHandler 的 channelActive 方法
fireChannelRead 調用 ChannelPipeline 里下一個 ChannelInBoundHandler 的 ChannelRead 方法
write 調用 ChannelPipeline 里下一個 ChannelOutBoundHandler 的 write 方法

我們修改一下 Demo 中的例子:

// OneChannelInBoundHandler.java

// 省略代碼

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
    ctx.pipeline().fireChannelRead(msg);        // 調用ChannelPipeline的fireChannelRead方法
}

// 省略代碼

運行單元測試查看控制台日志,發現事件會反復觸發 OneChannelInBoundHandler 的 channelRead 方法,直到死循環。對比之前的運行結果可以看到,ChannelPipeline 的 fireChannelRead 方法會將事件重新從頭部開始向后傳遞,而 ctx.fireChannelRead 方法會將事件從當前的下一個 ChannelHandler 開始向后傳遞。

三、ChannelHandlerContext

ChannelHandlerContext 是一個接口,它維護了 ChannelHandler 和 ChannelPipeline 兩者之間的關系。當一個 ChannelHandler 加入到 ChannelPipeline 里時,就會創建一個 ChannelHandlerContext 關聯它們。下圖展示了它們之間的關系,當調用 ChannelHandlerContext 的 fire...方法時,事件都將會被傳遞到它關聯的 ChannelHandler 的下一個 ChannelHandler 上

ChannelHandlerContext 部分的 API 如下,更多 API 可以查閱官方文檔

方法 描述
pipeline 獲取關聯的 ChannelPipeline
handler 獲取關聯的 ChannelHandler
fireChannelRead 觸發下一個 ChannelInBoundHandler 的 channelRead 方法

四、異常處理

入站異常

如果在處理入站事件過程中發生了異常,則該異常將會從它所在的 ChannelInBoundHandler 開始傳遞直到 ChannelPipeline 尾部。通過重寫 exceptionCaught 方法,可以處理異常。
修改一下 Demo,增加 exceptionCaught

// OneChannelInBoundHandler.java

// 省略部分代碼

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
    ctx.fireChannelReadComplete();
    throw new Exception("This is an exception!");       // 模擬拋出一個異常
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}

// 省略部分代碼

運行測試,可以看到異常信息已經打印到控制台日志:

再次修改 Demo,調用 ChannelHandlerContext 的 fireExceptionCaught 方法將異常繼續傳遞下去

// OneChannelInBoundHandler.java

// 省略部分代碼

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
    ctx.fireExceptionCaught(cause);       // 將異常傳遞下去
}

// 省略部分代碼
// TwoChannelInBoundHandler.java

// 省略部分代碼

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("TwoChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}

// 省略部分代碼

運行測試,查看控制台日志,兩個 ChannelInBoundHandler 都會打印異常日志:

如果,兩個 ChannelInBoundHandler 都不重寫 exceptionCaught 方法處理異常,會怎樣?修改 Demo,刪除 exceptionCaught 后再次運行測試,查看控制台日志:

控制台輸出一條日志信息:An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.如果異常發生但是沒有被處理,異常將會一直傳遞到 ChannelPipeline 並記錄為未處理異常,以 WARN 級別日志輸出。

出站異常

出站操作的相關方法是異步的,處理異常信息都是基於通知機制。處理方式有兩種:
第一種是通過在方法返回值上注冊 listener:

// OneChannelOutBoundHandler.java

// 省略部分代碼

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
    ctx.close();        // 在發送消息之前關閉channel,后序寫入數據將會引發異常。
    ChannelFuture channelFuture = ctx.writeAndFlush(msg);
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (!f.isSuccess()) {
                logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
            }
        }
    });
}

// 省略部分代碼

第二種是在傳入的參數 promise 上注冊 listener:

// OneChannelOutBoundHandler.java

// 省略部分代碼

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
    ctx.close();        // 在發送消息之前關閉channel,后序寫入數據將會引發異常。

    promise.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (!f.isSuccess()) {
                logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
            }
        }
    });

    ctx.writeAndFlush(msg, promise);
}

// 省略部分代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM