Does calling writeAndFlush in Netty guarantee that the data is 100% sent?


A while back I wrote a proxy with Netty. It had been running fine in production, but after deploying it to a new environment, files would occasionally arrive with bytes missing at the end.

My first thought was: "Did I forget to call flush somewhere? But then why did it work fine before?"

After going through the code I confirmed that no flush call was missing, which left me baffled. After staring at logs for a long while, Netty became the prime suspect.

 

Enough talk; let's look at the code.

Following the channel's writeAndFlush method all the way down, you eventually land in HeadContext, the head node of DefaultChannelPipeline, whose flush method looks like this:

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

This unsafe object is Netty's low-level I/O helper defined inside AbstractChannel (AbstractChannel.AbstractUnsafe); its concrete subclasses are essentially wrappers around the Epoll, NIO, and Kqueue transports.

Since the NIO transport is what most setups use, I followed the NIO subclasses and found the call chain flush() -> flush0() -> doWrite():

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    initialCloseCause = t;
                    close(voidPromise(), t, newClosedChannelException(t), false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        initialCloseCause = t;
                        close(voidPromise(), t2, newClosedChannelException(t), false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }


    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

The key to the problem is the do-while loop inside doWrite. The loop uses the spin counter writeSpinCount to cap the number of iterations, so that a single write operation cannot hog the event loop thread for too long.

    private volatile int writeSpinCount = 16;

In other words, it loops at most 16 times; if the data still isn't fully written by then, Netty simply stops for the moment!
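If that default budget is too small for your workload, the spin count is configurable. Below is a minimal sketch of raising it via ChannelOption.WRITE_SPIN_COUNT; the bootstrap details and the value 32 are placeholders for illustration, not a recommendation:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    // Hypothetical bootstrap fragment: raise the write spin count from the default 16,
    // so each flush attempts more socket writes before giving up and registering OP_WRITE.
    ServerBootstrap b = new ServerBootstrap()
            .group(new NioEventLoopGroup(1), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.WRITE_SPIN_COUNT, 32)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // business handlers go here
                }
            });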

Of course, Netty does follow up: it re-registers interest in the write event on the channel, effectively saying "I still have data to write":

    protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

As long as the TCP connection stays up, this mechanism works without problems.

But if you flush and then immediately close, this mechanism can no longer guarantee that no data is lost: the close may run before the leftover bytes, still sitting in the ChannelOutboundBuffer waiting for OP_WRITE, ever reach the socket.
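One simple way around this flush-then-close race is to close only after the write's future completes. A minimal sketch, assuming you hold the Channel and the last message to send:

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFutureListener;

    // Instead of calling ch.writeAndFlush(msg) and then ch.close() back to back,
    // attach the close to the write's future: Netty completes this future only after
    // the message has been removed from the outbound buffer and written to the socket,
    // so the close can no longer overtake the pending bytes.
    void writeLastAndClose(Channel ch, Object msg) {
        ch.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
    }

ChannelFutureListener.CLOSE simply closes the channel once the future completes, whether it succeeded or failed.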

 

When Netty is used as a forwarder, i.e. reading from channelA and writing to channelB, a large gap between the read and write speeds makes this problem very easy to hit.
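For reference, the forwarding handler in such a proxy typically looks like the minimal sketch below; RelayHandler and outboundChannel are illustrative names, and error handling and channel lifecycle are omitted:

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    // Minimal relay: whatever is read from this channel (channelA) is written and
    // flushed to the other leg of the proxy (channelB).
    public class RelayHandler extends ChannelInboundHandlerAdapter {

        private final Channel outboundChannel; // channelB

        public RelayHandler(Channel outboundChannel) {
            this.outboundChannel = outboundChannel;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // If channelB is slower than channelA, these writes pile up
            // in channelB's ChannelOutboundBuffer.
            outboundChannel.writeAndFlush(msg);
        }
    }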

In my case the machine had two NICs on different networks, one fast and one slow.

 

There is a fix, though.

Netty provides two water marks for judging how full the outbound buffer is:

WRITE_BUFFER_HIGH_WATER_MARK

WRITE_BUFFER_LOW_WATER_MARK

When the buffered bytes rise above the high water mark, Netty fires the channelWritabilityChanged callback; there you can stop reading from the source channel, which lets its receive buffer fill up and shrinks the TCP window it advertises to the sender. When the level drops back below the low water mark, channelWritabilityChanged fires again and you re-enable reading.

This balances the read and write speeds and also keeps memory usage from growing out of control.
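A minimal sketch of this backpressure pattern is shown below; BackpressureHandler and peerChannel are illustrative names, and the 32 KiB / 64 KiB water marks are arbitrary example values:

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.WriteBufferWaterMark;

    // Hypothetical backpressure handler for the proxy: when this channel (channelB)
    // stops being writable because its outbound buffer crossed the high water mark,
    // pause reading from the peer channel (channelA); resume once it is writable again.
    public class BackpressureHandler extends ChannelInboundHandlerAdapter {

        private final Channel peerChannel; // the channel we read from (channelA)

        public BackpressureHandler(Channel peerChannel) {
            this.peerChannel = peerChannel;
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            // isWritable() flips to false above the high water mark and back to true
            // once the buffered bytes drop below the low water mark.
            peerChannel.config().setAutoRead(ctx.channel().isWritable());
            ctx.fireChannelWritabilityChanged();
        }
    }

    // Configure the water marks when setting up channelB, for example:
    //   bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
    //                    new WriteBufferWaterMark(32 * 1024, 64 * 1024));

Installed on channelB, this pauses reads on channelA whenever channelB's outbound buffer crosses the high water mark and resumes them once it drains below the low water mark.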

 

