Pipeline和ChannelHandler是Netty處理流程的重要組成部分,ChannelHandler對應一個個業務處理器,Pipeline則是負責將各個ChannelHandler串起來的“容器”,二者結合起來一起完成Netty的處理流程。
Pipeline
每個channel內部都會持有一個ChannelPipeline對象pipeline,pipeline默認實現DefaultChannelPipeline內部維護了一個DefaultChannelHandlerContext鏈表。
channel的讀寫操作都會走到DefaultChannelPipeline中,當channel完成register、active、read、readComplete等操作時,會觸發pipeline的相應方法。
- 當channel注冊到selector后,觸發pipeline的fireChannelRegistered方法;
- 當channel是可用時,觸發pipeline的fireChannelActive方法。(fireChannelActive觸發一般是在fireChannelRegistered之后觸發的);
- 當客戶端發送數據時,觸發pipeline的fireChannelRead方法;
- 觸發pipeline的fireChannelRead方法之后會觸發pipeline的fireChannelReadComplete方法。
DefaultChannelPipeline
是Netty默認pipeline實現,對應代碼如下:
public class DefaultChannelPipeline implements ChannelPipeline {
// head和tail是handler的處理鏈/上下文
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
TailContext實現了ChannelOutboundHandler接口,HeadContext實現了ChannelInboundHandler和ChannelOutboundHandler接口,head和tail構成了一個鏈表。
對於Inbound操作,從head開始進行處理,向后遍歷;對於OutBound操作,從tail開始處理,向前遍歷。那么哪些操作是Inbound哪些是OutBound操作呢?
- InBound:channelRegistered、channelActive、channelRead、channelReadComplete;
- OutBound:bind、connect、close、flush等。
注意,HeadContext實現了ChannelInboundHandler
和ChannelOutboundHandler
接口,對於OutBound操作,最后也是會走到HeadContext來處理的,其實TailContext只是一個淺封裝,實際邏輯並不多。HeadContext 中包含了一個netty底層的socket操作類,對於bind/connect/disconnect/close/deregister/beginRead/read/wirte/flush
操作都是由unsafe對象來完成的。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// netty的底層socket操作類
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
// ...
}
channelPipeline的channelHandlerContext鏈表是“責任鏈”模式的體現,一個請求的處理可能會涉及到多個channelHandler,比如decodeHandler、自定義的業務channelHandler和encodeHandler。業務channelHandler示例如下:
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(in.toString(CharsetUtil.UTF_8));
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
在channelReadComplete方法中調用flush,其實會走到head.flush方法,最后調用unsafe.flush將數據發送出去。netty pipeline就是責任鏈(或者說是流水線)模式的體現,通過pipeline機制,使netty處理數據機制具有強大的擴展性和靈活性。
ChannelHandler
netty的channelHandler是channel處理器,基於netty的業務處理,不管多么復雜,都是由channelHandler來做的,可能涉及到多個channelHandler,channelHandler分為多種類型:encoder、decoder、業務處理等。
decoderHandler
decoderHandler大都是接收到數據之后進行轉換或者處理的,基本都是ByteToMessageDecoder的子類,其類圖如下:
ByteToMessageDecoder中會有一個數據暫存緩沖區,如果接收到數據不完整,可以先暫存下等到下次接收到數據時再處理。
encoderHandler
encoderHandler大都是將message轉換成bytebuf數據,基本都是MessageToByteEncoder的子類,其類圖如下:
業務channelHandler
業務處理channelHanler就是用戶自定義的業務邏輯了,一般是在最后才addLast到channel.pipeline的,比如http處理邏輯如下:
ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(8080)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", new HttpRequestDecoder())
.addLast("encoder", new HttpResponseEncoder())
.addLast("aggregator", new HttpObjectAggregator(512 * 1024))
.addLast("handler", new HttpHandler());
}
});
DefaultChannelPipeline中的headContext(實現了ChannelOutboundHandler和ChannelInboundHandler)、tailContext(實現了ChannelOutboundHandler)和自定義的channelHandler(decoderHandler、ecoderHandler、channelHandler等,一般實現ChannelInboundHandler),通過ChannelHandlerContext的鏈接,組成了一個請求處理鏈。
注意,ChannelOutboundHandler和ChannelInboundHandler的順序如何添加的,其實只要記住一條:ChannelOutboundHandler之間要保證順序,ChannelInboundHandler之間要保證順序,二者之間無需保證順序。
channelHandler的運行流程圖:
TailContesxt
本身代碼不多並且挺多方法都是"空實現",不過它的channelRead方法內部會執行ReferenceCountUtil.release(msg)
釋放msg占用的內存空間,也就是說在未定義用戶ChannelHandler
或者用戶ChannelHandler的channelRead繼續傳遞后續ChannelHandler的channelRead
時,到TailContext的channelRead時會自動釋放msg所占用內存。
推薦閱讀
歡迎小伙伴關注【TopCoder】閱讀更多精彩好文。