Does Netty guarantee the data is 100% sent once writeAndFlush is called?


A while back I wrote a proxy with Netty. It ran fine in production for ages, but after deploying it to a new environment, files would occasionally arrive with a few bytes missing at the end.

My first thought was: "Did I forget to flush somewhere? But then why did it work fine before?"

After going through the code I confirmed no flush was missing. Baffled, I stared at logs for half a day and finally put Netty itself on the suspect list.

 

Enough talk, let's go straight to the code.

Following channel.writeAndFlush down the call chain, you eventually land in HeadContext, the head node of DefaultChannelPipeline, whose flush method looks like this:

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

This unsafe object is Netty's low-level I/O worker (the Unsafe inner class of AbstractChannel); its transport-specific subclasses wrap the Epoll, NIO, and Kqueue network models.

Since NIO is the transport used in most setups, I followed the NIO subclass and found the call chain flush() -> flush0() -> doWrite():

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    initialCloseCause = t;
                    close(voidPromise(), t, newClosedChannelException(t), false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        initialCloseCause = t;
                        close(voidPromise(), t2, newClosedChannelException(t), false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }


    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always use nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

The crux is the do-while loop inside doWrite. It caps its iterations with the spin counter writeSpinCount, so that a single write operation cannot hog the event-loop thread for too long:

    private volatile int writeSpinCount = 16;

In other words: at most 16 iterations, and whatever has not been written by then is simply left sitting in the outbound buffer for now!!!
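If your workload keeps exhausting this budget, the count is tunable through ChannelOption.WRITE_SPIN_COUNT. A minimal sketch (the value 32 is just an illustration, not a recommendation):

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelOption;

    Bootstrap b = new Bootstrap();
    // Default is 16; a higher value means fewer incomplete writes on fast
    // links, at the cost of event-loop fairness between channels.
    b.option(ChannelOption.WRITE_SPIN_COUNT, 32);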

Of course, Netty does follow up. It re-registers interest in the OP_WRITE event on the channel, as if to say "I still have data to write":

    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }
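For completeness: setOpWrite is reached through incompleteWrite, which distinguishes the two ways a write can be left unfinished. Roughly, paraphrased from AbstractNioByteChannel in Netty 4.1 (check your version for the exact code):

    protected final void incompleteWrite(boolean setOpWrite) {
        if (setOpWrite) {
            // write() returned 0: the socket send buffer is full, so ask the
            // selector to wake us when the channel becomes writable again.
            setOpWrite();
        } else {
            // We merely ran out of spin budget; the socket is most likely
            // still writable, so clear OP_WRITE and reschedule the flush as
            // an ordinary task, letting other event-loop work run in between.
            clearOpWrite();
            eventLoop().execute(flushTask);
        }
    }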

As long as the TCP connection stays up, this way of handling it causes no problems.

But if you flush and then immediately close, this mechanism can no longer guarantee zero data loss: the close can run while part of your data is still parked in the outbound buffer, waiting for one of those later write opportunities.
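The direct fix for this flush-then-close race is to tie the close to the write's ChannelFuture instead of calling it unconditionally; lastChunk below is a placeholder for your own final message:

    // Close only after the final write has actually completed (or failed),
    // not while bytes may still be queued in the outbound buffer.
    channel.writeAndFlush(lastChunk)
           .addListener(ChannelFutureListener.CLOSE);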

 

This bites especially easily when Netty is used as a forwarder, reading from channelA and writing to channelB: if the two sides run at very different speeds, unwritten data piles up on the slow side.

In my case it was a dual-NIC machine on two different networks, one fast and one slow.

 

Fortunately, there is a fix.

Netty provides two water marks for judging how full the outbound buffer is:

WRITE_BUFFER_HIGH_WATER_MARK

WRITE_BUFFER_LOW_WATER_MARK

When the buffer fills past the high water mark, Netty fires the channelWritabilityChanged callback; there you can unregister the read event on the fast side, which lets the kernel receive buffer fill up and the advertised TCP window shrink, throttling the sender. Once the buffer drains back below the low water mark, channelWritabilityChanged fires again and you re-register the read event.

This balances the read and write speeds and also keeps memory usage bounded; a sketch follows.
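A minimal sketch of that idea, under these assumptions: inbound is channelA (the side we read from), the handler sits on channelB's pipeline (the side we write to), and the water marks shown match Netty's defaults:

    import io.netty.channel.*;

    // On channelB's bootstrap, set the water marks (32 KiB low / 64 KiB high):
    // bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
    //                  new WriteBufferWaterMark(32 * 1024, 64 * 1024));

    public final class BackpressureHandler extends ChannelInboundHandlerAdapter {

        private final Channel inbound;

        public BackpressureHandler(Channel inbound) {
            this.inbound = inbound;
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            // isWritable() flips to false once the outbound buffer crosses the
            // high water mark, and back to true once it drains below the low
            // one. Toggling autoRead pauses/resumes reads on the fast side.
            inbound.config().setAutoRead(ctx.channel().isWritable());
            ctx.fireChannelWritabilityChanged();
        }
    }

With autoRead off, Netty stops draining the inbound socket, the kernel receive buffer fills, and TCP's own flow control throttles the fast peer.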

 

