自頂向下深入分析Netty(六)--Channel總述


自頂向下深入分析Netty(六)--Channel總述

自頂向下深入分析Netty(六)--Channel源碼實現

6.1 總述

6.1.1 Channel

JDK中的Channel是通訊的載體,而Netty中的Channel在此基礎上進行封裝從而賦予了Channel更多的能力,用戶可以使用Channel進行以下操作:

  • 查詢Channel的狀態。
  • 配置Channel的參數。
  • 進行Channel支持的I/O操作(read,write,connect,bind)。
  • 獲取對應的ChannelPipeline,從而可以自定義處理I/O事件或者其他請求。

為了保證在使用Channel或者處理I/O操作時不出現錯誤,以下幾點需要特別注意:

  1. 所有的I/O操作都是異步的
    由於采用事件驅動的機制,所以Netty中的所有IO操作都是異步的。這意味着當我們調用一個IO操作時,方法會立即返回並不保證操作已經完成。由上一章Future的講解中,我們知道,這些IO操作會返回一個ChannelFuture對象,我們需要通過添加監聽者的方式執行操作完成后需執行的代碼
  2. Channel是有等級的
    如果一個Channel由另一個Channel創建,那么他們之間形成父子關系。比如說,當ServerSocketChannel通過accept()方法接受一個SocketChannel時,那么SocketChannel的父親是ServerSocketChannel,調用SocketChannel的parent()方法返回該ServerSocketChannel對象。
  3. 可以使用向下轉型獲取子類的特定操作
    某些子類Channel會提供一些所需的特定操作,可以向下轉型到這樣的子類,從而獲得特定操作。比如說,對於UDP的數據報的傳輸,有特定的join()和leave()操作,我們可以向下轉型到DatagramChannel從而使用這些操作。
  4. 釋放資源
    當一個Channel不再使用時,須調用close()或者close(ChannelPromise)方法釋放資源。

6.1.2 Channel配置參數

(1).通用參數

CONNECT_TIMEOUT_MILLIS
        Netty參數,連接超時毫秒數,默認值30000毫秒即30秒。

MAX_MESSAGES_PER_READ
        Netty參數,一次Loop讀取的最大消息數,對於ServerChannel或者NioByteChannel,默認值為16,其他Channel默認值為1。默認值這樣設置,是因為:ServerChannel需要接受足夠多的連接,保證大吞吐量,NioByteChannel可以減少不必要的系統調用select。

WRITE_SPIN_COUNT
        Netty參數,一個Loop寫操作執行的最大次數,默認值為16。也就是說,對於大數據量的寫操作至多進行16次,如果16次仍沒有全部寫完數據,此時會提交一個新的寫任務給EventLoop,任務將在下次調度繼續執行。這樣,其他的寫請求才能被響應不會因為單個大數據量寫請求而耽誤。

ALLOCATOR
        Netty參數,ByteBuf的分配器,默認值為ByteBufAllocator.DEFAULT,4.0版本為UnpooledByteBufAllocator,4.1版本為PooledByteBufAllocator。該值也可以使用系統參數io.netty.allocator.type配置,使用字符串值:"unpooled","pooled"。

RCVBUF_ALLOCATOR
        Netty參數,用於Channel分配接受Buffer的分配器,默認值為AdaptiveRecvByteBufAllocator.DEFAULT,是一個自適應的接受緩沖區分配器,能根據接受到的數據自動調節大小。可選值為FixedRecvByteBufAllocator,固定大小的接受緩沖區分配器。

AUTO_READ
        Netty參數,自動讀取,默認值為True。Netty只在必要的時候才設置關心相應的I/O事件。對於讀操作,需要調用channel.read()設置關心的I/O事件為OP_READ,這樣若有數據到達才能讀取以供用戶處理。該值為True時,每次讀操作完畢后會自動調用channel.read(),從而有數據到達便能讀取;否則,需要用戶手動調用channel.read()。需要注意的是:當調用config.setAutoRead(boolean)方法時,如果狀態由false變為true,將會調用channel.read()方法讀取數據;由true變為false,將調用config.autoReadCleared()方法終止數據讀取。

WRITE_BUFFER_HIGH_WATER_MARK
        Netty參數,寫高水位標記,默認值64KB。如果Netty的寫緩沖區中的字節超過該值,Channel的isWritable()返回False

WRITE_BUFFER_LOW_WATER_MARK
        Netty參數,寫低水位標記,默認值32KB。當Netty的寫緩沖區中的字節超過高水位之后若下降到低水位,則Channel的isWritable()返回True。寫高低水位標記使用戶可以控制寫入數據速度,從而實現流量控制。推薦做法是:每次調用channl.write(msg)方法首先調用channel.isWritable()判斷是否可寫。

MESSAGE_SIZE_ESTIMATOR
        Netty參數,消息大小估算器,默認為DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder為實際大小,FileRegion估算值為0。該值估算的字節數在計算水位時使用,FileRegion為0可知FileRegion不影響高低水位。

SINGLE_EVENTEXECUTOR_PER_GROUP
        Netty參數,單線程執行ChannelPipeline中的事件,默認值為True。該值控制執行ChannelPipeline中執行ChannelHandler的線程。如果為Trye,整個pipeline由一個線程執行,這樣不需要進行線程切換以及線程同步,是Netty4的推薦做法;如果為False,ChannelHandler中的處理過程會由Group中的不同線程執行。

(2).SocketChannel參數

SO_RCVBUF
        Socket參數,TCP數據接收緩沖區大小。該緩沖區即TCP接收滑動窗口,linux操作系統可使用命令:cat /proc/sys/net/ipv4/tcp_rmem查詢其大小。一般情況下,該值可由用戶在任意時刻設置,但當設置值超過64KB時,需要在連接到遠端之前設置。

SO_SNDBUF
        Socket參數,TCP數據發送緩沖區大小。該緩沖區即TCP發送滑動窗口,linux操作系統可使用命令:cat /proc/sys/net/ipv4/tcp_smem查詢其大小。

TCP_NODELAY
        TCP參數,立即發送數據,默認值為Ture(Netty默認為True而操作系統默認為False)。該值設置Nagle算法的啟用,改算法將小的碎片數據連接成更大的報文來最小化所發送的報文的數量,如果需要發送一些較小的報文,則需要禁用該算法。Netty默認禁用該算法,從而最小化報文傳輸延時。

SO_KEEPALIVE
        Socket參數,連接保活,默認值為False。啟用該功能時,TCP會主動探測空閑連接的有效性。可以將此功能視為TCP的心跳機制,需要注意的是:默認的心跳間隔是7200s即2小時。Netty默認關閉該功能

SO_REUSEADDR
        Socket參數,地址復用,默認值False。有四種情況可以使用:(1).當有一個有相同本地地址和端口的socket1處於TIME_WAIT狀態時,而你希望啟動的程序的socket2要占用該地址和端口,比如重啟服務且保持先前端口。(2).有多塊網卡或用IP Alias技術的機器在同一端口啟動多個進程,但每個進程綁定的本地IP地址不能相同。(3).單個進程綁定相同的端口到多個socket上,但每個socket綁定的ip地址不同。(4).完全相同的地址和端口的重復綁定。但這只用於UDP的多播,不用於TCP。

SO_LINGER
         Netty對底層Socket參數的簡單封裝,關閉Socket的延遲時間,默認值為-1,表示禁用該功能。-1以及所有<0的數表示socket.close()方法立即返回,但OS底層會將發送緩沖區全部發送到對端。0表示socket.close()方法立即返回,OS放棄發送緩沖區的數據直接向對端發送RST包,對端收到復位錯誤。非0整數值表示調用socket.close()方法的線程被阻塞直到延遲時間到或發送緩沖區中的數據發送完畢,若超時,則對端會收到復位錯誤。

IP_TOS
        IP參數,設置IP頭部的Type-of-Service字段,用於描述IP包的優先級和QoS選項。

ALLOW_HALF_CLOSURE
        Netty參數,一個連接的遠端關閉時本地端是否關閉,默認值為False。值為False時,連接自動關閉;為True時,觸發ChannelInboundHandler的userEventTriggered()方法,事件為ChannelInputShutdownEvent。

(3).ServerSocketChannel參數

SO_RCVBUF
        已說明,需要注意的是:當設置值超過64KB時,需要在綁定到本地端口前設置。該值設置的是由ServerSocketChannel使用accept接受的SocketChannel的接收緩沖區。

SO_REUSEADDR
        已說明

SO_BACKLOG
        Socket參數,服務端接受連接的隊列長度,如果隊列已滿,客戶端連接將被拒絕。默認值,Windows為200,其他為128。

(4).DatagramChannel參數

SO_BROADCAST
        Socket參數,設置廣播模式。

SO_RCVBUF
        已說明

SO_SNDBUF
        已說明

SO_REUSEADDR
        已說明

IP_MULTICAST_LOOP_DISABLED
        對應IP參數IP_MULTICAST_LOOP,設置本地回環接口的多播功能。由於IP_MULTICAST_LOOP返回True表示關閉,所以Netty加上后綴_DISABLED防止歧義。

IP_MULTICAST_ADDR
        對應IP參數IP_MULTICAST_IF,設置對應地址的網卡為多播模式。

IP_MULTICAST_IF
        對應IP參數IP_MULTICAST_IF2,同上但支持IPV6。

IP_MULTICAST_TTL
        IP參數,多播數據報的time-to-live即存活跳數。

IP_TOS
        已說明

DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION
        Netty參數,DatagramChannel注冊的EventLoop即表示已激活。


6.1.3 Channel接口

Channel接口中含有大量的方法,我們先對這些方法分類:

  1. 狀態查詢
    boolean isOpen(); // 是否開放
    boolean isRegistered(); // 是否注冊到一個EventLoop
    boolean isActive(); // 是否激活
    boolean isWritable();   // 是否可寫

 

open表示Channel的開放狀態,True表示Channel可用,False表示Channel已關閉不再可用。registered表示Channel的注冊狀態,True表示已注冊到一個EventLoop,False表示沒有注冊到EventLoop。active表示Channel的激活狀態,對於ServerSocketChannel,True表示Channel已綁定到端口;對於SocketChannel,表示Channel可用(open)且已連接到對端。Writable表示Channel的可寫狀態,當Channel的寫緩沖區outboundBuffer非null且可寫時返回True。
一個正常結束的Channel狀態轉移有以下兩種情況:

 REGISTERED->CONNECT/BIND->ACTIVE->CLOSE->INACTIVE->UNREGISTERED REGISTERED->ACTIVE->CLOSE->INACTIVE->UNREGISTERED 

其中第一種是服務端用於綁定的Channel或者客戶端用於發起連接的Channel,第二種是服務端接受的SocketChannel。一個異常關閉的Channel則不會服從這樣的狀態轉移。

  1. getter方法
    EventLoop eventLoop();  // 注冊到的EventLoop
    Channel parent();   // 父類Channel
    ChannelConfig config(); // 配置參數
    ChannelMetadata metadata(); // 元數據
    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 遠端地址
    Unsafe unsafe();    // Unsafe對象
    ChannelPipeline pipeline(); // 事件管道,用於處理IO事件
    ByteBufAllocator alloc();   // 字節緩存分配器
    ChannelFuture closeFuture();    // Channel關閉時的異步結果
    ChannelPromise voidPromise();   

 

  1. 異步結果生成
    ChannelPromise newPromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);

 

  1. I/O事件處理
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    Channel read();
    ChannelFuture write(Object msg);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg);

 

這里的I/O事件都是outbound出站事件,表示由用戶發起,即用戶可以調用這些方法產生響應的事件。對應地,有inbound入站事件,將在ChnanelPipeline一節中詳述。


6.1.4 Unsafe

Unsafe?直譯中文為不安全,這曾給我帶來極大的困擾。如果你是第一次遇到這種接口,一定會和我感同身受。一個Unsafe對象是不安全的?這里說的不安全,是相對於用戶程序員而言的,也就是說,用戶程序員使用Netty進行編程時不會接觸到這個接口和相關類。為什么不會接觸到呢?因為類似的接口和類是Netty的大量內部實現細節,不會暴露給用戶程序員。然而我們的目標是自頂向下深入分析Netty,所以有必要深入Unsafe雷區。我們先看Unsafe接口中的方法:

    SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 遠端地址
    ChannelPromise voidPromise();   // 不關心結果的異步Promise?
    ChannelOutboundBuffer outboundBuffer(); // 寫緩沖區
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, 
                              ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();

 

也許你已經發現Unsafe接口和Channel接口中都有register、bind等I/O事件相關的方法,它們有什么區別呢?回憶一下EventLoop線程實現,當一個selectedKey就緒時,對I/O事件的處理委托給unsafe對象實現,代碼類似如下:

    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT); 
        unsafe.finishConnect(); 
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                  || readyOps == 0) {
        unsafe.read(); 
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

 

也就是說,Unsafe的子類作為Channel的內部類,負責處理底層NIO相關的I/O事件。Channel則使用責任鏈的方式通過ChannelPipeline將事件提供給用戶自定義處理。

 

6.2 Channel實現

![Netty_Channel類圖][2]

Channel的類圖比較清晰。我們主要分析NioSocketChannel和NioServerSocketChannel這兩條線。

6.2.1 AbstractChannel

首先看其中的字段:

    private final Channel parent;   // 父Channel
    private final Unsafe unsafe;    
    private final DefaultChannelPipeline pipeline;  // 處理通道
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;    // 本地地址
    private volatile SocketAddress remoteAddress;   // 遠端地址
    private volatile EventLoop eventLoop;   // EventLoop線程
    private volatile boolean registered;    // 是否注冊到EventLoop

 然后,我們看其中的構造方法:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

 newUnsafe()和newChannelPipeline()可由子類覆蓋實現。在Netty的實現中每一個Channel都有一個對應的Unsafe內部類:AbstractChannel--AbstractUnsafe,AbstractNioChannel--AbstractNioUnsafe等等,newUnsafe()方法正好用來生成這樣的對應關系。ChannelPipeline將在之后講解,這里先了解它的功能:作為用戶處理器Handler的容器為用戶提供自定義處理I/O事件的能力即為用戶提供業務邏輯處理。AbstractChannel中對I/O事件的處理,都委托給ChannelPipeline處理,代碼都如出一轍:

    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

AbstractChannel其他方法都比較簡單,主要關注狀態判定的方法:

    public boolean isRegistered() {
        return registered;
    }

    public boolean isWritable() {
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        return buf != null && buf.isWritable(); // 寫緩沖區不為null且可寫
    }

 對於Channel的實現來說,其中的內部類Unsafe才是關鍵,因為其中含有I/O事件處理的細節。AbstractUnsafe作為AbstractChannel的內部類,定義了I/O事件處理的基本框架,其中的細節留給子類實現。我們將依次對各個事件框架進行分析。

  1. register事件框架

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (isRegistered()) {
            promise.setFailure(...);    // 已經注冊則失敗
            return;
        }
        if (!isCompatible(eventLoop)) { // EventLoop不兼容當前Channel
            promise.setFailure(...);
            return;
        }
        AbstractChannel.this.eventLoop = eventLoop;
        // 當前線程為EventLoop線程直接執行;否則提交任務給EventLoop線程
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(() -> { register0(promise); });
            } catch (Throwable t) {
                closeForcibly();    // 異常時關閉Channel
                closeFuture.setClosed();    
                safeSetFailure(promise, t);
            }
        }
    }

 

12-22行類似的代碼結構,Netty使用了很多次,這是為了保證I/O事件以及用戶定義的I/O事件處理邏輯(業務邏輯)在一個線程中處理。我們看提交的任務register0():

    private void register0(ChannelPromise promise) {
        try {
            // 確保Channel沒有關閉
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();   // 模板方法,細節由子類完成
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();  // 將用戶Handler添加到ChannelPipeline
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();   // 觸發Channel注冊事件
            if (isActive()) {
                // ServerSocketChannel接受的Channel此時已被激活
                if (firstRegistration) {
                    // 首次注冊且激活觸發Channel激活事件
                    pipeline.fireChannelActive();   
                } else if (config().isAutoRead()) {
                    beginRead();   // 可視為模板方法 
                }
            }
        } catch (Throwable t) {
            closeForcibly();     // 可視為模板方法
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 

register0()方法定義了注冊到EventLoop的整體框架,整個流程如下:
(1).注冊的具體細節由doRegister()方法完成,子類中實現。
(2).注冊后將處理業務邏輯的用戶Handler添加到ChannelPipeline
(3).異步結果設置為成功,觸發Channel的Registered事件。
(4).對於服務端接受的客戶端連接,如果首次注冊,觸發Channel的Active事件如果已設置autoRead,則調用beginRead()開始讀取數據
對於(4)的是因為fireChannelActive()中也根據autoRead配置,調用了beginRead()方法。beginRead()方法其實也是一個框架,細節由doBeginRead()方法在子類中實現:

    public final void beginRead() {
        assertEventLoop();
        if (!isActive()) {
            return;
        }
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(() -> { pipeline.fireExceptionCaught(e); });
            close(voidPromise());
        }
    }

異常處理的closeForcibly()方法也是一個框架,細節由doClose()方法在子類中實現:

    public final void closeForcibly() {
        assertEventLoop();
        try {
            doClose();
        } catch (Exception e) {
            logger.warn("Failed to close a channel.", e);
        }
    }

 register框架中有一對safeSetXXX()方法,將未完成的Promise標記為完成且成功或失敗,其實現如下:

    protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
            logger.warn(...);
        }
    }

至此,register事件框架分析完畢。

  1. bind事件框架

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // 確保Channel沒有關閉
        }
        boolean wasActive = isActive();
        try {
            doBind(localAddress);   // 模板方法,細節由子類完成
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (!wasActive && isActive()) { 
            invokeLater(() -> { pipeline.fireChannelActive(); });   // 觸發Active事件
        }
        safeSetSuccess(promise);
    }

bind事件框架較為簡單,主要完成在Channel綁定完成后觸發Channel的Active事件。其中的invokeLater()方法向Channel注冊到的EventLoop提交一個任務:

    private void invokeLater(Runnable task) {
        try {
            eventLoop().execute(task);
        } catch (RejectedExecutionException e) {
            logger.warn("Can't invoke task later as EventLoop rejected it", e);
        }
    }

closeIfClosed()方法當Channel不再打開時關閉Channel,代碼如下:

    protected final void closeIfClosed() {
        if (isOpen()) {
            return;
        }
        close(voidPromise());
    }

close()也是一個框架,之后會進行分析。

  1. disconnect事件框架

    public final void disconnect(final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable()) {
            return;
        }
        boolean wasActive = isActive();
        try {
            doDisconnect(); // 模板方法,細節由子類實現
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        if (wasActive && !isActive()) {
            invokeLater(() ->{ pipeline.fireChannelInactive(); });  // 觸發Inactive事件
        }
        safeSetSuccess(promise);
        closeIfClosed(); // disconnect框架可能會調用close框架
    }

 

  1. close事件框架

    public final void close(final ChannelPromise promise) {
        assertEventLoop();
        close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
    }

    private void close(final ChannelPromise promise, final Throwable cause,
                       final ClosedChannelException closeCause, final boolean notify) {
        if (!promise.setUncancellable()) {
            return;
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;   
        if (outboundBuffer == null) {   // outboundBuffer作為一個標記,為空表示Channel正在關閉
            if (!(promise instanceof VoidChannelPromise)) {
                // 當Channel關閉時,將此次close異步請求結果也設置為成功
                closeFuture.addListener( (future) -> { promise.setSuccess(); });
            }
            return;
        }
        if (closeFuture.isDone()) {
            safeSetSuccess(promise);    // 已經關閉,保證底層close只執行一次
            return;
        }
        final boolean wasActive = isActive();
        this.outboundBuffer = null; // 設置為空禁止write操作,同時作為標記字段表示正在關閉
        Executor closeExecutor = prepareToClose();
        if (closeExecutor != null) {
            closeExecutor.execute(() -> {
                try {
                    doClose0(promise);  // prepareToClose返回的executor執行
                } finally {
                    invokeLater( () -> { // Channel注冊的EventLoop執行
                        // 寫緩沖隊列中的數據全部設置失敗
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                        fireChannelInactiveAndDeregister(wasActive);
                    });
                }
            });
        } else {    // 當前調用線程執行
            try {
                doClose0(promise);
            } finally {
                outboundBuffer.failFlushed(cause, notify);
                outboundBuffer.close(closeCause);
            }
            if (inFlush0) {
                invokeLater( () -> { fireChannelInactiveAndDeregister(wasActive); });
            } else {
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }
    
    private void doClose0(ChannelPromise promise) {
        try {
            doClose();  // 模板方法,細節由子類實現
            closeFuture.setClosed();
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

 close事件框架保證只有一個線程執行了真正關閉的doClose()方法,prepareToClose()做一些關閉前的清除工作並返回一個Executor,如果不為空,需要在該Executor里執行doClose0()方法;為空,則在當前線程執行(為什么這樣設計?)。寫緩沖區outboundBuffer同時也作為一個標記字段,為空表示Channel正在關閉此時禁止寫操作。fireChannelInactiveAndDeregister()方法需要invokeLater()使用EventLoop執行,是因為其中會調用deRegister()方法觸發Inactive事件,而事件執行需要在EventLoop中執行。

    private void fireChannelInactiveAndDeregister(final boolean wasActive) {
        deregister(voidPromise(), wasActive && !isActive());
    }

 

  1. deregister事件框架

    public final void deregister(final ChannelPromise promise) {
        assertEventLoop();
        deregister(promise, false);
    }

    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
        if (!promise.setUncancellable()) {
            return;
        }
        if (!registered) {
            safeSetSuccess(promise);    // 已經deregister
            return;
        }
        invokeLater( () -> {
            try {
                doDeregister(); // 模板方法,子類實現具體細節
            } catch (Throwable t) {
                logger.warn(...);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive(); // 根據參數觸發Inactive事件
                }
                if (registered) {
                    registered = false;
                    pipeline.fireChannelUnregistered(); // 首次調用觸發Unregistered事件
                }
                safeSetSuccess(promise);
            }
        });
    }

deregister事件框架的處理流程很清晰,其中,使用invokeLater()方法是因為:用戶可能會在ChannlePipeline中將當前Channel注冊到新的EventLoop,確保ChannelPipiline事件和doDeregister()在同一個EventLoop完成([?][3])。

需要注意的是:事件之間可能相互調用,比如:disconnect->close->deregister。

  1. write事件框架

    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // 聯系close操作,outboundBuffer為空表示Channel正在關閉,禁止寫數據
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            ReferenceCountUtil.release(msg); // 釋放msg 防止泄露
            return;
        }
        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        outboundBuffer.addMessage(msg, size, promise);
    }

事實上,這是Netty定義的write操作的全部代碼,完成的功能是將要寫的消息Msg加入到寫緩沖區。其中的filterOutboundMessage()可對消息進行過濾整理,例如把HeapBuffer轉為DirectBuffer,具體實現由子類負責:

    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg; // 默認實現
    }

 

  1. flush事件框架

    public final void flush() {
        assertEventLoop();
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return; // Channel正在關閉直接返回
        }
        outboundBuffer.addFlush();  // 添加一個標記
        flush0();
    }

    protected void flush0() {
        if (inFlush0) {
            return;     // 正在flush返回防止多次調用
        }
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return; // Channel正在關閉或者已沒有需要寫的數據
        }
        inFlush0 = true;
        if (!isActive()) {
            // Channel已經非激活,將所有進行中的寫請求標記為失敗
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
        try {
            doWrite(outboundBuffer);    // 模板方法,細節由子類實現
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

flush事件中執行真正的底層寫操作,Netty對於寫的處理引入了一個寫緩沖區ChannelOutboundBuffer,由該緩沖區控制Channel的可寫狀態,其具體實現,將會在緩沖區一章中分析。

至此,Unsafe中的事件方法已經分析完7個,但還有connect和read沒有引入,下一節將進行分析。

6.2.2 AbstractNioChannel

Netty的實現中,Unsafe的I/O事件框架中的細節實現方法doXXX()放到了Channel子類中而不是Unsafe子類中,所以我們先分析Unsafe,然后分析Channel。
AbstractNioChannel從名字可以看出是對NIO的抽象,我們自頂向下一步一步深入細節,該類中定義了一個NioUnsafe接口:

    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch(); // 對應NIO中的JDK實現的Channel
        void finishConnect();   // 連接完成
        void read();    // 從JDK的Channel中讀取數據
        void forceFlush(); 
    }

回憶NIO的三大概念:Channel、Buffer、Selector,Netty的Channel包裝了JDK的Channel從而實現更為復雜的功能。Unsafe中可以使用ch()方法,NioChannel中可以使用javaChannel()方法獲得JDK的Channel。接口中定義了finishConnect()方法是因為:SelectableChannel設置為非阻塞模式時,connect()方法會立即返回,此時連接操作可能沒有完成,如果沒有完成,則需要調用JDK的finishConnect()方法完成連接操作。也許你已經注意到,AbstractUnsafe中並沒有connect事件框架,這是因為並不是所有連接都有標准的connect過程,比如Netty的LocalChannel和EmbeddedChannel。但是NIO中的連接操作則有較為標准的流程,在介紹Connect事件框架前,先介紹一下其中使用到的相關字段,這些字段定義在AbstractNioChannel中:

    private ChannelPromise connectPromise;  // 連接異步結果
    private ScheduledFuture<?> connectTimeoutFuture;    // 連接超時檢測任務異步結果
    private SocketAddress requestedRemoteAddress;   // 連接的遠端地址

Connect事件框架:

    public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
                                            final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return; // Channel已被關閉
        }
        try {
            if (connectPromise != null) {
                throw new ConnectionPendingException(); // 已有連接操作正在進行
            }
            boolean wasActive = isActive();
            // 模板方法,細節子類完成
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);  // 連接操作已完成
            } else {
                // 連接操作尚未完成
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;
                // 這部分代碼為Netty的連接超時機制
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(() -> {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause = new ConnectTimeoutException("...");
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener((ChannelFutureListener) (future) -> {
                    if (future.isCancelled()) {
                        // 連接操作取消則連接超時檢測任務取消
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

Connect事件框架中包含了Netty的連接超時檢測機制:向EventLoop提交一個調度任務,設定的超時時間已到則向連接操作的異步結果設置失敗然后關閉連接。fulfillConnectPromise()設置異步結果為成功並觸發Channel的Active事件:

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            return; // 操作已取消或Promise已被通知?
        }
        boolean active = isActive();
        boolean promiseSet = promise.trySuccess();  // False表示用戶取消操作
        if (!wasActive && active) { // 此時用戶沒有取消Connect操作
            pipeline().fireChannelActive(); // 觸發Active事件
        }
        if (!promiseSet) {
            close(voidPromise()); // 操作已被用戶取消,關閉Channel
        }
    }

 

FinishConnect事件框架:

    public final void finishConnect() {
        assert eventLoop().inEventLoop();
        try {
            boolean wasActive = isActive();
            doFinishConnect();  // 模板方法
            fulfillConnectPromise(connectPromise, wasActive);   // 首次Active觸發Active事件
        } catch (Throwable t) {
            fulfillConnectPromise(connectPromise, annotateConnectException(...));
        } finally {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false); // 連接完成,取消超時檢測任務
            }
            connectPromise = null;
        }
    }

finishConnect()只由EventLoop處理就緒selectionKey的OP_CONNECT事件時調用,從而完成連接操作。注意:連接操作被取消或者超時不會使該方法被調用。

Flush事件細節:

    protected final void flush0() {
        if (isFlushPending()) {
            return; // 已經有flush操作,返回
        }
        super.flush0(); // 調用父類方法
    }

    public final void forceFlush() {
        super.flush0(); // 調用父類方法
    }

    private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && 
                    (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

forceFlush()方法由EventLoop處理就緒selectionKey的OP_WRITE事件時調用,將緩沖區中的數據寫入Channel。isFlushPending()方法容易導致困惑:為什么selectionKey關心OP_WRITE事件表示正在Flush呢?OP_WRITE表示通道可寫,而一般情況下通道都可寫,如果selectionKey一直關心OP_WRITE事件,那么將不斷從select()方法返回從而導致死循環。Netty使用一個寫緩沖區,write操作將數據放入緩沖區中,flush時設置selectionKey關心OP_WRITE事件,完成后取消關心OP_WRITE事件。所以,如果selectionKey關心OP_WRITE事件表示此時正在Flush數據。

AbstractNioUnsafe還有最后一個方法removeReadOp():

    protected final void removeReadOp() {
        SelectionKey key = selectionKey();
        if (!key.isValid()) {
            return; // selectionKey已被取消
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) != 0) {
            key.interestOps(interestOps & ~readInterestOp); // 設置為不再感興趣
        }
    }

Netty中將服務端的OP_ACCEPT和客戶端的Read統一抽象為Read事件,在NIO底層I/O事件使用bitmap表示,一個二進制位對應一個I/O事件。當一個二進制位為1時表示關心該事件,readInterestOp的二進制表示只有1位為1,所以體會interestOps & ~readInterestOp的含義,可知removeReadOp()的功能是設置SelectionKey不再關心Read事件。類似的,還有setReadOp()、removeWriteOp()、setWriteOp()等等。

分析完AbstractNioUnsafe,我們再分析AbstractNioChannel,首先看其中還沒講解的字段:

    private final SelectableChannel ch; // 包裝的JDK Channel
    protected final int readInterestOp; // Read事件,服務端OP_ACCEPT,其他OP_READ
    volatile SelectionKey selectionKey; // JDK Channel對應的選擇鍵
    private volatile boolean inputShutdown; // Channel的輸入關閉標記
    private volatile boolean readPending;   // 底層讀事件進行標記

再看一下構造方法:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);    // 設置非阻塞模式
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                // log
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

其中的ch.configureBlocking(false)方法設置Channel為非阻塞模式,從而為Netty提供非阻塞處理I/O事件的能力。

對於AbstractNioChannel的方法,我們主要分析它實現I/O事件框架細節部分的doXXX()方法。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // 選擇鍵取消重新selectNow(),清除因取消操作而緩存的選擇鍵
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
    
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey()); // 設置取消選擇鍵
    }

對於Register事件,當Channel屬於NIO時,已經可以確定注冊操作的全部細節:將Channel注冊到給定NioEventLoop的selector上即可。注意,其中第二個參數0表示注冊時不關心任何事件,第三個參數為Netty的NioChannel對象本身。對於Deregister事件,選擇鍵執行cancle()操作,選擇鍵表示JDK Channel和selctor的關系,調用cancle()終結這種關系,從而實現從NioEventLoop中Deregister。需要注意的是:cancle操作調用后,注冊關系不會立即生效,而會將cancle的key移入selector的一個取消鍵集合,當下次調用select相關方法或一個正在進行的select調用結束時,會從取消鍵集合中移除該選擇鍵,此時注銷才真正完成。一個Cancle的選擇鍵為無效鍵,調用它相關的方法會拋出CancelledKeyException。

    protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return; // Channel的輸入關閉?什么情況下發生?
        }
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return; // 選擇鍵被取消而不再有效
        }
        readPending = true; // 設置底層讀事件正在進行
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 選擇鍵關心Read事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

對於NioChannel的beginRead事件,只需將Read事件設置為選擇鍵所關心的事件,則之后的select()調用如果Channel對應的Read事件就緒,便會觸發Netty的read()操作。

    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // 連接操作還在進行,但用戶調用close操作
            promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
            connectPromise = null;
        }
        ScheduledFuture<?> future = connectTimeoutFuture;
        if (future != null) {
            // 如果有連接超時檢測任務,則取消
            future.cancel(false);
            connectTimeoutFuture = null;
        }
    }

此處的doClose操作主要處理了連接操作相關的后續處理。並沒有實際關閉Channel,所以需要子類繼續增加細節實現。AbstractNioChannel中還有關於創建DirectBuffer的方法,將在以后必要時進行分析。其他的方法則較為簡單,不在列出。最后提一下isCompatible()方法,說明NioChannel只在NioEventLoop中可用。

    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }

 

AbstractNioChannel的子類實現分為服務端AbstractNioMessageChannel和客戶端AbstractNioByteChannel,我們將首先分析服務端AbstractNioMessageChannel。

6.2.3 AbstractNioMessageChannel

AbstractNioMessageChannel是底層數據為消息的NioChannel。在Netty中,服務端Accept的一個Channel被認為是一條消息,UDP數據報也是一條消息。該類主要完善flush事件框架的doWrite細節和實現read事件框架(在內部類NioMessageUnsafe完成)。首先看read事件框架:

    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此時讀操作不被允許,既沒有配置autoRead也沒有底層讀事件進行
            removeReadOp(); // 清除read事件,不再關心
            return;
        }
        
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf); // 模板方法,讀取消息
                    if (localRead == 0) { // 沒有數據可讀
                        break;  
                    }
                    if (localRead < 0) { // 讀取出錯
                        closed = true;  
                        break;
                    }
                    if (!config.isAutoRead()) { //沒有設置AutoRead
                        break;
                    }
                    if (readBuf.size() >= maxMessagesPerRead) { // 達到最大可讀數
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            
            setReadPending(false);  // 已沒有底層讀事件
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));   //觸發ChannelRead事件,用戶處理
            }
            readBuf.clear(); // ChannelReadComplete事件中如果配置autoRead則會調用beginRead,從而不斷進行讀操作
            pipeline.fireChannelReadComplete(); // 觸發ChannelReadComplete事件,用戶處理

            if (exception != null) {
                if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                    // ServerChannel異常也不能關閉,應該恢復讀取下一個客戶端
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());   // 非serverChannel且打開則關閉
                }
            }
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既沒有配置autoRead也沒有底層讀事件進行
                removeReadOp();
            }
        }
    }

 

read事件框架的流程已在代碼中注明,需要注意的是讀取消息的細節doReadMessages(readBuf)方法由子類實現。
我們主要分析NioServerSocketChannel,它不支持doWrite()操作,所以我們不再分析本類的flush事件框架的doWrite細節方法,直接轉向下一個目標:NioServerSocketChannel。

6.2.4 NioServerSocketChannel

你肯定已經使用過NioServerSocketChannel,作為處於Channel最底層的子類,NioServerSocketChannel會實現I/O事件框架的底層細節。首先需要注意的是:NioServerSocketChannel只支持bind、read和close操作

   protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) { // JDK版本1.7以上
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
    
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
        try {
            if (ch != null) {
                // 一個NioSocketChannel為一條消息
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
    
    protected void doClose() throws Exception {
        javaChannel().close();
    }

其中的實現,都是調用JDK的Channel的方法,從而實現了最底層的細節。需要注意的是:此處的doReadMessages()方法每次最多返回一個消息(客戶端連接),由此可知NioServerSocketChannel的read操作一次至多處理的連接數為config.getMaxMessagesPerRead(),也就是參數值MAX_MESSAGES_PER_READ。此外doClose()覆蓋了AbstractNioChannel的實現,因為NioServerSocketChannel不支持connect操作,所以不需要連接超時處理。

最后,我們再看關鍵構造方法:

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

其中的SelectionKey.OP_ACCEPT最為關鍵,Netty正是在此處將NioServerSocketChannel的read事件定義為NIO底層的OP_ACCEPT,統一完成read事件的抽象。

至此,我們已分析完兩條線索中的服務端部分,下面分析客戶端部分。首先是AbstractNioChannel的另一個子類AbstractNioByteChannel。

6.2.5 AbstractNioByteChannel

從字面可推知,AbstractNioByteChannel的底層數據為Byte字節。首先看構造方法:

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

 

其中的SelectionKey.OP_READ,說明AbstractNioByteChannel的read事件為NIO底層的OP_READ事件。
然后我們看read事件框架:

    public final void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // 此時讀操作不被允許,既沒有配置autoRead也沒有底層讀事件進行
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);  // 創建一個ByteBuf
                int writable = byteBuf.writableBytes(); 
                int localReadAmount = doReadBytes(byteBuf); // 模板方法,子類實現細節
                if (localReadAmount <= 0) { // 沒有數據可讀
                    byteBuf.release();
                    byteBuf = null;
                    close = localReadAmount < 0; // 讀取數據量為負數表示對端已經關閉
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);  // 沒有底層讀事件進行
                    // 此時,若autoRead關閉則必須調用beginRead,read操作才會讀取數據
                }
                pipeline.fireChannelRead(byteBuf);  // 觸發ChannelRead事件,用戶處理
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {   // 防止溢出
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }
                totalReadAmount += localReadAmount;

                if (!config.isAutoRead()) { // 沒有配置AutoRead
                    break;
                }
                if (localReadAmount < writable) {   // 讀取數小於可寫數,可能接受緩沖區已完全耗盡
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            // ReadComplete結束時,如果開啟autoRead則會調用beginRead,從而可以繼續read
            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                // 既沒有配置autoRead也沒有底層讀事件進行
                removeReadOp(); 
            }
        }
    }

AbstractNioByteChannel的read事件框架處理流程與AbstractNioMessageChannel的稍有不同:AbstractNioMessageChannel依次讀取Message,最后統一觸發ChannelRead事件;而AbstractNioByteChannel每讀取到一定字節就觸發ChannelRead事件。這是因為,AbstractNioMessageChannel需求高吞吐量,特別是ServerSocketChannel需要盡可能多地接受連接;而AbstractNioByteChannel需求快響應,要盡可能快地響應遠端請求

read事件的具體流程請參考代碼和代碼注釋進行理解,不再分析。注意到代碼中有關於接收緩沖區的代碼,這一部分我們單獨使用一節講述,之后會分析。當讀取到的數據小於零時,表示遠端連接已關閉,這時會調用closeOnRead(pipeline)方法:

    private void closeOnRead(ChannelPipeline pipeline) {
        SelectionKey key = selectionKey();
        setInputShutdown(); // 遠端關閉此時設置Channel的輸入源關閉
        if (isOpen()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
                // 取消關心Read事件並觸發UserEvent事件ChannelInputShutdownEvent
                key.interestOps(key.interestOps() & ~readInterestOp);   
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());   // 直接關閉
            }
        }
    }

 這段代碼正是Channel參數ALLOW_HALF_CLOSURE的意義描述,該參數為True時,會觸發用戶事件ChannelInputShutdownEvent,否則,直接關閉該Channel。

拋出異常時,會調用handleReadException(pipeline, byteBuf, t, close)方法:

    private void handleReadException(ChannelPipeline pipeline,
                                         ByteBuf byteBuf, Throwable cause, boolean close) {
        if (byteBuf != null) {  // 已讀取到數據
            if (byteBuf.isReadable()) { // 數據可讀
                setReadPending(false);
                pipeline.fireChannelRead(byteBuf);  
            } else {    // 數據不可讀
                byteBuf.release();
            }
        }
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) {
            closeOnRead(pipeline);
        }
    }

可見,拋出異常時,如果讀取到可用數據和正常讀取一樣觸發ChannelRead事件,只是最后會統一觸發ExceptionCaught事件由用戶進行處理。

至此,read事件框架分析完畢,下面我們分析write事件的細節實現方法doWrite()。在此之前,先看filterOutboundMessage()方法對需要寫的數據進行過濾。

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
            return newDirectBuffer(buf); // 非DirectBuf轉為DirectBuf
        }
        if (msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException("...");
    }

 

可知,Netty支持的寫數據類型只有兩種:DirectBufferFileRegion。我們再看這些數據怎么寫到Channel上,也就是doWrite()方法:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;
        boolean setOpWrite = false;
        for (;;) {
            Object msg = in.current();
            if (msg == null) {  // 數據已全部寫完
                clearOpWrite();     // 清除OP_WRITE事件
                return;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf); // 模板方法,子類實現細節
                    if (localFlushedAmount == 0) {
                        // NIO在非阻塞模式下寫操作可能返回0表示未寫入數據
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        // ByteBuf不可讀,此時數據已寫完
                        done = true;
                        break;
                    }
                }
                
                in.progress(flushedAmount); // 記錄進度
                if (done) {
                    in.remove();    // 完成時,清理緩沖區
                } else {
                    break;  // 跳出循環執行incompleteWrite()
                }
            } else if (msg instanceof FileRegion) {
                // ....
            } else {
                throw new Error();  // 其他類型不支持
            }
        }
        incompleteWrite(setOpWrite);
    }

代碼中省略了對FileRegion的處理,FileRegion是Netty對NIO底層的FileChannel的封裝,負責將File中的數據寫入到WritableChannel中。FileRegion的默認實現是DefaultFileRegion,如果你很感興趣它的實現,可以自行查閱。

我們主要分析對ByteBuf的處理。doWrite的流程簡潔明了,核心操作是模板方法doWriteBytes(buf),將ByteBuf中的數據寫入到Channel,由於NIO底層的寫操作返回已寫入的數據量,在非阻塞模式下該值可能為0,此時會調用incompleteWrite()方法:

    protected final void incompleteWrite(boolean setOpWrite) {
        if (setOpWrite) {
            setOpWrite();   // 設置繼續關心OP_WRITE事件
        } else {
            // 此時已進行寫操作次數writeSpinCount,但並沒有寫完
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = (Runnable) () -> { flush(); };
            }
            // 再次提交一個flush()任務
            eventLoop().execute(flushTask);
        }
    }

 

該方法分兩種情況處理,在上文提到的第一種情況(實際寫0數據)下,設置SelectionKey繼續關心OP_WRITE事件從而繼續進行寫操作;第二種情況下,也就是寫操作進行次數達到配置中的writeSpinCount值但尚未寫完,此時向EventLoop提交一個新的flush任務,此時可以響應其他請求,從而提交響應速度。這樣的處理,不會使大數據的寫操作占用全部資源而使其他請求得不到響應,可見這是一個較為公平的處理。這里引出一個問題:使用Netty如何搭建高性能文件服務器?
至此,已分析完對於Byte數據的read事件和doWrite細節的處理,接下里,繼續分析NioSocketChannel,從而完善各事件框架的細節部分。

6.2.6 NioSocketChannel

NioSocketChannel作為Channel的最末端子類,實現了NioSocket相關的最底層細節實現,首先看doBind():

    protected void doBind(SocketAddress localAddress) throws Exception {
        doBind0(localAddress);
    }

    private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress);   // JDK版本1.7以上
        } else {
            javaChannel().socket().bind(localAddress);
        }
    }

 

這部分代碼與NioServerSocketChannel中相同,委托給JDK的Channel進行綁定操作。
接着再看doConnect()和doFinishConnect()方法:

    protected boolean doConnect(SocketAddress remoteAddress, 
                                        SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
                // 設置關心OP_CONNECT事件,事件就緒時調用finishConnect()
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 

JDK中的Channel在非阻塞模式下調用connect()方法時,會立即返回結果:成功建立連接返回True,操作還在進行時返回False。返回False時,需要在底層OP_CONNECT事件就緒時,調用finishConnect()方法完成連接操作。
再看doDisconnect()和doClose()方法:

    protected void doDisconnect() throws Exception {
        doClose();
    }

    protected void doClose() throws Exception {
        super.doClose();    // AbstractNioChannel中關於連接超時的處理
        javaChannel().close();
    }

 

然后看核心的doReadBytes()和doWriteXXX()方法:

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }

    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

    protected long doWriteFileRegion(FileRegion region) throws Exception {
        final long position = region.transfered();
        return region.transferTo(javaChannel(), position);
    }

 

對於read和write操作,委托給ByteBuf處理,我們將使用專門的一章,對這一部分細節進行完善,將在后面介紹。
NioSocketChannel最重要的部分是覆蓋了父類的doWrite()方法,使用更高效的方式進行寫操作,其代碼如下:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                clearOpWrite(); // 所有數據已寫完,不再關心OP_WRITE事件
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            switch (nioBufferCnt) {
                case 0: // 沒有ByteBuffer,也就是只有FileRegion
                    super.doWrite(in);  // 使用父類方法進行普通處理
                    return;
                case 1: // 只有一個ByteBuffer,此時的處理等效於父類方法的處理
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default: // 多個ByteBuffer,采用gathering方法處理
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        // gathering方法,此時一次寫多個ByteBuffer
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }
            in.removeBytes(writtenBytes);   // 清理緩沖區
            if (!done) {
                incompleteWrite(setOpWrite);    // 寫操作並沒有完成
                break;
            }
        }
    }

 

在明白了父類的doWrite方法后,這段代碼便容易理解,本段代碼做的優化是:當輸出緩沖區中有多個buffer時,采用Gathering Writes將數據從這些buffer寫入到同一個channel。
在AbstractUnsafe對close事件框架的分析中,有一個prepareToClose()方法,進行關閉的必要處理並在必要時返回一個Executor執行doClose()操作,默認方法返回null,NioSocketChannelUnsafe覆蓋了父類的實現,代碼如下:

    protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister(); // 取消選擇鍵selectionKey
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
                //
            }
            return null;
        }

 

SO_LINGER表示Socket關閉的延時時間,在此時間內,內核將繼續把TCP緩沖區的數據發送給對端且執行close操作的線程將阻塞直到數據發送完成。Netty的原則是I/O線程不能被阻塞,所以此時返回一個Executor用於執行阻塞的doClose()操作。doDeregister()取消選擇鍵selectionKey是因為:延遲關閉期間, 如果selectionKey仍然關心OP_WRITE事件,而輸出緩沖區又為null,這樣write操作直接返回,不會再執行clearOpWrite()操作取消關心OP_WRITE事件,而Channel一般是可寫的,這樣OP_WRITE事件會不斷就緒從而耗盡CPU,所以需要取消選擇鍵刪除注冊的事件。
[1]: //upload-images.jianshu.io/upload_images/3288959-5a4be2f31620177d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240
[2]: http://img.blog.csdn.net/20160928165809260
[3]: https://github.com/netty/netty/issues/4435

 


作者:Hypercube
鏈接:https://www.jianshu.com/p/9258af254e1d
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。


作者:Hypercube
鏈接:https://www.jianshu.com/p/fffc18d33159
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM