Netty 源碼解析(四): Netty 的 ChannelPipeline


 

 

今天是猿燈塔“365篇原創計划”第四篇。

接下來的時間燈塔君持續更新Netty系列一共九篇

Netty 源碼解析(一): 開始

Netty 源碼解析(二): Netty 的 Channel

Netty 源碼解析(三): Netty 的 Future 和 Promise

當前:Netty 源碼解析(四): Netty 的 ChannelPipeline

Netty 源碼解析(五): Netty 的線程池分析

Netty 源碼解析(六): Channel 的 register 操作

Netty 源碼解析(七): NioEventLoop 工作流程

Netty 源碼解析(八): 回到 Channel 的 register 操作

Netty 源碼解析(九): connect 過程和 bind 過程分析

 

今天呢!燈塔君跟大家講:

Netty 的 ChannelPipeline

ChannelPipeline和Inbound、Outbound

我想很多讀者應該或多或少都有 Netty 中 pipeline 的概念。前面我們說了,使用 Netty 的時候,我們通常就只要寫一些自定義的 handler 就可以了,我們定義的這些 handler 會組成一個 pipeline,用於處理 IO 事件,這個和我們平時接觸的 Filter 或 Interceptor 表達的差不多是一個意思。

每個 Channel 內部都有一個 pipeline,pipeline 由多個 handler 組成,handler 之間的順序是很重要的,因為 IO 事件將按照順序順次經過 pipeline 上的 handler,這樣每個 handler 可以專注於做一點點小事,由多個 handler 組合來完成一些復雜的邏輯。

 

從圖中,我們知道這是一個雙向鏈表。

首先,我們看兩個重要的概念:InboundOutbound。在 Netty 中,IO 事件被分為 Inbound 事件和 Outbound 事件。

Outboundout 指的是 出去,有哪些 IO 事件屬於此類呢?比如 connect、write、flush 這些 IO 操作是往外部方向進行的,它們就屬於 Outbound 事件。

其他的,諸如 accept、read 這種就屬於 Inbound 事件。

比如客戶端在發起請求的時候,需要 1️⃣connect 到服務器,然后 2️⃣write 數據傳到服務器,再然后 3️⃣read 服務器返回的數據,前面的 connect 和 write 就是 out 事件,后面的 read 就是 in 事件。

比如很多初學者看不懂下面的這段代碼,這段代碼用於服務端的 childHandler 中:

 
pipeline.addLast(new StringDecoder());
 
pipeline.addLast(new StringEncoder());
 
pipeline.addLast(new BizHandler());
 
 

初學者肯定都納悶,以為這個順序寫錯了,應該是先 decode 客戶端過來的數據,然后用 BizHandler 處理業務邏輯,最后再 encode 數據然后返回給客戶端,所以添加的順序應該是 1 -> 3 -> 2 才對。

其實這里的三個 handler 是分組的,分為 Inbound(1 和 3) 和 Outbound(2):

 
1. pipeline.addLast(new StringDecoder());
2. pipeline.addLast(new StringEncoder());
3. pipeline.addLast(new BizHandler());復制代碼
  • 客戶端連接進來的時候,讀取(read)客戶端請求數據的操作是 Inbound 的,e 操作是 Outbound 的,此時使用的是 2。

  • 處理完數據后,返回給客戶端數據的 write 操作是 Outbound 的,此時使用的是 2。

所以雖然添加順序有點怪,但是執行順序其實是按照 1 -> 3 -> 2 進行的。

如果我們在上面的基礎上,加上下面的第四行,這是一個 OutboundHandler:

 
4. pipeline.addLast(new OutboundHandlerA());
 

那么執行順序是不是就是 1 -> 3 -> 2 -> 4 呢?答案是:不是的。

對於 Inbound 操作,按照添加順序執行每個 Inbound 類型的 handler;而對於 Outbound 操作,是反着來的,從后往前,順次執行 Outbound 類型的 handler。

所以,上面的順序應該是先 1 后 3,它們是 Inbound 的,然后是 4,最后才是 2,它們兩個是 Outbound 的。說實話,這種組織方式對新手應該很是頭疼。

那我們在開發的時候怎么寫呢?其實也很簡單,從最外層開始寫,一步步寫到業務處理層,把 Inbound 和 Outbound 混寫在一起。比如 encode 和 decode 是屬於最外層的處理邏輯,先寫它們。假設 decode 以后是字符串,那再進來一層應該可以寫進來和出去的日志。再進來一層可以寫 字符串 <=> 對象 的相互轉換。然后就應該寫業務層了。

到這里,我想大家應該都知道 Inbound 和 Outbound 了吧?下面我們來介紹它們的接口使用。

定義處理 Inbound 事件的 handler 需要實現 ChannelInboundHandler,定義處理 Outbound 事件的 handler 需要實現 ChannelOutboundHandler。最下面的三個類,是 Netty 提供的適配器,特別的,如果我們希望定義一個 handler 能同時處理 Inbound 和 Outbound 事件,可以通過繼承中間的 ChannelDuplexHandler 的方式,比如 LoggingHandler 這種既可以用來處理 Inbound 也可以用來處理 Outbound 事件的 handler。

有了 Inbound 和 Outbound 的概念以后,我們來開始介紹 Pipeline 的源碼。

我們說過,一個 Channel 關聯一個 pipeline,NioSocketChannel 和 NioServerSocketChannel 在執行構造方法的時候,都會走到它們的父類 AbstractChannel 的構造方法中:

 
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 給每個 channel 分配一個唯一 id
id = newId();
// 每個 channel 內部需要一個 Unsafe 的實例
unsafe = newUnsafe();
// 每個 channel 內部都會創建一個 pipeline
pipeline = newChannelPipeline();
}
 

上面的三行代碼中,id 比較不重要,Netty 中的 Unsafe 實例其實挺重要的,這里簡單介紹一下。

在 JDK 的源碼中,sun.misc.Unsafe 類提供了一些底層操作的能力,它設計出來是給 JDK 中的源碼使用的,比如 AQS、ConcurrentHashMap 等,我們在之前的並發包的源碼分析中也看到了很多它們使用 Unsafe 的場景,這個 Unsafe 類不是給我們的代碼使用的,是給 JDK 源碼使用的(需要的話,我們也是可以獲取它的實例的)。

Unsafe 類的構造方法是 private 的,但是它提供了 getUnsafe() 這個靜態方法: 

Unsafe unsafe = Unsafe.getUnsafe();

大家可以試一下,上面這行代碼編譯沒有問題,但是執行的時候會拋 java.lang.SecurityException 異常,因為它就不是給我們的代碼用的。

但是如果你就是想獲取 Unsafe 的實例,可以通過下面這個代碼獲取到:

 
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);

Netty 中的 Unsafe 也是同樣的意思,它封裝了 Netty 中會使用到的 JDK 提供的 NIO 接口,比如將 channel 注冊到 selector 上,比如 bind 操作,比如 connect 操作等,這些操作都是稍微偏底層一些。Netty 同樣也是不希望我們的業務代碼使用 Unsafe 的實例,它是提供給 Netty 中的源碼使用的。

不過,對於我們源碼分析來說,我們還是會有很多時候需要分析 Unsafe 中的源碼的

關於 Unsafe,我們后面用到了再說,這里只要知道,它封裝了大部分需要訪問 JDK 的 NIO 接口的操作就好了。這里我們繼續將焦點放在實例化 pipeline 上:

 
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
 

這里開始調用 DefaultChannelPipeline 的構造方法,並把當前 channel 的引用傳入:

 
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
 

這里實例化了 tail 和 head 這兩個 handler。tail 實現了 ChannelInboundHandler 接口,

而 head 實現了 ChannelOutboundHandler 和 ChannelInboundHandler 兩個接口,

並且最后兩行代碼將 tail 和 head 連接起來:復制代碼

 

注意,在不同的版本中,源碼也略有差異,head 不一定是 in + out,大家知道這點就好了。

還有,從上面的 head 和 tail 我們也可以看到,其實 pipeline 中的每個元素是 ChannelHandlerContext 的實例,而不是 ChannelHandler 的實例,context 包裝了一下 handler,但是,后面我們都會用 handler 來描述一個 pipeline 上的節點,而不是使用 context,希望讀者知道這一點。

這里只是構造了 pipeline,並且添加了兩個固定的 handler 到其中(head + tail),還不涉及到自定義的 handler 代碼執行。我們回過頭來看下面這段代碼:

我們說過 childHandler 中指定的 handler 不是給 NioServerSocketChannel 使用的,是給 NioSocketChannel 使用的,所以這里我們不看它。

這里調用 handler(…) 方法指定了一個 LoggingHandler 的實例,然后我們再進去下面的 bind(…) 方法中看看這個 LoggingHandler 實例是怎么進入到我們之前構造的 pipeline 內的。

順着 bind() 一直往前走,bind() -> doBind() -> initAndRegister():

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 構造 channel 實例,同時會構造 pipeline 實例,
// 現在 pipeline 中有 head 和 tail 兩個 handler 了
channel = channelFactory.newChannel();
// 2. 看這里
init(channel);
} catch (Throwable t) {
......
}
 

上面的兩行代碼,第一行實現了構造 channel 和 channel 內部的 pipeline,我們來看第二行 init 代碼:

// ServerBootstrap:

@Override
void init(Channel channel) throws Exception {
......
// 拿到剛剛創建的 channel 內部的 pipeline 實例
ChannelPipeline p = channel.pipeline();
...
// 開始往 pipeline 中添加一個 handler,這個 handler 是 ChannelInitializer 的實例
p.addLast(new ChannelInitializer<Channel>() {
// 我們以后會看到,下面這個 initChannel 方法何時會被調用
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 這個方法返回我們最開始指定的 LoggingHandler 實例
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 LoggingHandler
pipeline.addLast(handler);
}
// 先不用管這里的 eventLoop
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一個 handler 到 pipeline 中:ServerBootstrapAcceptor
// 從名字可以看到,這個 handler 的目的是用於接收客戶端請求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

這里涉及到 pipeline 中的輔助類 ChannelInitializer,我們看到,它本身是一個 handler(Inbound 類型),但是它的作用和普通 handler 有點不一樣,它純碎是用來輔助將其他的 handler 加入到 pipeline 中的。

大家可以稍微看一下 ChannelInitializer 的 initChannel 方法,有個簡單的認識就好,此時的 pipeline 應該是這樣的:

ChannelInitializer 的 initChannel(channel) 方法被調用的時候,會往 pipeline 中添加我們最開始指定的 LoggingHandler 和添加一個 ServerBootstrapAcceptor。但是我們現在還不知道這個 initChannel 方法何時會被調用。

上面我們說的是作為服務端的 NioServerSocketChannel 的 pipeline,NioSocketChannel 也是差不多的,我們可以看一下 Bootstrap 類的 init(channel) 方法:

 
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}

 

它和服務端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不一樣,它只需要將 EchoClient 類中的 ChannelInitializer 實例加進來就可以了,它的 ChannelInitializer 中添加了兩個 handler,LoggingHandler 和 EchoClientHandler:

很顯然,我們需要的是像 LoggingHandler 和 EchoClientHandler 這樣的 handler,但是,它們現在還不在 pipeline 中,那么它們什么時候會真正進入到 pipeline 中呢?以后我們再揭曉。

還有,為什么 Server 端我們指定的是一個 handler 實例,而 Client 指定的是一個 ChannelInitializer 實例?其實它們是可以隨意搭配使用的,你甚至可以在 ChannelInitializer 實例中添加 ChannelInitializer 的實例。

非常抱歉,這里又要斷了,下面要先介紹線程池了,大家要記住 pipeline 現在的樣子,head + channelInitializer + tail

本節沒有介紹 handler 的向后傳播,就是一個 handler 處理完了以后,怎么傳遞給下一個 handler 來處理?比如我們熟悉的 JavaEE 中的 Filter 是采用在一個 Filter 實例中調用 chain.doFilter(request, response) 來傳遞給下一個 Filter 這種方式的。

我們用下面這張圖結束本節。下圖展示了傳播的方法,但我其實是更想讓大家看一下,哪些事件是 Inbound 類型的,哪些是 Outbound 類型的:

Outbound 類型的幾個事件大家應該比較好認,注意 bind 也是 Outbound 類型的。

365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回復【資料】【面試】【簡歷】有我准備的一線大廠面試資料和簡歷模板


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM