A while back I wrote a proxy with Netty. It had been running fine in production for ages, but after deploying it to a new environment, files would occasionally come out with bytes missing at the end.
My first thought was: "Did I forget a flush somewhere? But then why was it fine before?"
I went through the code and confirmed no flush was missing. Baffled, I stared at logs for half a day and finally named Netty as the prime suspect.
Enough talk, let's look at the code.
Following the channel's writeAndFlush method all the way down, you eventually land in HeadContext, the head node of DefaultChannelPipeline, whose flush method looks like this:
@Override
public void flush(ChannelHandlerContext ctx) {
    unsafe.flush();
}
This unsafe object is Netty's low-level I/O workhorse: the Channel.Unsafe whose base implementation lives inside AbstractChannel, and whose concrete subclasses are essentially wrappers around the Epoll, NIO and KQueue transport models.
Since NIO is what most setups use, I followed the NIO subclass and found the call chain flush() -> flush0() -> doWrite():
@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            initialCloseCause = t;
            close(voidPromise(), t, newClosedChannelException(t), false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);
            } catch (Throwable t2) {
                initialCloseCause = t;
                close(voidPromise(), t2, newClosedChannelException(t), false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
The crux of the problem is the do-while loop inside doWrite: it uses the spin counter writeSpinCount to cap the number of iterations, so that a single write operation cannot hog the event-loop thread for too long.
private volatile int writeSpinCount = 16;
In other words: at most 16 spins, and whatever hasn't been written by then is simply set aside for now!!!
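As an aside, the spin count is tunable per channel. A minimal sketch, assuming a typical ServerBootstrap setup (the value 32 is purely illustrative, not a recommendation):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

// Raising the spin count lets doWrite() loop more times per flush
// before giving up; 32 is only an example value.
ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.WRITE_SPIN_COUNT, 32);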
Of course Netty doesn't just abandon the data. It registers interest in the write event on the channel again, as if to say "I still have more to write":
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
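For completeness, when the selector later reports the channel as writable, the event loop flushes the leftovers. The relevant branch in NioEventLoop.processSelectedKey looks roughly like the following (paraphrased from the Netty 4.1 sources, so treat it as a sketch):

// Inside NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel):
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // forceFlush() writes the remaining data and clears OP_WRITE
    // once there is nothing left to write.
    ch.unsafe().forceFlush();
}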
As long as the TCP connection stays up, this approach causes no problems.
But if you flush and then close the channel right away, the mechanism can no longer guarantee zero data loss: close() fails whatever is still queued in the ChannelOutboundBuffer, so the unwritten tail gets silently dropped.
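When the sender controls the connection's lifetime, a simple way to dodge that race is to chain the close to the write's future instead of calling close() by hand. A minimal sketch (writeLastAndClose and lastMsg are hypothetical names):

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

// Close only after the final message has actually been flushed out of
// the ChannelOutboundBuffer and onto the socket.
static void writeLastAndClose(Channel ch, Object lastMsg) {
    ch.writeAndFlush(lastMsg).addListener(ChannelFutureListener.CLOSE);
}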
When Netty is used as a forwarder, that is, reading from channelA and writing to channelB, a large enough gap between the read and write speeds makes this very easy to trigger.
That is exactly what bit me this time: dual NICs in different network environments, one fast and one slow.
Fortunately, there is a fix.
Netty provides two watermark options for judging how full the outbound buffer is:
WRITE_BUFFER_HIGH_WATER_MARK
WRITE_BUFFER_LOW_WATER_MARK
When the buffer fills up to the high watermark, the channelWritabilityChanged callback fires; there you deregister the read event, which lets the TCP window close. Once the buffer drains back below the low watermark, channelWritabilityChanged fires again and you re-register the read event.
This balances the read and write speeds and also keeps memory usage in check, as the sketch below shows.
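Here is a minimal sketch of that wiring for the channelA -> channelB relay case, modeled on the backpressure idea above; RelayBackpressureHandler and the 32 KiB / 64 KiB watermarks are illustrative names and numbers, not from the original code:

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;

public class RelayBackpressureHandler extends ChannelInboundHandlerAdapter {

    // channelA, the side we read from; this handler sits in channelB's pipeline.
    private final Channel inboundChannel;

    public RelayBackpressureHandler(Channel inboundChannel) {
        this.inboundChannel = inboundChannel;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Illustrative watermarks: writability flips off once more than
        // 64 KiB is queued outbound, and back on when it drains below 32 KiB.
        ctx.channel().config().setWriteBufferWaterMark(
                new WriteBufferWaterMark(32 * 1024, 64 * 1024));
        ctx.fireChannelActive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        // Above the high watermark: stop reading from channelA so the kernel
        // stops draining its receive buffer and the TCP window closes.
        // Below the low watermark: resume reading.
        inboundChannel.config().setAutoRead(ctx.channel().isWritable());
        ctx.fireChannelWritabilityChanged();
    }
}

With autoRead off, the event loop stops issuing reads on channelA, its socket receive buffer fills up, and TCP flow control throttles the remote sender, which is exactly the read/write balancing described above.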
