Netty超時控制handler


   Netty為超時控制封裝了兩個類ReadTimeoutHandler和WriteTimeoutHandler,ReadTimeoutHandler,用於控制讀取數據的時候的超時,如果在設置時間段內都沒有數據讀取了,那么就引發超時,然后關閉當前的channel;WriteTimeoutHandler,用於控制數據輸出的時候的超時,如果在設置時間段內都沒有數據寫了,那么就超時。它們都是IdleStateHandler的子類。
 開刀一下ReadTimeoutHandler:

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;
 
    /**
     * Creates a new instance.
     *
     * @param timeoutSeconds
     *        read timeout in seconds
     */
    public ReadTimeoutHandler(int timeoutSeconds) {
        this(timeoutSeconds, TimeUnit.SECONDS);
    }
 
    /**
     * Creates a new instance.
     *
     * @param timeout
     *        read timeout
     * @param unit
     *        the {@link TimeUnit} of {@code timeout}
     */
    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0, 0, unit);
    }
 
    @Override
    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        readTimedOut(ctx);
    }
 
    /**
     * Is called when a read timeout was detected.
     */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }
}

  ReadTimeoutHandler構造函數實際上調用的是父類IdleStateHandler的構造函數,當鏈路激活事件觸發后,會初始化相關定時任務:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }
    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }
 
        state = 1;
        initOutputChanged(ctx);
 
        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
    ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }

  schedule會獲取與該channel綁定的EventLoop來執行定時任務,這里主要看下ReaderIdleTimeoutTask。

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
 
        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }
 
        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }
 
            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); //這里相當於再設置一下timeout,確保channel關閉
 
                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;
 
                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

 當超時發生后,會調用到channelIdle方法,ReadTimeoutHandler重寫(Overriding)了該方法,ReadTimeoutHandler會拋出一個ReadTimeoutException.INSTANCE異常,然后關閉掉對應的ChannelHandlerContext。
 那個在自定義的Handler中需要關注exceptionCaught:

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            // do something
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM