作者:tomas家的小拨浪鼓
链接:https://www.jianshu.com/p/8fe70d313d78
来源:简书
本文是笔者和朋友(笔名:oojeek)一起讨论该问题的一个记录。文章以讨论过程中的思路来展现(也是我们解决问题的思路路线),因此可能会有些乱。再者,如果对Netty写数据流程不了解的朋友,可以先阅读Netty 源码解析 ——— writeAndFlush流程分析该篇文章,下面的讨论中会涉及不少这篇文章提及的概念。
问题
起因是这样的,朋友倒腾了个发送大数据包的demo,结果发现在发送大数据包时,写空闲超时事件被触发了。即便在设置了IdleStateHandler的observeOutput属性为true的情况下,依旧会发送在写一个大数据包的过程中,写空闲超时事件被触发。
先来简单看看朋友的demo,我们来看几个关键类
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS)); pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 0, 4, 0, 4, true)); pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } }
我们定义了一个IdleStateHandler,并且设置了observeOutput属性为true(即,第一个参数),以及设置了写空闲超时时间为2秒(即,第二个参数)。
public class MyClientHandler extends SimpleChannelInboundHandler<String> { private String tempString; public MyClientHandler() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 1024 * 1024; i++) { builder.append("abcdefghijklmnopqrstuvwxyz"); } tempString = builder.toString(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { sendData(ctx); } private void sendData(ChannelHandlerContext ctx) { if (!ctx.channel().isActive()) { System.out.println("channel inactive..."); ctx.close(); return; } System.out.println("send a pack of data ..."); long tickCount = System.currentTimeMillis(); ChannelFuture future = ctx.writeAndFlush(tempString); ChannelPromise promise = (ChannelPromise)future; promise.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println("send completed"); sendData(ctx); } }); System.out.println("Time elapse:" + (System.currentTimeMillis(