作者:tomas家的小撥浪鼓
鏈接:https://www.jianshu.com/p/8fe70d313d78
來源:簡書
本文是筆者和朋友(筆名:oojeek)一起討論該問題的一個記錄。文章以討論過程中的思路來展現(也是我們解決問題的思路路線),因此可能會有些亂。再者,如果對Netty寫數據流程不了解的朋友,可以先閱讀Netty 源碼解析 ——— writeAndFlush流程分析該篇文章,下面的討論中會涉及不少這篇文章提及的概念。
問題
起因是這樣的,朋友倒騰了個發送大數據包的demo,結果發現在發送大數據包時,寫空閑超時事件被觸發了。即便在設置了IdleStateHandler的observeOutput屬性為true的情況下,依舊會發送在寫一個大數據包的過程中,寫空閑超時事件被觸發。
先來簡單看看朋友的demo,我們來看幾個關鍵類
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS)); pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 0, 4, 0, 4, true)); pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } }
我們定義了一個IdleStateHandler,並且設置了observeOutput屬性為true(即,第一個參數),以及設置了寫空閑超時時間為2秒(即,第二個參數)。
public class MyClientHandler extends SimpleChannelInboundHandler<String> { private String tempString; public MyClientHandler() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 1024 * 1024; i++) { builder.append("abcdefghijklmnopqrstuvwxyz"); } tempString = builder.toString(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { sendData(ctx); } private void sendData(ChannelHandlerContext ctx) { if (!ctx.channel().isActive()) { System.out.println("channel inactive..."); ctx.close(); return; } System.out.println("send a pack of data ..."); long tickCount = System.currentTimeMillis(); ChannelFuture future = ctx.writeAndFlush(tempString); ChannelPromise promise = (ChannelPromise)future; promise.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println("send completed"); sendData(ctx); } }); System.out.println("Time elapse:" + (System.currentTimeMillis(