ChannelPipeline 和 ChannelHandler 是 Netty 重要的組件之一,通過這篇文章,重點了解這些組件是如何驅動數據流動和處理的。
一、ChannelHandler
在上一篇的整體架構圖里可以看到,ChannelHandler 負責處理入站和出站的數據。對於入站和出站,ChannelHandler 由不同類型的 Handler 進行處理。下面通過一個示例來演示,將上一篇文章里的 Demo 做一些修改:
增加以下類:
// OneChannelInBoundHandler.java
package com.niklai.demo.handler.inbound;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OneChannelInBoundHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(OneChannelInBoundHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("channel active.....");
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.fireChannelReadComplete();
}
}
// TwoChannelInBoundHandler.java
package com.niklai.demo.handler.inbound;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TwoChannelInBoundHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(TwoChannelInBoundHandler.class.getSimpleName());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("channel active.....");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("TwoChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.close().addListener(ChannelFutureListener.CLOSE);
}
}
// OneChannelOutBoundHandler.java
package com.niklai.demo.handler.outbound;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OneChannelOutBoundHandler extends ChannelOutboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(OneChannelOutBoundHandler.class.getSimpleName());
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg, promise);
}
}
修改 Server.java 類初始化的 childHandler 邏輯:
// Server.java
// 省略部分代碼
public static void init() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress("localhost", 9999))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加ChannelHandler
socketChannel.pipeline().addLast(new OneChannelOutBoundHandler());
socketChannel.pipeline().addLast(new OneChannelInBoundHandler());
socketChannel.pipeline().addLast(new TwoChannelInBoundHandler());
}
});
ChannelFuture future = serverBootstrap.bind().sync();
future.channel().closeFuture().sync();
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// 省略部分代碼
在上面的例子里,我們聲明了 OneChannelInBoundHandler 和 TwoChannelInBoundHandler 兩個類繼承 ChannelInBoundHandlerAdapter 處理入站數據,一個 OneChannelOutBoundHandler 類繼承 ChannelOutBoundHandlerAdapter 處理出站數據,依次添加到 ChannelPipeline 里。兩個 ChannelInBoundHandler 類都重寫了 channelActive、channelRead 和 channelReadComplete 方法,OneChannelOutBoundHandler 類重寫了 write 方法。
運行單元測試,控制台得到如下結果:
通過日志輸出結果,我們可以看到 Client 發送消息后,OneChannelInBoundHandler 的 channelRead 方法被觸發先獲得消息內容,調用 ctx.fireChannelRead(msg)方法后 TwoChannelInBoundHandler 的 channelRead 方法被觸發再次獲得到消息內容,此時消息已經到達隊尾。在 channelReadComplete 方法里調用 ctx.write(obj)方法依次寫入應答消息后,消息將會反向出站,OneChannelOutBoundHandler 的 write 被觸發獲得應答消息內容,在這個方法里調用 ctx.writeAndFlush(msg, promise)將應答消息繼續發送出去。
注意兩個 ChannelInBoundHandler 獲取消息是有先后順序的,順序取決於添加到 ChannelPipeline 的先后,並且只有當前 ChannelInBoundHandler 的 channelRead 方法里調用了 ctx.fireChannelRead(msg)方法后,消息才能被傳遞到后面的 ChannelInBoundHandler 的 channelRead 方法,channelReadComplete 方法同理。而在出站時,ChannelOutBoundHandler 的 write 方法會獲取到將要寫出的消息,可以選擇是否對消息進行再次處理后發送出去。
ChannelHandler 相關的類關系圖如下,ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 分別實現了 ChannelInBoundHandler 和 ChannelOutBoundHandler。接口一般通過繼承 ChannelInBoundHandlerAdapter 和 ChannelOutBoundHandlerAdapter 來實現業務數據處理:
以下兩個接口部分事件方法,更多方法可以查閱官方文檔
ChannelInBoundHandler
方法 | 描述 |
---|---|
channelActive | Channel 已經連接就緒時被調用 |
channelRead | 當從 Channel 讀取數據時被調用 |
channelReadComplete | 當 Channel 的讀取操作完成時被調用 |
exceptionCaught | 當入站事件處理過程中出現異常時被調用 |
ChannelOutBoundHandler
方法 | 描述 |
---|---|
write | 當通過 Channel 寫數據時被調用 |
read | 當從 Channel 讀取數據時被調用 |
二、ChannelPipeline
從上面的例子可以看到,加入到 ChannelPipeline 的一系列 ChannelHandler 組成了一個有序的鏈。每一個新創建的 Channel 都將被分配一個 ChannelPipeline,Channel 不能自己附加另外一個 ChannelPipeline,也不能取消當前的,這個是由框架決定的,不需要開發人員干預。
從上圖可以看到,事件消息會從頭部傳遞到尾部,然后再從尾部傳遞到頭部。在傳遞過程中,將會識別 ChannelHandler 的類型,入站事件由 InBoundHandler 處理,出站事件由 OutBoundHandler 處理,如果傳遞到下一個 ChannelHandler 時發現類型與當前方向不匹配,將會直接跳過並前進到下一個。如果某個 ChannelHandler 同時實現了 ChannelInBoundHandler 和 ChannelOutBundHandler 接口,那么當前 ChannelHandler 將會同時處理入站和出站事件。
以下是 ChannelPipeline 的一些主要方法:
方法 | 說明 |
---|---|
addFirst addLast |
添加 ChannelHander 到當前 ChannelPipeline 的頭/尾部 |
addBefore addAfter |
添加 ChannelHander 到當前 ChannelPipeline 里某個 ChannelHandler 的前/后面 |
remove | 將某個 ChannelHandler 從當前 ChannelPipeline 里移除 |
replace | 將當前 ChannelPipeline 里的某個 ChannelHandler 替換成另外一個 ChannelHandler |
除此之外,ChannelPipeline 也有一些觸發事件的方法,以下列出跟當前演示例子相關的事件方法,更多方法可以查閱官方文檔
方法 | 描述 |
---|---|
fireChannelActive | 調用 ChannelPipeline 里下一個 ChannelInBoundHandler 的 channelActive 方法 |
fireChannelRead | 調用 ChannelPipeline 里下一個 ChannelInBoundHandler 的 ChannelRead 方法 |
write | 調用 ChannelPipeline 里下一個 ChannelOutBoundHandler 的 write 方法 |
我們修改一下 Demo 中的例子:
// OneChannelInBoundHandler.java
// 省略代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
ctx.pipeline().fireChannelRead(msg); // 調用ChannelPipeline的fireChannelRead方法
}
// 省略代碼
運行單元測試查看控制台日志,發現事件會反復觸發 OneChannelInBoundHandler 的 channelRead 方法,直到死循環。對比之前的運行結果可以看到,ChannelPipeline 的 fireChannelRead 方法會將事件重新從頭部開始向后傳遞,而 ctx.fireChannelRead 方法會將事件從當前的下一個 ChannelHandler 開始向后傳遞。
三、ChannelHandlerContext
ChannelHandlerContext 是一個接口,它維護了 ChannelHandler 和 ChannelPipeline 兩者之間的關系。當一個 ChannelHandler 加入到 ChannelPipeline 里時,就會創建一個 ChannelHandlerContext 關聯它們。下圖展示了它們之間的關系,當調用 ChannelHandlerContext 的 fire...方法時,事件都將會被傳遞到它關聯的 ChannelHandler 的下一個 ChannelHandler 上
ChannelHandlerContext 部分的 API 如下,更多 API 可以查閱官方文檔
方法 | 描述 |
---|---|
pipeline | 獲取關聯的 ChannelPipeline |
handler | 獲取關聯的 ChannelHandler |
fireChannelRead | 觸發下一個 ChannelInBoundHandler 的 channelRead 方法 |
四、異常處理
入站異常
如果在處理入站事件過程中發生了異常,則該異常將會從它所在的 ChannelInBoundHandler 開始傳遞直到 ChannelPipeline 尾部。通過重寫 exceptionCaught 方法,可以處理異常。
修改一下 Demo,增加 exceptionCaught
// OneChannelInBoundHandler.java
// 省略部分代碼
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.write(Unpooled.copiedBuffer("OneChannelInBoundHandler answer...", CharsetUtil.UTF_8));
ctx.fireChannelReadComplete();
throw new Exception("This is an exception!"); // 模擬拋出一個異常
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}
// 省略部分代碼
運行測試,可以看到異常信息已經打印到控制台日志:
再次修改 Demo,調用 ChannelHandlerContext 的 fireExceptionCaught 方法將異常繼續傳遞下去
// OneChannelInBoundHandler.java
// 省略部分代碼
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("OneChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
ctx.fireExceptionCaught(cause); // 將異常傳遞下去
}
// 省略部分代碼
// TwoChannelInBoundHandler.java
// 省略部分代碼
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("TwoChannelInBoundHandler exception:{}....", cause.getMessage(), cause);
}
// 省略部分代碼
運行測試,查看控制台日志,兩個 ChannelInBoundHandler 都會打印異常日志:
如果,兩個 ChannelInBoundHandler 都不重寫 exceptionCaught 方法處理異常,會怎樣?修改 Demo,刪除 exceptionCaught 后再次運行測試,查看控制台日志:
控制台輸出一條日志信息:An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
如果異常發生但是沒有被處理,異常將會一直傳遞到 ChannelPipeline 並記錄為未處理異常,以 WARN 級別日志輸出。
出站異常
出站操作的相關方法是異步的,處理異常信息都是基於通知機制。處理方式有兩種:
第一種是通過在方法返回值上注冊 listener:
// OneChannelOutBoundHandler.java
// 省略部分代碼
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.close(); // 在發送消息之前關閉channel,后序寫入數據將會引發異常。
ChannelFuture channelFuture = ctx.writeAndFlush(msg);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
}
}
});
}
// 省略部分代碼
第二種是在傳入的參數 promise 上注冊 listener:
// OneChannelOutBoundHandler.java
// 省略部分代碼
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("write msg: {}.....", buf.toString(CharsetUtil.UTF_8));
ctx.close(); // 在發送消息之前關閉channel,后序寫入數據將會引發異常。
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
logger.error("OneChannelOutBoundHandler cause:{}.......", f.cause().getMessage(), f.cause());
}
}
});
ctx.writeAndFlush(msg, promise);
}
// 省略部分代碼