Netty——發送消息流程&高低水位


 

相關概念

SO_SEND_BUF和SO_REC_BUFF

  • SO_SEND_BUF是操作系統內核的寫緩沖區,所有應用程序需要發送到對端的信息,都會放到該緩沖區中,等待發往對端
  • SO_REC_BUFF是操作系統內核的讀緩沖區,所有對端發過來的數據都會放到該緩沖區中,等待應用程序取走

ChannelOutboundBuffer

  • 該buffer是Netty等待寫入系統內核緩沖區的消息隊列。

Channel的高低水位線

  • Netty 中提供一種水位線的標志,提用戶當前通道的消息堆積情況;
  • Netty 中的 Channel 都有一個寫緩沖區(ChannelOutboundBuffer),這是個 Netty 發數據時的倉庫,要發送的數據以數據結構 Entry 的形式存在倉庫中,Entry 是個鏈表中的節點;
  • Netty 中的高低水位線,對應的就是這個鏈表中節點的數量范圍,用於限制程序的寫操作,自己在寫程序的時候,需要用相應的代碼給予配合,從而避免 OOM,增強寫數據時的安全性。
  • 設置高低水位線參數(默認 32 * 1024 ~ 64 * 1024);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
  • 設置好了高低水位參數,如果自己在寫代碼的時候,沒有做判斷 channel.isWritable() 的,就跟沒設置一樣!!! 示例代碼片段:
// 這是對設置的高低水位線參數的尊重,如果設置了高低水位線,這里卻不做判斷,直接寫,就有可能 OOM;
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
    ctx.writeAndFlush(responseMessage);
} else {
    log.error("message dropped");
}

 

Netty發送消息的流程

1、調用Channelwrite方法,該方法會將消息加入ChannelOutboundBuffer,此時並沒有實際發送,netty會增加該連接發送隊列的水位線。
以下是AbstractChannel.java中的代碼片段:

public final void write(Object msg, ChannelPromise promise) {
       assertEventLoop();
       ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
       if (outboundBuffer == null) {
           // If the outboundBuffer is null we know the channel was closed and so
           // need to fail the future right away. If it is not null the handling of the rest
           // will be done in flush0()
           // See https://github.com/netty/netty/issues/2362
           safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
           // release message now to prevent resource-leak
           ReferenceCountUtil.release(msg);
           return;
       }
       int size;
       try {
           msg = filterOutboundMessage(msg);
           size = pipeline.estimatorHandle().size(msg);
           if (size < 0) {
               size = 0;
           }
       } catch (Throwable t) {
           safeSetFailure(promise, t);
           ReferenceCountUtil.release(msg);
           return;
       }
       outboundBuffer.addMessage(msg, size, promise);
   }

2、調用Channelflush方法,該方法將ChannelOutboundBuffer中的消息寫入內核緩沖區。AbstractChannelHandlerContext.flush方法,准備寫入。

public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }
    return this;
}

3、NioSocketChannel.doWrite方法,該方法調用java原生nio將數據寫入內核緩沖區。寫入完畢,將消息從ChannelOutboundBuffer移除並且減少ChannelOutboundBuffer的水位線。

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();
            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
        incompleteWrite(writeSpinCount < 0);
    }

 

 

引用:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM