SocketChannel 讀取ByteBuf 的過程


SocketChannel 讀取ByteBuf 的過程:

  我們首先看NioEventLoop 的processSelectedKey 方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  //獲取到channel 中的unsafe
  final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  //如果這個key 不是合法的, 說明這個channel 可能有問題
  if (!k.isValid()) {
    //代碼省略
  }
  try {
    //如果是合法的, 拿到key 的io 事件
    int readyOps = k.readyOps();
    //鏈接事件
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
      int ops = k.interestOps();
      ops &= ~SelectionKey.OP_CONNECT;
      k.interestOps(ops);
      unsafe.finishConnect();
    }
    //寫事件     if ((readyOps & SelectionKey.OP_WRITE) != 0) {       ch.unsafe().forceFlush();     }
    //讀事件和接受鏈接事件     //如果當前NioEventLoop 是work 線程的話, 這里就是op_read 事件     //如果是當前NioEventLoop 是boss 線程的話, 這里就是op_accept 事件     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {       unsafe.read();       if (!ch.isOpen()) {         return;       }     }   } catch (CancelledKeyException ignored) {     unsafe.close(unsafe.voidPromise());   } }

  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 這里的判斷表示輪詢到事件是OP_READ 或者OP_ACCEPT 事件。之前我們分析過, 如果當前NioEventLoop 是work 線程的話, 那么這里就是OP_READ 事件, 也就是讀事件, 表示客戶端發來了數據流,這里會調用unsafe 的redis()方法進行讀取。那么這里的channel 是NioServerSocketChannel, 其綁定的unsafe 是NioByteUnsafe, 這里會走進NioByteUnsafe 的read()方法中

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                  byteBuf.release();
                  byteBuf = null;
                  close = allocHandle.lastBytesRead() < 0;
                  break;
             }

             allocHandle.incMessagesRead(1);
             readPending = false;
             pipeline.fireChannelRead(byteBuf);
             byteBuf = null;
         } while (allocHandle.continueReading());
             allocHandle.readComplete();
             pipeline.fireChannelReadComplete();

             if (close) {
                    closeOnRead(pipeline);
             }
   } catch (Throwable t) {
         handleReadException(pipeline, byteBuf, t, close, allocHandle);
   } finally {
          if (!readPending && !config.isAutoRead()) {
             removeReadOp();
          }
   }
}

  首先獲取SocketChannel 的config, pipeline 等相關屬性,final ByteBufAllocator allocator = config.getAllocator(); 這一步是獲取一個ByteBuf 的內存分配器, 用於分配ByteBuf。這里會走到DefaultChannelConfig 的getAllocator 方法中:

public ByteBufAllocator getAllocator() {
    return allocator;
}

  這里返回的DefualtChannelConfig 的成員變量, 我們看這個成員變量:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

  這里調用ByteBufAllocator 的屬性DEFAULT, 跟進去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

  我們看到這里又調用了ByteBufUtil 的靜態屬性DEFAULT_ALLOCATOR, 再跟進去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

  DEFAULT_ALLOCATOR 這個屬性是在static 塊中初始化的,我們跟到static 塊中:

static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;

        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
}

  首先判斷運行環境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在allocType 中,然后通過if 判斷, 最后局部變量alloc = PooledByteBufAllocator.DEFAULT, 最后將alloc 賦值到成員變量DEFAULT_ALLOCATOR , 我們跟到PooledByteBufAllocator 的DEFAULT 屬性中:

public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

  我們看到這里直接通過new 的方式, 創建了一個PooledByteBufAllocator 對象, 也就是基於申請一塊連續內存進行緩沖區分配的緩沖區分配器。緩沖區分配器的知識, 我們在前面的章節進行了詳細的剖析, 這里就不再贅述。回到NioByteUnsafe 的read()方法中:

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                  byteBuf.release();
                  byteBuf = null;
                  close = allocHandle.lastBytesRead() < 0;
                  break;
             }

             allocHandle.incMessagesRead(1);
             readPending = false;
             pipeline.fireChannelRead(byteBuf);
             byteBuf = null;
         } while (allocHandle.continueReading());
             allocHandle.readComplete();
             pipeline.fireChannelReadComplete();

             if (close) {
                    closeOnRead(pipeline);
             }
   } catch (Throwable t) {
         handleReadException(pipeline, byteBuf, t, close, allocHandle);
   } finally {
          if (!readPending && !config.isAutoRead()) {
             removeReadOp();
          }
   }
}

  這里ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator。final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle() 是創建一個handle, 我們之前的章節講過,handle 是對RecvByteBufAllocator 進行實際操作的對象,我們跟進recvBufAllocHandle:

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
   if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

  這里是我們之前剖析過的邏輯, 如果不存在, 則創建handle 的實例。同樣allocHandle.reset(config)是將配置重置。重置完配置之后, 進行do-while循環, 有關循環終止條件allocHandle.continueReading()。在do-while 循環中, 首先看byteBuf = allocHandle.allocate(allocator) 這一步, 這里傳入了剛才創建的allocate 對象, 也就是PooledByteBufAllocator,這里會進入DefaultMaxMessagesRecvByteBufAllocator 類的allocate()方法中:

public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}

  這里的guess 方法, 會調用AdaptiveRecvByteBufAllocator 的guess 方法:

public int guess() {
    return nextReceiveBufferSize;
}

  這里會返回AdaptiveRecvByteBufAllocator 的成員變量nextReceiveBufferSize, 也就是下次所分配緩沖區的大小,  第一次分配的時候會分配初始大小, 也就是1024 字節。這樣, alloc.ioBuffer(guess())就會分配一個PooledByteBuf,我們跟到AbstractByteBufAllocator 的ioBuffer 方法中:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

  這里首先判斷是否能獲取jdk 的unsafe 對象, 默認為true, 所以會走到directBuffer(initialCapacity)中, 這里最終會分配一個PooledUnsafeDirectByteBuf 對象。回到NioByteUnsafe 的read()方法中,分配完了ByteBuf 之后, 再看這一步allocHandle.lastBytesRead(doReadBytes(byteBuf))。首先看參數doReadBytes(byteBuf)方法, 這步是將channel 中的數據讀取到我們剛分配的ByteBuf 中, 並返回讀取到的字節數,這里會調用到NioSocketChannel 的doReadBytes()方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

  首先拿到綁定在channel 中的handler, 因為我們已經創建了handle, 所以這里會直接拿到。再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())這步, byteBuf.writableBytes()返回byteBuf 的可寫字節數,也就是最多能從channel 中讀取多少字節寫到ByteBuf, allocate 的attemptedBytesRead 會把可寫字節數設置到DefaultMaxMessagesRecvByteBufAllocator 類的attemptedBytesRead 屬性中, 跟到DefaultMaxMessagesRecvByteBufAllocator 中的attemptedBytesRead 我們會看到:

public void attemptedBytesRead(int bytes) {
   attemptedBytesRead = bytes;
}

  繼續看doReadBytes()方法。往下看最后, 通過byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())將jdk 底層的channel 中的數據寫入到我們創建的ByteBuf 中, 並返回實際寫入的字節數。回到NioByteUnsafe 的read()方法中繼續看allocHandle.lastBytesRead(doReadBytes(byteBuf))這步,剛才我們剖析過doReadBytes(byteBuf)返回的是實際寫入ByteBuf 的字節數, 再看lastBytesRead() 方法, 跟到DefaultMaxMessagesRecvByteBufAllocator 的lastBytesRead()方法中:

public final void lastBytesRead(int bytes) {
    lastBytesRead = bytes;
    // Ignore if bytes is negative, the interface contract states it will be detected externally after call.
    // The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
    totalBytesRead += bytes;
    if (totalBytesRead < 0) {
        totalBytesRead = Integer.MAX_VALUE;
    }
}

  這里會賦值兩個屬性, lastBytesRead 代表最后讀取的字節數, 這里賦值為我們剛才寫入ByteBuf 的字節數,totalBytesRead 表示總共讀取的字節數, 這里將寫入的字節數追加。繼續來到NioByteUnsafe 的read()方法,如果最后一次讀取數據為0, 說明已經將channel 中的數據全部讀取完畢, 將新創建的ByteBuf 釋放循環利用, 並跳出循環。allocHandle.incMessagesRead(1)這步是增加消息的讀取次數, 因為我們循環最多16 次, 所以當增加消息次數增加到16會結束循環。讀取完畢之后, 會通過pipeline.fireChannelRead(byteBuf)將傳遞channelRead 事件, 有關channelRead事件我們在前面的章節也進行了詳細的剖析。至此,小伙伴們應該有個疑問, 如果一次讀取不完, 就傳遞channelRead 事件, 那么server 接收到的數據有可能就是不完整的, 其實關於這點, Netty 也做了相應的處理, 我們會在之后的章節詳細剖析Netty 的半包處理機制。循環結束后,會執行到allocHandle.readComplete()這一步。

  我們知道第一次分配ByteBuf 的初始容量是1024, 但是初始容量不一定一定滿足所有的業務場景, netty 中, 將每次讀取數據的字節數進行記錄, 然后之后次分配ByteBuf 的時候, 容量會盡可能的符合業務場景所需要大小, 具體實現方式,就是在readComplete()這一步體現的。我們跟到AdaptiveRecvByteBufAllocator 的readComplete()方法中:

public void readComplete() {
     record(totalBytesRead());
}

  這里調用了record()方法, 並且傳入了這一次所讀取的字節總數,跟到record()方法中:

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

  首先看判斷條件if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) 。這里index 是當前分配的緩沖區大小所在的SIZE_TABLE 中的索引, 將這個索引進行縮進, 然后根據縮進后的所以找出SIZE_TABLE中所存儲的內存值, 再判斷是否大於等於這次讀取的最大字節數, 如果條件成立, 說明分配的內存過大, 需要縮容操作,我們看if 塊中縮容相關的邏輯。首先if (decreaseNow) 會判斷是否立刻進行收縮操作, 通常第一次不會進行收縮操作,然后會將decreaseNow 設置為true, 代表下一次直接進行收縮操作。假設需要立刻進行收縮操作, 我們看收縮操作的相關邏輯:

  index = Math.max(index - INDEX_DECREMENT, minIndex) 這一步將索引縮進一步, 但不能小於最小索引值;然后通過nextReceiveBufferSize = SIZE_TABLE[index] 獲取設置索引之后的內存, 賦值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就會根據這個大小分配ByteBuf 了, 這樣就實現了縮容操作。再看else if (actualReadBytes >= nextReceiveBufferSize) ,這里判斷這次讀取字節的總量比上次分配的大小還要大,則進行擴容操作。擴容操作也很簡單, 索引步進, 然后拿到步進后的索引所對應的內存值, 作為下次所需要分配的大小在NioByteUnsafe 的read()方法中,經過了縮容或者擴容操作之后, 通過pipeline.fireChannelReadComplete()傳播ChannelReadComplete()事件,以上就是讀取客戶端消息的相關流程。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM