A while back I wrote a proxy program with Netty. It had been running fine in production for ages, but after being deployed to a new environment, files would occasionally end up with bytes missing at the end.
My first thought was: "Did I miss a flush call somewhere? But then why did it work fine before?"
I went through the code and confirmed no flush was missing, which left me baffled. After staring at logs for a long while, I finally made Netty the prime suspect.
Enough talk, let's look at the code.
Following the channel's writeAndFlush method down the call chain, you eventually land in the HeadContext class, the head node of DefaultChannelPipeline, whose flush method looks like this:
@Override
public void flush(ChannelHandlerContext ctx) {
    unsafe.flush();
}
This unsafe object is Netty's low-level data-transfer workhorse (the Unsafe defined in AbstractChannel); its concrete implementations are essentially wrappers around the Epoll, NIO, and Kqueue network models.
Since the NIO model is what's used in most cases, I followed the NIO subclass and found the call chain flush() -> flush0() -> doWrite():
@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            initialCloseCause = t;
            close(voidPromise(), t, newClosedChannelException(t), false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);
            } catch (Throwable t2) {
                initialCloseCause = t;
                close(voidPromise(), t2, newClosedChannelException(t), false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
The crux of the problem is the do-while loop in doWrite. The loop caps its iterations with the spin counter writeSpinCount, so that a single write operation cannot block the event loop thread for too long.
private volatile int writeSpinCount = 16;
In other words, it loops at most 16 times, and whatever has not been written by then is simply set aside for the moment!
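As an aside, 16 is only the default. If you want doWrite() to keep draining longer on each flush, the spin count is tunable via ChannelOption.WRITE_SPIN_COUNT; a minimal sketch with a placeholder value:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;

Bootstrap b = new Bootstrap();
// Placeholder value: more spins per flush means fewer deferred writes,
// but also longer stretches where the event loop is busy with one channel.
b.option(ChannelOption.WRITE_SPIN_COUNT, 32);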
Of course, Netty does follow up: it registers interest in the write event (OP_WRITE) on the channel again, as if to say "I still have data to write".
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
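For completeness: when the selector later reports that the socket is writable again, the event loop resumes the write by calling forceFlush on the channel's unsafe. The relevant fragment of NioEventLoop.processSelectedKey looks roughly like this (quoted from memory of a 4.1.x release, so treat it as a sketch rather than the exact source):

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // forceFlush() also clears OP_WRITE once there is nothing left to write.
    ch.unsafe().forceFlush();
}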
As long as the TCP connection stays up, this way of handling things causes no problems.
But if you flush and then close right away, the mechanism can no longer guarantee that no data is lost: the leftover bytes are still parked in the ChannelOutboundBuffer waiting for the next OP_WRITE round, and the close can beat them to the socket.
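To make the race concrete, here is a hedged sketch (ch stands for the channel being written to, lastChunk for whatever the final message is; both are placeholders):

// Risky: close() may run while part of lastChunk is still parked in the
// ChannelOutboundBuffer waiting for the next OP_WRITE round.
ch.writeAndFlush(lastChunk);
ch.close();

// Safer when you control the writer: close only after the write future
// completes, i.e. after the bytes were handed to the socket (or the write failed).
ch.writeAndFlush(lastChunk).addListener(ChannelFutureListener.CLOSE);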
When Netty is used for forwarding, i.e. reading from channelA and writing to channelB, a large gap between the read speed and the write speed makes this very easy to trigger.
That is exactly what I ran into this time: the new machine had two NICs on different networks, one fast and one slow.
There is a fix, though.
Netty provides two watermarks for judging how full the write buffer is:
WRITE_BUFFER_HIGH_WATER_MARK
WRITE_BUFFER_LOW_WATER_MARK
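Both are ordinary ChannelOptions (the defaults are 64 KiB and 32 KiB) and can be set at bootstrap time or at runtime through the channel config; the values below are placeholders:

// Set the high watermark first so it never drops below the low one.
ch.config().setOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 512 * 1024);
ch.config().setOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 256 * 1024);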
When the buffer rises above the high watermark, the channelWritabilityChanged callback fires; that is where you deregister the read-event interest (stop reading), which lets TCP flow control close the window. When the buffer drains back below the low watermark, channelWritabilityChanged fires again and you register the read event once more.
This balances the read and write speeds and also keeps memory usage from ballooning.
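Put together for the forwarding case, a minimal sketch (class and variable names are mine, not from the original program): reads on channelA are paused whenever channelB becomes unwritable, and resumed from channelB's channelWritabilityChanged.

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// On channelA's pipeline: forward everything to channelB and throttle reads
// whenever channelB's outbound buffer climbs past the high watermark.
public class RelayHandler extends ChannelInboundHandlerAdapter {
    private final Channel outbound; // channelB

    public RelayHandler(Channel outbound) {
        this.outbound = outbound;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        outbound.writeAndFlush(msg);
        if (!outbound.isWritable()) {
            // Stop reading from channelA; TCP flow control will in turn slow the sender down.
            ctx.channel().config().setAutoRead(false);
        }
    }
}

// On channelB's pipeline: once the buffer drains below the low watermark the
// channel becomes writable again, so re-enable reads on channelA.
public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    private final Channel inbound; // channelA

    public BackpressureHandler(Channel inbound) {
        this.inbound = inbound;
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            inbound.config().setAutoRead(true);
        }
        ctx.fireChannelWritabilityChanged();
    }
}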
