處理業務:事件是如何在 pipeline 中傳播的


處理業務:事件是如何在 pipeline 中傳播的

Netty 系列目錄(https://www.cnblogs.com/binarylei/p/10117436.html)

在上一節 接收數據:自適應緩沖區和連接讀是為了解決什么問題 中,我們知道 NioEventLoop 不斷的輪詢,接收 OP_READ 事件;然后將讀取到的數據通過 pipeline.fireChannelRead(byteBuf) 傳播出去。所以業務的處理實際上是在 pipeline 的 channelRead() 上處理的,本小節也主要關注事件是在 pipeline 中傳播行為。

事件在 pipeline 中的傳播,需要關注事件的傳播行為和執行線程:

  1. 傳播行為:事件執行到某個 Handler 后,如果不手動觸發 ctx.fireChannelRead,則傳播中斷。
  2. 執行線程:業務線程默認是在 NioEventLoop 中執行。如果業務處理有阻塞,需要考慮另起線程執行。

1. 主線分析

1.1 主線

主線其實在上一節已經講了,在讀取數據時會觸發 pipeline.fireChannelRead(byteBuf) 把讀取到的數據傳播出去。我們現在重點分析事件是如何在 pipeline 中傳播的。

Handler 執行資格:

  • 實現 ChannelInboundHandler
  • 實現方法 channelRead 不能加注解 @Skip
  • Handler 執行鏈是可以被中斷的。如果不主動觸發 ctx.fireChannelRead 方法,則不會再繼續往下執行。

1.2 知識點

(1)處理業務本質

數據在 pipeline 中所有的 ChannelInboundHandler 的 channelRead() 執行。並且執行可以被中斷。

(2)業務處理線程

默認處理線程是 Channel 綁定的 NioEventLoop 線程,也可以設置其他線程:

pipeline.addLast(new UnorderedThreadPoolEventExecutor(10), serverHandler);

2. 源碼分析

2.1 fireChannelRead 在 pipeline 中的傳播

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

說明: 可以看到 fireChannelRead 是從 head -> tail 一直身后傳播。

2.1 傳播行為

pipeline 中一旦被中間某個 Handler 執行,則傳播行為中斷。如果需要繼續執行下去,則需要主動調用 ctx.fireChannelRead。

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

說明: 可以看到要么執行 channelRead 方法,要么執行 fireChannelRead 方法直到找到一個對應的 Handler 為止。如果不主要調用 ctx.fireChannelRead,則傳播行為會中斷。

可能會奇怪,通過 findContextInbound() 不是找到對應的 Handler 了,為什么還需要通過 invokeHandler() 再判斷一次?其實,findContextInbound 方法是查找有 channelRead 的 Handler,而 invokeHandler 方法則是判斷這個 Handler 是否被刪除了。

2.3 執行線程

每個 Handler 都是在自己對應的 executor 中執行,默認為 NioEventLoop 線程。當然也可以自己指定其它線程。

// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

說明: 我們需要重點關注業務的執行線程,因為如果業務占用時間過長,會影響 Netty IO 的吞吐率。

2.4 ChannelPipeline vs ChannelHandlerContext

Netty 將每個 Handler 都包裝成 ChannelHandlerContext 添加到 ChannelPipeline 中。雖然說 ChannelPipeline 也就調用 ChannelHandlerContext 中的方法,但 pipeline 會從 head 或 tail 開始遍歷,而 ctx 只會從當前 hander 開始遍歷。

我們還是拿以下四個 read 對比:channel.read()、pipeline.read()、ctx.read()、unsafe.read()

  • channel.read():直接調用 pipeline.read()。
  • pipeline.read():調用 tail.read(),從 head 或 tail 經歷全部的 Handler。實際上,最后的 head.read() 調用 unsafe.beginRead(),這個方法會注冊 OP_ACCEPT 或 OP_READ 事件從而激活 Channel。
  • ctx.read():從當前 ctx 開始之后的全部的 Handler。如果發送數據,需要使用 ctx.write 而不是 ctx.channel().write。
  • unsafe.read():最底層的 API。和 unsafe.beginRead() 不同,unsafe#read 會真正從 socket revbuf 讀取數據。

2.5 HeadContext vs TailContext

  • HeadContext:
    • inbound 由事件觸發,從 head -> tail,所以需要調用 ctx.firexxx 將事件傳播下去。
    • outbound 由用戶觸發,從 tail -> head,通常會直接調用到 head。如果中間某個 Handler 重新自定義實現該方法,則不會再向下調,見 AbstractChannelHandlerContext#invokeWrite0。
  • TailContext:基本上沒什么功能,一直向下傳播事件即可。
HeadContext 功能
方法 執行順序 功能說明
bind outbound unsafe.bind
connect outbound unsafe.connect
disconnect outbound unsafe.disconnect
close outbound unsafe.close
deregister outbound unsafe.deregister
read outbound unsafe.beginRead:注冊感興趣事件
write outbound unsafe.write
flush outbound unsafe.flush
exceptionCaught inbound ctx.fireExceptionCaught
channelRegistered inbound callHandlerAddedForAllHandlers
ctx.fireChannelRegistered
channelUnregistered inbound ctx.fireChannelUnregistered
destroy
channelActive inbound ctx.fireChannelActive
readIfIsAutoRead:調用unsafe.beginRead
channelInactive inbound ctx.fireChannelInactive()
channelRead inbound ctx.fireChannelRead
channelReadComplete inbound ctx.fireChannelReadComplete
readIfIsAutoRead
userEventTriggered inbound ctx.fireUserEventTriggered
channelWritabilityChanged inbound ctx.fireChannelWritabilityChanged
TailContext 功能
方法 執行順序 功能說明
channelRead inbound ReferenceCountUtil.release(msg)

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM