ChannelPipeline
ChannelPipeline不是單獨存在,它肯定會和Channel、ChannelHandler、ChannelHandlerContext關聯在一起,所以有關概念這里一起講。
一、ChannelHandler
1、概念
先看圖
ChannelHandler下主要是兩個子接口
ChannelInboundHandler(入站): 處理輸入數據和Channel狀態類型改變。
適配器: ChannelInboundHandlerAdapter(適配器設計模式)
常用的: SimpleChannelInboundHandler
ChannelOutboundHandler(出站): 處理輸出數據
適配器: ChannelOutboundHandlerAdapter
每一個Handler都一定會處理出站或者入站(可能兩者都處理數據),例如對於入站的Handler可能會繼承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,
而SimpleChannelInboundHandler又是繼承於ChannelInboundHandlerAdapter,最大的區別在於SimpleChannelInboundHandler會對沒有外界引用的資源進行一定的清理,
並且入站的消息可以通過泛型來規定。
這里為什么有設配器模式呢?
我們在寫自定義Handel時候,很少會直接實現上面兩個接口,因為接口中有很多默認方法需要實現,所以這里就采用了設配器模式,ChannelInboundHandlerAdapter和
ChannelInboundHandlerAdapter就是設配器模式的產物,讓它去實現上面接口,實現它所有方法。那么你自己寫自定義Handel時,只要繼承它,就無須重寫上面接口的所有方法了。
2、Channel 生命周期(執行順序也是從上倒下)
(1)channelRegistered: channel注冊到一個EventLoop。
(2)channelActive: 變為活躍狀態(連接到了遠程主機),可以接受和發送數據
(3)channelInactive: channel處於非活躍狀態,沒有連接到遠程主機
(4)channelUnregistered: channel已經創建,但是未注冊到一個EventLoop里面,也就是沒有和Selector綁定
3、ChannelHandler 生命周期
handlerAdded: 當 ChannelHandler 添加到 ChannelPipeline 調用
handlerRemoved: 當 ChannelHandler 從 ChannelPipeline 移除時調用
exceptionCaught: 當 ChannelPipeline 執行拋出異常時調用
二、ChannelPipeline
1、概念
先看圖
ChannelPipeline類是ChannelHandler實例對象的鏈表,用於處理或截獲通道的接收和發送數據。它提供了一種高級的截取過濾模式(類似serverlet中的filter功能),讓用
戶可以在ChannelPipeline中完全控制一個事件以及如何處理ChannelHandler與ChannelPipeline的交互。
對於每個新的通道Channel,都會創建一個新的ChannelPipeline,並將器pipeline附加到channel中。
下圖描述ChannelHandler與pipeline中的關系,一個io操作可以由一個ChannelInboundHandler或ChannelOutboundHandle進行處理,並通過調用ChannelInboundHandler
處理入站io或通過ChannelOutboundHandler處理出站IO。
2、常用方法
addFirst(...) //添加ChannelHandler在ChannelPipeline的第一個位置 addBefore(...) //在ChannelPipeline中指定的ChannelHandler名稱之前添加ChannelHandler addAfter(...) //在ChannelPipeline中指定的ChannelHandler名稱之后添加ChannelHandler addLast(...) //在ChannelPipeline的末尾添加ChannelHandler remove(...) //刪除ChannelPipeline中指定的ChannelHandler replace(...) //替換ChannelPipeline中指定的ChannelHandler
ChannelPipeline可以動態添加、刪除、替換其中的ChannelHandler,這樣的機制可以提高靈活性。示例:
1. ChannelPipeline pipeline = ch.pipeline(); 2. FirstHandler firstHandler = new FirstHandler(); 3. pipeline.addLast("handler1", firstHandler); 4. pipeline.addFirst("handler2", new SecondHandler()); 5. pipeline.addLast("handler3", new ThirdHandler()); 6. pipeline.remove("“handler3“"); 7. pipeline.remove(firstHandler); 8. pipeline.replace("handler2", "handler4", new FourthHandler());<span style="font-family:Microsoft YaHei;font-size:14px;">
3、入站出站Handler執行順序
一般的項目中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?
重點記住: InboundHandler順序執行,OutboundHandler逆序執行。
問題: 下面的handel的執行順序?
ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); ch.pipeline().addLast(new InboundHandler2()); 或者: ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new InboundHandler2());
其實上面的執行順序都是一樣的:
InboundHandler1--> InboundHandler2 -->OutboundHandler2 -->OutboundHandler1
結論
1)InboundHandler順序執行,OutboundHandler逆序執行
2)InboundHandler之間傳遞數據,通過ctx.fireChannelRead(msg)
3)InboundHandler通過ctx.write(msg),則會傳遞到outboundHandler
4) 使用ctx.write(msg)傳遞消息,Inbound需要放在結尾,在Outbound之后,不然outboundhandler會不執行;
但是使用channel.write(msg)、pipline.write(msg)情況會不一致,都會執行,那是因為channel和pipline會貫穿整個流。
5) outBound和Inbound誰先執行,針對客戶端和服務端而言,客戶端是發起請求再接受數據,先outbound再inbound,服務端則相反。
三、ChannelHandlerContext
ChannelPipeline並不是直接管理ChannelHandler,而是通過ChannelHandlerContext來間接管理,這一點通過ChannelPipeline的默認實現DefaultChannelPipeline可以看出來。
DefaultChannelHandlerContext和DefaultChannelPipeline是ChannelHandlerContext和ChannelPipeline的默認實現在DefaultPipeline內部
DefaultChannelHandlerContext組成了一個雙向鏈表。 我們看下DefaultChannelPipeline的構造函數:
/** * 可以看到,DefaultChinnelPipeline 內部使用了兩個特殊的Hander 來表示Handel鏈的頭和尾。 */ public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; TailHandler tailHandler = new TailHandler(); tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); HeadHandler headHandler = new HeadHandler(channel.unsafe()); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head.next = tail; tail.prev = head; }
所以對於DefaultChinnelPipeline它的Handel頭部和尾部的Handel是固定的,我們所添加的Handel是添加在這個頭和尾之前的Handel。(下面這個圖更加清晰)
四、幾者關系
先大致說下什么是Channel
通常來說, 所有的 NIO 的 I/O 操作都是從 Channel 開始的. 一個 channel 類似於一個 stream。在Netty中,Channel是客戶端和服務端建立的一個連接通道。
雖然java Stream 和 NIO Channel都是負責I/O操作,但他們還是有許多區別的:
1)我們可以在同一個 Channel 中執行讀和寫操作, 然而同一個 Stream 僅僅支持讀或寫。
2)Channel 可以異步地讀寫, 而 Stream 是阻塞的同步讀寫。
3)Channel 總是從 Buffer 中讀取數據, 或將數據寫入到 Buffer 中。
幾者的關系圖如下:
總結:
一個Channel包含一個ChannelPipeline,創建Channel時會自動創建一個ChannelPipeline,每個Channel都有一個管理它的pipeline,這關聯是永久性的。
這點從源碼中就可以看出,我之前寫的博客里有說到:【Netty】5 源碼 Bootstrap。每一個ChannelPipeline中可以包含多個ChannelHandler。所有ChannelHandler
都會順序加入到ChannelPipeline中,ChannelHandler實例與ChannelPipeline之間的橋梁是ChannelHandlerContext實例。
五、整個傳播流程
現在將上面的整個傳播流程,通過源碼大致走一遍。
為了搞清楚事件如何在Pipeline里傳播,讓我們從Channel的抽象子類AbstractChannel開始,下面是AbstractChannel#write()方法的實現:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { // ... @Override public Channel write(Object msg) { return pipeline.write(msg); } // ... }
AbstractChannel直接調用了Pipeline的write()方法:
再看DefaultChannelPipeline的write()方法實現:
final class DefaultChannelPipeline implements ChannelPipeline { // ... @Override public ChannelFuture write(Object msg) { return tail.write(msg); } // ... }
因為write是個outbound事件,所以DefaultChannelPipeline直接找到tail部分的context,調用其write()方法:
接着看DefaultChannelHandlerContext的write()方法
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { // ... @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } validatePromise(promise, true); write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(); next.invokeWrite(msg, promise); if (flush) { next.invokeFlush(); } } private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } private void invokeWrite(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } // ... }
context的write()方法沿着context鏈往前找,直至找到一個outbound類型的context為止,然后調用其invokeWrite()方法
invokeWrite()接着調用handler的write()方法
最后看看ChannelOutboundHandlerAdapter的write()方法實現:
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { // ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } // ... }
默認的實現調用了context的write()方法而不做任何處理,這樣write事件就沿着outbound鏈繼續傳播:
可見,Pipeline的事件傳播,是靠Pipeline,Context和Handler共同協作完成的。
參考
Netty4學習筆記-- ChannelPipeline(非常感謝作者分享,讓我對事件傳播有了更清晰的認識)
如果一個人充滿快樂,正面的思想,那么好的人事物就會和他共鳴,而且被他吸引過來。同樣,一個人老帶悲傷,倒霉的事情也會跟過來。
——在自己心情低落的時候,告誡自己不要把負能量帶給別人。(大校18)