基於Netty的IdleStateHandler實現Mqtt心跳


基於Netty的IdleStateHandler實現Mqtt心跳

IdleStateHandler解析

最近研究jetlinks編寫的基於Nettymqtt-client(https://github.com/jetlinks/netty-mqtt-client),總結若干知識點.
Netty中,實現心跳機制較為簡單,主要依賴於IdleStateHandler判斷channel的讀寫超時.

    /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
     * @param readerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
     *        will be triggered when no read was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param writerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
     *        will be triggered when no write was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param allIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
     *        will be triggered when neither read nor write was performed for
     *        the specified period of time.  Specify {@code 0} to disable.
     */
    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

以上是IdleStateHandler的構造函數,主要依賴於三個參數readerIdleTimeSeconds,writerIdleTimeSeconds以及allIdleTimeSeconds.

如果難於理解英文注釋,可參考<<淺析 Netty 實現心跳機制與斷線重連>>https://segmentfault.com/a/1190000006931568一文中的解釋:

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

IdleStateHandler中,分別通過如下函數實現對channel讀寫操作事件的跟蹤:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Allow writing with void promise if handler is only configured for read timeout events.
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            ctx.write(msg, promise.unvoid()).addListener(writeListener);
        } else {
            ctx.write(msg, promise);
        }
    }

    // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };

其中:

  • channelRead: 判斷channel是否有數據可讀取;
  • channelReadComplete: 判斷channel是否有數據可讀取;
  • write: 判斷channel是否有數據寫(通過writeListener判斷當前寫操作是否執行成功).

IdleStateHandlerchannel激活或注冊時,會執行initialize函數,根據讀寫超時時間創建對應的定時任務.

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Initialize early if channel is active already.
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }

        private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            // 創建讀超時判斷定時任務
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            // 創建寫超時判斷定時任務
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            // 創建讀寫超時判斷定時任務
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

此處,我們將剖析AllIdleTimeoutTask任務.
此任務,會判斷在超時時間段內,是否有讀寫操作:

  • 有讀或者寫操作,則重新創建定時任務,等待下次執行;
  • 沒有讀或者寫操作,則創建IdleStateEvent對象,通過ChannelHandlerContext通知注冊了用戶事件觸發器的handler(即handler重載了userEventTriggered函數).
  private final class AllIdleTimeoutTask extends AbstractIdleTask {

        AllIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstAllIdleEvent;
                firstAllIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

了解了IdleStateHandler,我們接下來學習如何編寫Mqtt的心跳handler.

Mqtt心跳handler

以下是jetlinks編寫的Mqtt心跳handler代碼,我們截取部分代碼學習.

final class MqttPingHandler extends ChannelInboundHandlerAdapter {

    private final int keepaliveSeconds;

    private ScheduledFuture<?> pingRespTimeout;

    MqttPingHandler(int keepaliveSeconds) {
        this.keepaliveSeconds = keepaliveSeconds;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof MqttMessage)) {
            ctx.fireChannelRead(msg);
            return;
        }
        MqttMessage message = (MqttMessage) msg;
        if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
            this.handlePingReq(ctx.channel());
        } else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
            this.handlePingResp();
        } else {
            ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        }
    }

    /**
     * IdleStateHandler,在連接處於idle狀態超過設定時間后,會發送IdleStateEvent
     * 接收到IdleStateEvent,當前類會發送心跳包至server,保持連接
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception 異常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);

        // 確認監聽事件為IdleStateEvent,即發送心跳包至server
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                this.sendPingReq(ctx.channel());
            }
        }
    }

    /**
     * 發送心跳包至server端,並建立心跳超時斷開連接任務
     * 此處,先行創建心跳超時任務,后續再發送心跳包(避免收到心跳響應時,心跳超時任務未建立完成)
     *
     * @param channel 連接
     */
    private void sendPingReq(Channel channel) {

        // 創建心跳超時,斷開連接任務
        if (this.pingRespTimeout == null) {
            this.pingRespTimeout = channel.eventLoop().schedule(() -> {
                MqttFixedHeader disconnectHeader =
                        new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
                channel.writeAndFlush(new MqttMessage(disconnectHeader)).addListener(ChannelFutureListener.CLOSE);
                //TODO: what do when the connection is closed ?
            }, this.keepaliveSeconds, TimeUnit.SECONDS);
        }

        // 創建心跳包,並發送至Mqtts Server
        MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
        channel.writeAndFlush(new MqttMessage(pingHeader));
    }

    /**
     * 處理ping resp,取消ping超時任務(斷開連接)
     */
    private void handlePingResp() {
        if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
            this.pingRespTimeout.cancel(true);
            this.pingRespTimeout = null;
        }
    }
}

函數解析:

(1) 接收超時事件,發送心跳請求

MqttPingHandler中重載了userEventTriggered函數,用以接收ChannelHandlerContext傳遞的事件,代碼中會判斷事件是否為IdleStateEvent.
如果當前接收事件為IdleStateEvent,則說明當前channel在超時時間內未發生讀寫事件,則客戶端發送Mqtt心跳請求.

(2) 發送心跳請求,建立請求響應超時關閉連接任務

sendPingReq函數中(以下兩步操作,順序可任意安排):

  • 建立心跳請求響應超時判斷任務,如果在一定時長內未接收到心跳響應,則會關閉連接;
  • 構建Mqtt心跳包,發送至遠端服務器.

(3) 取消心跳響應超時關閉連接任務

channelRead讀取數據,判斷是否是Mqtt的心跳響應包.
如果是,則執行handlePingResp函數,取消心跳響應超時關閉連接任務.

handler添加

    ch.pipeline().addLast("idleStateHandler",
        new IdleStateHandler(keepAliveTimeSeconds, keepAliveTimeSeconds, 0));
    ch.pipeline().addLast("mqttPingHandler",
        new MqttPingHandler(MqttClientImpl.this.clientConfig.getKeepAliveTimeSeconds()));

只需要以上兩句代碼,就可以完成Mqtt心跳維持功能.

PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!
程序員打怪之路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM