Netty 源碼解析(八): 回到 Channel 的 register 操作


 

 

原創申明:本文由公眾號【猿燈塔】原創,轉載請說明出處標注

今天是猿燈塔“365篇原創計划”第八篇。

接下來的時間燈塔君持續更新Netty系列一共九篇
Netty 源碼解析(一): 開始
Netty 源碼解析(二): Netty 的 Channel
Netty 源碼解析(三): Netty 的 Future 和 Promise
Netty 源碼解析(四): Netty 的 ChannelPipeline
Netty 源碼解析(五): Netty 的線程池分析
Netty 源碼解析(六): Channel 的 register 操作
Netty 源碼解析(七): NioEventLoop 工作流程
當前:Netty 源碼解析(八): 回到 Channel 的 register 操作
Netty 源碼解析(九): connect 過程和 bind 過程分析
今天呢!燈塔君跟大家講:

回到Channel 的 register 操作

我們回到前面的 register0(promise) 方法,我們知道,這個 register 任務進入到了 NioEventLoop 的 taskQueue 中,然后會啟動 NioEventLoop 中的線程,該線程會輪詢這個 taskQueue,然后執行這個 register 任務。
注意,此時執行該方法的是 eventLoop 中的線程:
// AbstractChannel
private void register0(ChannelPromise promise) {
     try {
         ...
         boolean firstRegistration = neverRegistered;
         // *** 進行 JDK 底層的操作:Channel 注冊到 Selector 上 ***
         doRegister();

         neverRegistered = false;
         registered = true;
        // 到這里,就算是 registered 了

        // 這一步也很關鍵,因為這涉及到了 ChannelInitializer 的 init(channel)
        // 我們之前說過,init 方法會將 ChannelInitializer 內部添加的 handlers 添加到 pipeline 中
        pipeline.invokeHandlerAddedIfNeeded();

        // 設置當前 promise 的狀態為 success
        //   因為當前 register 方法是在 eventLoop 中的線程中執行的,需要通知提交 register 操作的線程
        safeSetSuccess(promise);

        // 當前的 register 操作已經成功,該事件應該被 pipeline 上
        //   所有關心 register 事件的 handler 感知到,往 pipeline 中扔一個事件
        pipeline.fireChannelRegistered();

        // 這里 active 指的是 channel 已經打開
        if (isActive()) {
            // 如果該 channel 是第一次執行 register,那么 fire ChannelActive 事件
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // 該 channel 之前已經 register 過了,
                // 這里讓該 channel 立馬去監聽通道中的 OP_READ 事件
                beginRead();
            }
        }
    } catch (Throwable t) {
        ...
    }
}
我們先說掉上面的 doRegister() 方法,然后再說 pipeline。
 @Override
 protected void doRegister() throws Exception {
     boolean selected = false;
     for (;;) {
         try {
             // 附 JDK 中 Channel 的 register 方法:
             // public final SelectionKey register(Selector sel, int ops, Object att) {...}
             selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
             return;
        } catch (CancelledKeyException e) {
            ...
        }
    }
}
我們可以看到,這里做了 JDK 底層的 register 操作,將 SocketChannel(或 ServerSocketChannel) 注冊到 Selector 中,並且可以看到,這里的監聽集合設置為了 0,也就是什么都不監聽。
當然,也就意味着,后續一定有某個地方會需要修改這個 selectionKey 的監聽集合,不然啥都干不了
我們重點來說說 pipeline 操作,我們之前在介紹 NioSocketChannel 的 pipeline 的時候介紹到,我們的 pipeline 現在長這個樣子:
現在,我們將看到這里會把 LoggingHandler 和 EchoClientHandler 添加到 pipeline。
我們繼續看代碼,register 成功以后,執行了以下操作:
1pipeline.invokeHandlerAddedIfNeeded();

大家可以跟蹤一下,這一步會執行到 pipeline 中 ChannelInitializer 實例的 handlerAdded 方法,在這里會執行它的 init(context) 方法:
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

然后我們看下 initChannel(ctx),這里終於來了我們之前介紹過的 init(channel) 方法:
 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
     if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
         try {
             // 1. 將把我們自定義的 handlers 添加到 pipeline 中
             initChannel((C) ctx.channel());
         } catch (Throwable cause) {
             ...
         } finally {
             // 2. 將 ChannelInitializer 實例從 pipeline 中刪除
            remove(ctx);
        }
        return true;
    }
    return false;
}

我們前面也說過,ChannelInitializer 的 init(channel) 被執行以后,那么其內部添加的 handlers 會進入到 pipeline 中,然后上面的 finally 塊中將 ChannelInitializer 的實例從 pipeline 中刪除,那么此時 pipeline 就算建立起來了,如下圖:
 
其實這里還有個問題,如果我們在 ChannelInitializer 中添加的是一個 ChannelInitializer 實例呢?大家可以考慮下這個情況。
pipeline 建立了以后,然后我們繼續往下走,會執行到這一句:
pipeline.fireChannelRegistered();
我們只要摸清楚了 fireChannelRegistered() 方法,以后碰到其他像 fireChannelActive()、fireXxx() 等就知道怎么回事了,它們都是類似的。我們來看看這句代碼會發生什么:
// DefaultChannelPipeline
1@Override
2public final ChannelPipeline fireChannelRegistered() {
3    // 注意這里的傳參是 head
4    AbstractChannelHandlerContext.invokeChannelRegistered(head);
5    return this;
6}
也就是說,我們往 pipeline 中扔了一個 channelRegistered 事件,這里的 register 屬於 Inbound 事件,pipeline 接下來要做的就是執行 pipeline 中的 Inbound 類型的 handlers 中的 channelRegistered() 方法。
從上面的代碼,我們可以看出,往 pipeline 中扔出 channelRegistered 事件以后,第一個處理的 handler 是 head。
接下來,我們還是跟着代碼走,此時我們來到了 pipeline 的第一個節點 head 的處理中:
// AbstractChannelHandlerContext
 // next 此時是 head
 2static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

     EventExecutor executor = next.executor();
     // 執行 head 的 invokeChannelRegistered()
     if (executor.inEventLoop()) {
         next.invokeChannelRegistered();
     } else {
         executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
   }
}
也就是說,這里會先執行 head.invokeChannelRegistered() 方法,而且是放到 NioEventLoop 中的 taskQueue 中執行的:
// AbstractChannelHandlerContext3
 private void invokeChannelRegistered() {
     if (invokeHandler()) {
         try {
             // handler() 方法此時會返回 head
             ((ChannelInboundHandler) handler()).channelRegistered(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
        fireChannelRegistered();
    }
}
我們去看 head 的 channelRegistered 方法:
// HeadContext
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // 1. 這一步是 head 對於 channelRegistered 事件的處理。沒有我們要關心的
    invokeHandlerAddedIfNeeded();
    // 2. 向后傳播 Inbound 事件
    ctx.fireChannelRegistered();
}
然后 head 會執行 fireChannelRegister() 方法:
// AbstractChannelHandlerContext
1@Override
2public ChannelHandlerContext fireChannelRegistered() {
3    // 這里很關鍵
4    // findContextInbound() 方法會沿着 pipeline 找到下一個 Inbound 類型的 handler
5    invokeChannelRegistered(findContextInbound());
6    return this;
7}
注意:pipeline.fireChannelRegistered() 是將 channelRegistered 事件拋到 pipeline 中,pipeline 中的 handlers 准備處理該事件。而 context.fireChannelRegistered() 是一個 handler 處理完了以后,向后傳播給下一個 handler。 它們兩個的方法名字是一樣的,但是來自於不同的類。
findContextInbound() 將找到下一個 Inbound 類型的 handler,然后又是重復上面的幾個方法。
我覺得上面這塊代碼沒必要太糾結,總之就是從 head 中開始,依次往下尋找所有 Inbound handler,執行其 channelRegistered(ctx) 操作。
說了這么多,我們的 register 操作算是真正完成了。
下面,我們回到 initAndRegister 這個方法:
 final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         channel = channelFactory.newChannel();
         init(channel);
     } catch (Throwable t) {
         ...
     }
    // 我們上面說完了這行
    ChannelFuture regFuture = config().group().register(channel);
    // 如果在 register 的過程中,發生了錯誤
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    // 源碼中說得很清楚,如果到這里,說明后續可以進行 connect() 或 bind() 了,因為兩種情況:
    // 1. 如果 register 動作是在 eventLoop 中發起的,那么到這里的時候,register 一定已經完成
    // 2. 如果 register 任務已經提交到 eventLoop 中,也就是進到了 eventLoop 中的 taskQueue 中,
    //    由於后續的 connect 或 bind 也會進入到同一個 eventLoop 的 queue 中,所以一定是會先 register 成功,才會執行 connect 或 bind
    return regFuture;
}
我們要知道,不管是服務端的 NioServerSocketChannel 還是客戶端的 NioSocketChannel,在 bind 或 connect 時,都會先進入 initAndRegister 這個方法,所以我們上面說的那些,對於兩者都是通用的。
大家要記住,register 操作是非常重要的,要知道這一步大概做了哪些事情,register 操作以后,將進入到 bind 或 connect 操作中。
365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回復【資料】【面試】【簡歷】有我准備的一線大廠面試資料和簡歷模板

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM