Netty為超時控制封裝了兩個類ReadTimeoutHandler和WriteTimeoutHandler,ReadTimeoutHandler,用於控制讀取數據的時候的超時,如果在設置時間段內都沒有數據讀取了,那么就引發超時,然后關閉當前的channel;WriteTimeoutHandler,用於控制數據輸出的時候的超時,如果在設置時間段內都沒有數據寫了,那么就超時。它們都是IdleStateHandler的子類。
開刀一下ReadTimeoutHandler:
public class ReadTimeoutHandler extends IdleStateHandler { private boolean closed; /** * Creates a new instance. * * @param timeoutSeconds * read timeout in seconds */ public ReadTimeoutHandler(int timeoutSeconds) { this(timeoutSeconds, TimeUnit.SECONDS); } /** * Creates a new instance. * * @param timeout * read timeout * @param unit * the {@link TimeUnit} of {@code timeout} */ public ReadTimeoutHandler(long timeout, TimeUnit unit) { super(timeout, 0, 0, unit); } @Override protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { assert evt.state() == IdleState.READER_IDLE; readTimedOut(ctx); } /** * Is called when a read timeout was detected. */ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { if (!closed) { ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE); ctx.close(); closed = true; } } }
ReadTimeoutHandler構造函數實際上調用的是父類IdleStateHandler的構造函數,當鏈路激活事件觸發后,會初始化相關定時任務:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added // before channelActive() event is fired. If a user adds this handler // after the channelActive() event, initialize() will be called by beforeAdd(). initialize(ctx); super.channelActive(ctx); }
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); }
schedule會獲取與該channel綁定的EventLoop來執行定時任務,這里主要看下ReaderIdleTimeoutTask。
private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); //這里相當於再設置一下timeout,確保channel關閉 boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
當超時發生后,會調用到channelIdle方法,ReadTimeoutHandler重寫(Overriding)了該方法,ReadTimeoutHandler會拋出一個ReadTimeoutException.INSTANCE異常,然后關閉掉對應的ChannelHandlerContext。
那個在自定義的Handler中需要關注exceptionCaught:
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { // do something } else { super.exceptionCaught(ctx, cause); } }
