相關概念
SO_SEND_BUF和SO_REC_BUFF
- SO_SEND_BUF是操作系統內核的寫緩沖區,所有應用程序需要發送到對端的信息,都會放到該緩沖區中,等待發往對端
- SO_REC_BUFF是操作系統內核的讀緩沖區,所有對端發過來的數據都會放到該緩沖區中,等待應用程序取走
ChannelOutboundBuffer
- 該buffer是Netty等待寫入系統內核緩沖區的消息隊列。
Channel的高低水位線
- Netty 中提供一種水位線的標志,提用戶當前通道的消息堆積情況;
- Netty 中的 Channel 都有一個寫緩沖區(ChannelOutboundBuffer),這是個 Netty 發數據時的倉庫,要發送的數據以數據結構 Entry 的形式存在倉庫中,Entry 是個鏈表中的節點;
- Netty 中的高低水位線,對應的就是這個鏈表中節點的數量范圍,用於限制程序的寫操作,自己在寫程序的時候,需要用相應的代碼給予配合,從而避免 OOM,增強寫數據時的安全性。
- 設置高低水位線參數(默認 32 * 1024 ~ 64 * 1024);
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
- 設置好了高低水位參數,如果自己在寫代碼的時候,沒有做判斷
channel.isWritable()
的,就跟沒設置一樣!!! 示例代碼片段:
// 這是對設置的高低水位線參數的尊重,如果設置了高低水位線,這里卻不做判斷,直接寫,就有可能 OOM; if (ctx.channel().isActive() && ctx.channel().isWritable()) { ctx.writeAndFlush(responseMessage); } else { log.error("message dropped"); }
Netty發送消息的流程
1、調用Channel的write方法,該方法會將消息加入ChannelOutboundBuffer,此時並沒有實際發送,netty會增加該連接發送隊列的水位線。
以下是AbstractChannel.java中的代碼片段:
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
2、調用Channel的flush方法,該方法將ChannelOutboundBuffer中的消息寫入內核緩沖區。AbstractChannelHandlerContext.flush方法,准備寫入。
public ChannelHandlerContext flush() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeFlush(); } else { Runnable task = next.invokeFlushTask; if (task == null) { next.invokeFlushTask = task = new Runnable() { @Override public void run() { next.invokeFlush(); } }; } safeExecute(executor, task, channel().voidPromise(), null); } return this; }
3、NioSocketChannel.doWrite方法,該方法調用java原生nio將數據寫入內核緩沖區。寫入完畢,將消息從ChannelOutboundBuffer移除並且減少ChannelOutboundBuffer的水位線。
protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }
引用: