關閉連接:本質是取消 Channel 在 Selelctor 的注冊


關閉連接:本質是取消 Channel 在 Selelctor 的注冊

Netty 系列目錄(https://www.cnblogs.com/binarylei/p/10117436.html)

1. 主線分析

1.1 主線

關閉連接分兩種:主動關閉(正常關閉)和被動關閉(異常關閉)。

  • 多路復用器(Selector)接收到 OP_READ 事件
  • 處理 OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read():
    • 接受數據
    • 判斷接受的數據大小是否 < 0 , 如果是,說明是關閉,開始執行關閉
      • 關閉 channel(包含 cancel 多路復用器的key)
      • 清理消息:不接受新信息,fail 掉所有 queue 中消息。
      • 觸發 fireChannelInactive 和 fireChannelUnregistered。
    • 讀異常,同樣開始執行關閉

1.2 知識點

(1)關閉連接本質

一句話概括:關閉連接的本質是取消 Channel 在 Selelctor 的注冊。

  • java.nio.channels.spi.AbstractInterruptibleChannel#close
  • java.nio.channels.SelectionKey#cancel

(2)要點

  • 關閉連接,會觸發 OP_READ 方法。讀取字節數是 -1 代表關閉。
  • 數據讀取進行時,強行關閉,觸發 IO Exception,進而執行關閉。
  • Channel 的關閉包含了 SelectionKey 的 cancel。

補充1:NIO 中,如果一個客戶端進程退出,為什么會觸發服務器的 OP_READ 事件?

epoll 觸發一個對斷關閉然后在 jvm 層被包裝成了一個讀事件。因為 epoll 收到退出事件的時候要觸發一個讀操作,讀到 -1 認為退出,所以 java 從實際操作角度認為 epoll 的退出事件也是讀。所以簡化了 java 層處理的事件數。但這個時候用 channel.read() 方法讀的時候,會報 java.io.IOException: 遠程主機強迫關閉了一個現有的連接。如果是主動關閉可以在觸發讀事件第一件事是判斷是否有效,比如先讀一個字節 看看是不是 -1,如果是 -1 就停止。 如果異常是 reset by peer,則表示被動關閉,一個流氓方法是所有和鏈接相關的異常都 catch,然后關閉這個鏈接,沒有更好的做法了,Netty 自己也是這樣做的。

轉載自《關於netty你需要了解的二三事》:https://cloud.tencent.com/developer/article/1452395

2. 源碼分析

連接關閉是會觸發 OP_READ 事件,無論是正常還是異常關閉,都會調用 closeOnRead 關閉連接,最終調用 unsafe.close 關閉連接。

2.1 read

AbstractNioByteChannel.NioByteUnsafe#read
    -> closeOnRead
        -> AbstractUnsafe#close
    -> handleReadException
        -> closeOnRead

在前面分析 AbstractNioByteChannel.NioByteUnsafe#read 時,我們忽略了異常的處理。現在回過頭再看一下代碼:

(1)NioByteUnsafe#read

try {
    do {
        byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        if (allocHandle.lastBytesRead() <= 0) {
            byteBuf.release();
            byteBuf = null;
            // 1. 正常關閉,返回 -1
            close = allocHandle.lastBytesRead() < 0;
            if (close) {
                readPending = false;
            }
            break;
        }
    } while (allocHandle.continueReading());

    if (close) {
        closeOnRead(pipeline);
    }
} catch (Throwable t) {
    // 2. 如果異常是IOException,也需要關閉
    handleReadException(pipeline, byteBuf, t, close, allocHandle);
}

說明: 無論是正常關閉(allocHandle.lastBytesRead() = -1)還是異常關閉(IOException),都會調用 closeOnRead 關閉連接。

(2)closeOnRead

closeOnRead 方法調用 close 關閉連接。

private void closeOnRead(ChannelPipeline pipeline) {
    if (!isInputShutdown0()) {
        if (isAllowHalfClosure(config())) {
            // 特殊需求
            shutdownInput();
            pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
        } else {
            // 基本上都是調用 close 方法關閉連接
            close(voidPromise());
        }
    } else {
        inputClosedSeenErrorOnRead = true;
        pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
    }
}

2.2 close

unsafe.close 關閉連接,最終調用 NioSocketChannel#doClose(javaChannel.close) 或 NioEventLoop#cancel(key.cancel) 關閉連接,本質都會調用到 SelectionKey#cancel 取消注冊。unsafe.close 做了如下工作:

  1. 預關閉:調用 prepareToClose 方法,實際上是判斷 socket 是否配置了 soLinger。一旦配置了 soLinger 參數,socket 關閉就變成阻塞了,需要返回一個線程單獨執行 doClose0 關閉任務。異步關閉連接,代碼就不看了。
  2. 真正關閉連接:doClose0 方法會調用 javaChannel().close 來關閉連接。本質上也是調用 SelectionKey#cancel 取消注冊。
  3. 清理 NioEventLoop 上的資源:重復調用 key.cancel(),但沒有影響。同時清理 NioEventLoop 上資源。至於為什么 doDeregister 方法要重復取消注冊?可能只調用 doDeregister 取消注冊。
  4. 觸發 ChannelInactive 和 ChannelUnregistered 事件。我們需要關注一下 head 有沒有什么特殊的處理。
AbstractChannel.AbstractUnsafe#close
    -> prepareToClose
    -> doClose0
        -> NioSocketChannel#doClose             # √ javaChannel.close
    -> fireChannelInactiveAndDeregister
        -> deregister
            -> AbstractNioChannel#doDeregister
                -> NioEventLoop#cancel         # √ key.cancel()
            -> pipeline#fireChannelInactive
            -> pipeline#fireChannelUnregistered

(1)close

private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    final boolean wasActive = isActive();
    this.outboundBuffer = null;                  // 清理資源,不允許再寫數據到緩沖區
    Executor closeExecutor = prepareToClose();   // 1. 預關閉,設置soLinger后會阻塞關閉連接
    doClose0(promise);                           // 2. 真正關閉連接
    fireChannelInactiveAndDeregister(wasActive); // 3. 調用deregister,清理資源並觸發事件
}

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    try {
        doDeregister();                        // 4. 調用eventloop.cancel
    } catch (Throwable t) {
    } finally {
        pipeline.fireChannelInactive();        // 5. 觸發channelInactive
        pipeline.fireChannelUnregistered();    // 6. 觸發dhannelUnregistered
    }
}

(2)prepareToClose

prepareToClose 返回了一個線程用來單獨執行關閉任務,因為開啟 soLinger 后,關閉連接是阻塞的,需要異步關閉連接。NioSocketChannelUnsafe 中的實現如下:

@Override
protected Executor prepareToClose() {
    // 配置soLinger后會阻塞關閉連接,返回一個默認的連接池執行關閉任務
    if (javaChannel().isOpen() && config().getSoLinger() > 0) {
        doDeregister();
        return GlobalEventExecutor.INSTANCE;
    }
    return null;
}

(3)doClose

@Override
protected void doClose() throws Exception {
    super.doClose();
    javaChannel().close();   // 核心
}

(4)doDeregister

// AbstractNioChannel
@Override
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

void cancel(SelectionKey key) {
    key.cancel();             // 核心
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM