Pipeline 設計原理
Channel 與ChannelPipeline:
相信大家都已經知道,在Netty 中每個Channel 都有且僅有一個ChannelPipeline 與之對應,它們的組成關系如下:
通過上圖我們可以看到, 一個Channel 包含了一個ChannelPipeline , 而ChannelPipeline 中又維護了一個由ChannelHandlerContext 組成的雙向鏈表。這個鏈表的頭是HeadContext,鏈表的尾是TailContext,並且每個ChannelHandlerContext 中又關聯着一個ChannelHandler。圖示給了我們一個對ChannelPipeline 的直觀認識,但是實際上Netty 實現的Channel 是否真的是這樣的呢?我們繼續用源碼說話。在前我們已經知道了一個Channel 的初始化的基本過程,下面我們再回顧一下。下面的代碼是AbstractChannel 構造器:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
AbstractChannel 有一個pipeline 字段,在構造器中會初始化它為DefaultChannelPipeline 的實例。這里的代碼就印證了一點:每個Channel 都有一個ChannelPipeline。接着我們跟蹤一下DefaultChannelPipeline 的初始化過程,首先進入到DefaultChannelPipeline 構造器中:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
在DefaultChannelPipeline 構造器中, 首先將與之關聯的Channel 保存到字段channel 中。然后實例化兩個ChannelHandlerContext:一個是HeadContext 實例head,另一個是TailContext 實例tail。接着將head 和tail 互相指向, 構成一個雙向鏈表。
特別注意的是:我們在開始的示意圖中head 和tail 並沒有包含ChannelHandler,這是因為HeadContext 和TailContext繼承於AbstractChannelHandlerContext 的同時也實現了ChannelHandler 接口了,因此它們有Context 和Handler的雙重屬性。
再探ChannelPipeline 的初始化:
前面的學習我們已經對ChannelPipeline 的初始化有了一個大致的了解,不過當時重點沒有關注ChannelPipeline,因此沒有深入地分析它的初始化過程。那么下面我們就來看一下具體的ChannelPipeline 的初始化都做了哪些工作吧。先回顧一下,在實例化一個Channel 時,會伴隨着一個ChannelPipeline 的實例化,並且此Channel 會與這個ChannelPipeline相互關聯,這一點可以通過NioSocketChannel 的父類AbstractChannel 的構造器予以佐證:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
當實例化一個NioSocketChannel 是,其pipeline 字段就是我們新創建的DefaultChannelPipeline 對象。可以看到,在DefaultChannelPipeline 的構造方法中,將傳入的channel 賦值給字段this.channel,接着又實例化了兩個特殊的字段:tail 與head,這兩個字段是一個雙向鏈表的頭和尾。其實在DefaultChannelPipeline 中,維護了一個以AbstractChannelHandlerContext 為節點的雙向鏈表,這個鏈表是Netty 實現Pipeline 機制的關鍵。再回顧一下head和tail 的類層次結構:
從類層次結構圖中可以很清楚地看到,head 實現了ChannelInboundHandler與ChannelOutboundHandler,而tail 實現了ChannelOutboundHandler 接口,並且它們都實現了ChannelHandlerContext 接口, 因此可以說head 和tail 即是一個ChannelHandler,又是一個ChannelHandlerContext。接着看HeadContext與TailContext 構造器中的代碼:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true); this.unsafe = pipeline.channel().unsafe(); this.setAddComplete(); } TailContext(DefaultChannelPipeline pipeline) { super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false); this.setAddComplete(); }
我們可以看到,鏈表中head 是一個ChannelOutboundHandler,而tail 則是一個ChannelInboundHandler。它調用了父類AbstractChannelHandlerContext 的構造器,並傳入參數inbound = false,outbound = true。而TailContext 的構造器與HeadContext 的相反,它調用了父類AbstractChannelHandlerContext 的構造器,並傳入參數inbound = true,outbound = false。即header 是一個OutBoundHandler,而tail 是一個InBoundHandler。
ChannelInitializer 的添加:
前面我們已經分析過Channel 的組成,其中我們了解到,最開始的時候ChannelPipeline 中含有兩個ChannelHandlerContext(同時也是ChannelHandler),但是這個Pipeline 並不能實現什么特殊的功能,因為我們還沒有給它添加自定義的ChannelHandler。通常來說,我們在初始化Bootstrap,會添加我們自定義的ChannelHandler,就以我們具體的客戶端啟動代碼片段來舉例:
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChatClientHandler(nickName));
}
});
上面代碼的初始化過程,相信大家都不陌生。在調用handler 時,傳入了ChannelInitializer 對象,它提供了一個initChannel()方法給我我們初始化ChannelHandler。最后將這個匿名的Handler保存到AbstractBootstrap中。那么這個初始化過程是怎樣的呢?下面我們來揭開它的神秘面紗。
ChannelInitializer 實現了ChannelHandler,那么它是在什么時候添加到ChannelPipeline 中的呢?通過代碼跟蹤,我們發現它是在Bootstrap 的init()方法中添加到ChannelPipeline 中的,其代碼如下(以客戶端為例):
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(new ChannelHandler[]{this.config.handler()}); 。。。。。。 } //AbstractBootstrapConfig public final ChannelHandler handler() { return this.bootstrap.handler(); } //AbstractBootstrap final ChannelHandler handler() { return this.handler; }
從上面的代碼可見,將handler()返回的ChannelHandler 添加到Pipeline 中,而handler()返回的其實就是我們在初始化Bootstrap 時通過handler()方法設置的ChannelInitializer 實例,因此這里就是將ChannelInitializer 插入到了Pipeline的末端。此時Pipeline 的結構如下圖所示:
這時候,有小伙伴可能就有疑惑了,我明明插入的是一個ChannelInitializer 實例,為什么在ChannelPipeline 中的雙向鏈表中的元素卻是一個ChannelHandlerContext 呢?我們繼續去源碼中尋找答案。
剛才,我們提到,在Bootstrap 的init()方法中會調用p.addLast()方法,將ChannelInitializer 插入到鏈表的末端:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized(this) { checkMultiplicity(handler); newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}
addLast()有很多重載的方法,我們只需關注這個比較重要的方法就行。上面的addLast()方法中,首先檢查ChannelHandler 的名字是否是重復,如果不重復,則調用newContex()方法為這個Handler 創建一個對應的DefaultChannelHandlerContext 實例,並與之關聯起來(Context 中有一個handler 屬性保存着對應的Handler 實例)。為了添加一個handler 到pipeline 中,必須把此handler 包裝成ChannelHandlerContext。因此在上面的代碼中我們可以看到新實例化了一個newCtx 對象,並將handler 作為參數傳遞到構造方法中。那么我們來看一下實例化的DefaultChannelHandlerContext 到底有什么玄機吧。首先看它的構造器:
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } else { this.handler = handler; } }
在DefaultChannelHandlerContext 的構造器中,調用了兩個很有意思的方法:isInbound()與isOutbound(),這兩個方法是做什么的呢?從源碼中可以看到,當一個handler 實現了ChannelInboundHandler 接口,則isInbound 返回true;類似地,當一個handler 實現了ChannelOutboundHandler 接口,則isOutbound 就返回true。而這兩個boolean 變量會傳遞到父類AbstractChannelHandlerContext 中,並初始化父類的兩個字段:inbound 與outbound。那么這里的ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound 與outbound 字段分別是什么呢? 那就看一下ChannelInitializer 到底實現了哪個接口不就行了?如下是ChannelInitializer 的類層次結構圖:
從類圖中可以清楚地看到,ChannelInitializer 僅僅實現了ChannelInboundHandler 接口,因此這里實例化的DefaultChannelHandlerContext 的inbound = true,outbound = false。兜了一圈,不就是inbound 和outbound 兩個字段嘛,為什么需要這么大費周折地分析一番?其實這兩個字段關系到pipeline 的事件的流向與分類,因此是十分關鍵的,不過我在這里先賣個關子, 后面我們再來詳細分析這兩個字段所起的作用。至此, 我們暫時先記住一個結論:ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound =true,outbound = false。當創建好Context 之后,就將這個Context 插入到Pipeline 的雙向鏈表中
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = this.tail.prev; newCtx.prev = prev; newCtx.next = this.tail; prev.next = newCtx; this.tail.prev = newCtx; }
添加完ChannelInitializer的Pipeline現在是長這樣的:
自定義ChannelHandler 的添加過程:
前面我們已經分析了ChannelInitializer 是如何插入到Pipeline 中的,接下來就來探討ChannelInitializer 在哪里被調用,ChannelInitializer 的作用以及我們自定義的ChannelHandler 是如何插入到Pipeline 中的。先簡單復習一下Channel 的注冊過程:
- 首先在AbstractBootstrap 的initAndRegister()中,通過group().register(channel),調用MultithreadEventLoopGroup 的register()方法。
- 在MultithreadEventLoopGroup 的register()中調用next()獲取一個可用的SingleThreadEventLoop,然后調用它的register()方法。
- 在SingleThreadEventLoop 的register()方法中,通過channel.unsafe().register(this, promise)方法獲取channel的unsafe()底層IO 操作對象,然后調用它的register()。
- 在AbstractUnsafe 的register()方法中,調用register0()方法注冊Channel 對象。
- 在AbstractUnsafe 的register0()方法中,調用AbstractNioChannel 的doRegister()方法。
- AbstractNioChannel 的doRegister()方法調用javaChannel().register(eventLoop().selector, 0, this)將Channel對應的Java NIO 的SockerChannel 對象注冊到一個eventLoop 的Selector 中,並且將當前Channel 作為attachment。
而我們自定義ChannelHandler 的添加過程,發生在AbstractUnsafe 的register0()方法中,在這個方法中調用了pipeline.fireChannelRegistered()方法,其代碼實現如下:
private void register0(ChannelPromise promise) { boolean firstRegistration = this.neverRegistered; AbstractChannel.this.doRegister(); this.neverRegistered = false; AbstractChannel.this.registered = true; AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded(); this.safeSetSuccess(promise); AbstractChannel.this.pipeline.fireChannelRegistered(); } public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(this.head); return this; }
再看AbstractChannelHandlerContext 的invokeChannelRegistered()方法:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
很顯然,這個代碼會從head 開始遍歷Pipeline 的雙向鏈表,然后 findContextInbound() 找到第一個屬性inbound 為true 的ChannelHandlerContext 實例。看代碼:
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(this.findContextInbound()); return this; }
想起來了沒?我們在前面分析ChannelInitializer 時,花了大量的篇幅來分析了inbound和outbound 屬性,現在這里就用上了。回想一下,ChannelInitializer 實現了ChannelInboudHandler,因此它所對應的ChannelHandlerContext 的inbound 屬性就是true,因此這里返回就是ChannelInitializer 實例所對應的ChannelHandlerContext 對象,如下圖所示:
當獲取到inbound 的Context 后,就調用它的invokeChannelRegistered()方法:
private void invokeChannelRegistered() { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRegistered(this); } catch (Throwable var2) { this.notifyHandlerException(var2); } } else { this.fireChannelRegistered(); } }
我們已經知道,每個ChannelHandler 都和一個ChannelHandlerContext 關聯,我們可以通過ChannelHandlerContext獲取到對應的ChannelHandler。因此很明顯,這里handler()返回的對象其實就是一開始我們實例化的ChannelInitializer 對象,並接着調用了ChannelInitializer 的channelRegistered()方法。看到這里, 應該會覺得有點眼熟了。ChannelInitializer 的channelRegistered()這個方法我們在一開始的時候已經接觸到了,但是我們並沒有深入地分析這個方法的調用過程。下面我們來看這個方法中到底有什么玄機,繼續看代碼:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
removeState(ctx);
} else {
ctx.fireChannelRegistered();
}
}private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
}
initChannel((C) ctx.channel())這個方法我們也很熟悉,它就是我們在初始化Bootstrap 時,調用handler 方法傳入的匿名內部類所實現的方法:
protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("handler", new MyClient());
}
因此,當調用這個方法之后, 我們自定義的ChannelHandler 就插入到了Pipeline,此時Pipeline 的狀態如下圖所示:
當添加完成自定義的ChannelHandler 后,在finally 代碼塊會刪除自定義的ChannelInitializer,也就是remove(ctx)最終調用ctx.pipeline().remove(this),因此最后的Pipeline 的狀態如下:
至此,自定義ChannelHandler 的添加過程也分析得差不多了。
ChannelHandler 默認命名規則
不知道大家注意到沒有,pipeline.addXXX 都有一個重載的方法,例如addLast()它有一個重載的版本是:ChannelPipeline addLast(String name, ChannelHandler handler);第一個參數指定添加的handler 的名字(更准確地說是ChannelHandlerContext 的名字,說成handler 的名字更便於理解)。那么handler 的名字有什么用呢?如果我們不設置name,那么handler 默認的名字是怎樣呢?帶着這些疑問,我們依舊還是去源碼中找到答案。還是以addLast()方法為例:
public final ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); }
這個方法會調用重載的addLast()方法:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx);return this; }
第一個參數被設置為null,我們不用關心它。第二參數就是這個handler 的名字。看代碼可知,在添加一個handler之前,需要調用checkMultiplicity()方法來確定新添加的handler 名字是否與已添加的handler 名字重復。
如果我們調用的是如下的addLast()方法:ChannelPipeline addLast(ChannelHandler... handlers);那么Netty 就會調用generateName()方法為新添加的handler 自動生成一個默認的名字:
private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; }
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
而generateName()方法會接着調用generateName0()方法來實際生成一個新的handler 名字:
private static String generateName0(Class<?> handlerType) { return StringUtil.simpleClassName(handlerType) + "#0"; }
默認命名的規則很簡單,就是用反射獲取handler 的simpleName 加上"#0",因此我們自定義ChatClientHandler 的名字就是"ChatClientHandler#0"。
Pipeline 的事件傳播機制
前面章節中,我們已經知道AbstractChannelHandlerContext 中有inbound 和outbound 兩個boolean 變量,分別用於標識Context 所對應的handler 的類型,即:
- inbound 為true 是,表示其對應的ChannelHandler 是ChannelInboundHandler 的子類。
- outbound 為true 時,表示對應的ChannelHandler 是ChannelOutboundHandler 的子類。
這里大家肯定還有很多疑惑,不知道這兩個字段到底有什么作用? 這還要從ChannelPipeline 的事件傳播類型說起。Netty 中的傳播事件可以分為兩種:Inbound 事件和Outbound 事件。如下是從Netty 官網針對這兩個事件的說明:
從上圖可以看出,inbound 事件和outbound 事件的流向是不一樣的,inbound 事件的流行是從下至上,而outbound剛好相反,是從上到下。並且inbound 的傳遞方式是通過調用相應的ChannelHandlerContext.fireIN_EVT()方法,而outbound 方法的的傳遞方式是通過調用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext的fireChannelRegistered()調用會發送一個ChannelRegistered 的inbound 給下一個ChannelHandlerContext,而ChannelHandlerContext 的bind()方法調用時會發送一個bind 的outbound 事件給下一個ChannelHandlerContext。
Inbound 事件傳播方法有:
public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext var1) throws Exception; void channelUnregistered(ChannelHandlerContext var1) throws Exception; void channelActive(ChannelHandlerContext var1) throws Exception; void channelInactive(ChannelHandlerContext var1) throws Exception; void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; void channelReadComplete(ChannelHandlerContext var1) throws Exception; void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; }
Outbound 事件傳播方法有:
public interface ChannelOutboundHandler extends ChannelHandler { void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception; void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception; void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; void read(ChannelHandlerContext var1) throws Exception; void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception; void flush(ChannelHandlerContext var1) throws Exception; }
大家應該發現了規律:inbound 類似於是事件回調(響應請求的事件),而outbound 類似於主動觸發(發起請求的事件)。注意,如果我們捕獲了一個事件,並且想讓這個事件繼續傳遞下去,那么需要調用Context 對應的傳播方法 fireXXX,例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("連接成功"); ctx.fireChannelActive(); } }
Outbound 事件傳播方式:
Outbound 事件都是請求事件(request event),即請求某件事情的發生,然后通過Outbound 事件進行通知。Outbound 事件的傳播方向是tail -> customContext -> head。我們接下來以connect 事件為例,分析一下Outbound 事件的傳播機制。首先,當用戶調用了Bootstrap 的connect()方法時,就會觸發一個Connect 請求事件,我們就發現AbstractChannel 的connect()其實由調用了DefaultChannelPipeline 的connect()方法:
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
而pipeline.connect()方法的實現如下:
public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); }
可以看到,當outbound 事件(這里是connect 事件)傳遞到Pipeline 后,它其實是以tail 為起點開始傳播的。而tail.connect()其實調用的是AbstractChannelHandlerContext 的connect()方法:
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise);return promise; }
findContextOutbound()方法顧名思義,它的作用是以當前Context 為起點,向Pipeline 中的Context 雙向鏈表的前端尋找第一個outbound 屬性為true 的Context(即關聯ChannelOutboundHandler 的Context),然后返回。findContextOutbound()方法代碼實現如下:
private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while ((ctx.executionMask & mask) == 0); return ctx; }
當我們找到了一個outbound 的Context 后,就調用它的invokeConnect()方法,這個方法中會調用Context 其關聯的ChannelHandler 的connect()方法
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }
如果用戶沒有重寫ChannelHandler 的connect()方法,那么會調用ChannelOutboundHandlerAdapter 的connect()實現:
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); }
我們看到,ChannelOutboundHandlerAdapter 的connect()僅僅調用了ctx.connect(),而這個調用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect這樣的循環中,直到connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節點,即head 中。為什么會傳遞到head 中呢?回想一下,head 實現了ChannelOutboundHandler,因此它的outbound 屬性是true。因為head 本身既是一個ChannelHandlerContext,又實現了ChannelOutboundHandler 接口,因此當connect()消息傳遞到head 后,會將消息轉遞到對應的ChannelHandler 中處理,而head 的handler()方法返回的就是head 本身:
public ChannelHandler handler() { return this; }
因此最終connect()事件是在head 中被處理。head 的connect()事件處理邏輯如下:
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { unsafe.connect(remoteAddress, localAddress, promise); }
到這里, 整個connect()請求事件就結束了。下圖中描述了整個connect()請求事件的處理過程:
我們僅僅以connect()請求事件為例,分析了outbound 事件的傳播過程,但是其實所有的outbound 的事件傳播都遵循着一樣的傳播規律,小伙伴們可以試着分析一下其他的outbound 事件,體會一下它們的傳播過程。
Inbound 事件傳播方式:
Inbound 事件和Outbound 事件的處理過程是類似的,只是傳播方向不同。Inbound 事件是一個通知事件,即某件事已經發生了,然后通過Inbound 事件進行通知。Inbound 通常發生在Channel的狀態的改變或IO 事件就緒。Inbound 的特點是它傳播方向是head -> customContext -> tail。上面我們分析了connect()這個Outbound 事件,那么接着分析connect()事件后會發生什么Inbound 事件,並最終找到Outbound 和Inbound 事件之間的聯系。當connect()這個Outbound 傳播到unsafe 后,其實是在AbstractNioUnsafe的connect()方法中進行處理的:
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { } }
在AbstractNioUnsafe 的connect()方法中,首先調用doConnect()方法進行實際上的Socket 連接,當連接上后會調用fulfillConnectPromise()方法:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (!wasActive && active) { pipeline().fireChannelActive(); } }
我們看到,在fulfillConnectPromise()中,會通過調用pipeline().fireChannelActive()方法將通道激活的消息(即Socket 連接成功)發送出去。而這里,當調用pipeline.fireXXX 后,就是Inbound 事件的起點。因此當調用pipeline().fireChannelActive()后,就產生了一個ChannelActive Inbound 事件,我們就從這里開始看看這個Inbound事件是怎么傳播的?
public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; }
果然, 在fireChannelActive()方法中,調用的是head.invokeChannelActive(),因此可以證明Inbound 事件在Pipeline中傳輸的起點是head。那么,在head.invokeChannelActive()中又做了什么呢?
static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } }
接下去的調用流程是:
private void invokeChannelActive() { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelActive(this); } catch (Throwable var2) { this.notifyHandlerException(var2); } } else { this.fireChannelActive(); } } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); this.readIfIsAutoRead(); } public ChannelHandlerContext fireChannelActive() { AbstractChannelHandlerContext next = this.findContextInbound(); invokeChannelActive(next); return this; }
上面的代碼應該很熟悉了。回想一下在Outbound 事件(例如connect()事件)的傳輸過程中時,我們也有類似的操作:
- 首先調用findContextInbound(),從Pipeline 的雙向鏈表中中找到第一個屬性inbound 為true 的Context,然后將其返回。
- 調用Context 的invokeChannelActive()方法.
invokeChannelActive()方法源碼如下:
private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } }
這個方法和Outbound 的對應方法(如:invokeConnect()方法)如出一轍。與Outbound 一樣,如果用戶沒有重寫channelActive() 方法,那就會調用ChannelInboundHandlerAdapter 的channelActive()方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); }
同樣地, 在ChannelInboundHandlerAdapter 的channelActive()中,僅僅調用了ctx.fireChannelActive()方法,因此就會進入Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()這樣的循環中。同理,tail 本身既實現了ChannelInboundHandler 接口,又實現了ChannelHandlerContext 接口,因此當channelActive()消息傳遞到tail 后,會將消息轉遞到對應的ChannelHandler 中處理,而tail 的handler()返回的就是tail 本身:
public ChannelHandler handler() { return this; }
因此channelActive Inbound 事件最終是在tail 中處理的,我們看一下它的處理方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 處理方法時就會發現,它們的實現都是空的。可見,如果是Inbound,當用戶沒有實現自定義的處理器時,那么默認是不處理的。下圖描述了Inbound事件的傳輸過程:
Pipeline 事件傳播小結:
Outbound 事件總結:
Outbound 事件是請求事件(由connect()發起一個請求,並最終由unsafe 處理這個請求)。
Outbound 事件的發起者是Channel。
Outbound 事件的處理者是unsafe。
Outbound 事件在Pipeline 中的傳輸方向是tail -> head。
在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx 的方法(如:ctx.connect()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。
Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()
Inbound 事件總結:
Inbound 事件是通知事件,當某件事情已經就緒后,通知上層。
Inbound 事件發起者是unsafe。
Inbound 事件的處理者是Channel,如果用戶沒有實現自定義的處理方法,那么Inbound 事件默認的處理者是TailContext,並且其處理方法是空實現。Inbound 事件在Pipeline 中傳輸方向是head -> tail。
在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx.fireIN_EVT()事件(如:ctx.fireChannelActive()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。
Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().
outbound 和inbound 事件設計上十分相似,並且Context 與Handler 直接的調用關系也容易混淆,因此我們在閱讀這里的源碼時,需要特別的注意。
Handler 的各種姿勢:
ChannelHandlerContext
每個ChannelHandler 被添加到ChannelPipeline 后,都會創建一個ChannelHandlerContext 並與之創建的ChannelHandler 關聯綁定。ChannelHandlerContext 允許ChannelHandler 與其他的ChannelHandler 實現進行交互。ChannelHandlerContext 不會改變添加到其中的ChannelHandler,因此它是安全的。下圖描述了ChannelHandlerContext、ChannelHandler、ChannelPipeline 的關系:
Channel 的生命周期:
Netty 有一個簡單但強大的狀態模型,並完美映射到ChannelInboundHandler 的各個方法。下面是Channel 生命周期中四個不同的狀態:
- channelUnregistered() Channel已創建,還未注冊到一個EventLoop上
- channelRegistered() Channel已經注冊到一個EventLoop上
- channelActive() Channel是活躍狀態(連接到某個遠端),可以收發數據
- channelInactive() Channel未連接到遠端
一個Channel 正常的生命周期如下圖所示。隨着狀態發生變化相應的事件產生。這些事件被轉發到ChannelPipeline中的ChannelHandler 來觸發相應的操作。