Netty 心跳處理


傳統的心跳包設計,基本上是服務端和客戶端同時維護 Scheduler,然后雙方互相接收心跳包信息,然后重置雙方的上下線狀態表。此種心跳方式的設計,可以選擇在主線程上進行,也可以選擇在心跳線程中進行,由於在進行業務調用過程中,此種心跳包是沒有必要進行發送的,所以在一定程度上會造成資源浪費。嚴重的甚至會影響業務線程的操作。但是在 Netty 中是通過檢測鏈路的空閑與否在進行的。鏈路分為讀操作空閑,寫操作空閑,讀寫操作空閑。由於空閑檢測本身只有在通道空閑的時候才進行檢測,而不是固定頻率的進行心跳包通訊,所以可以節省網絡帶寬,同時對業務的影響也很小

在 Netty 中空閑檢測需要引入 IdleStateHandler,然后實現自己的心跳處理 Handler,本文中服務端與客戶端均向對方發送心跳包。

一、服務端

1.1 編解碼及 Handler

...

.childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().addLast("ping", new IdleStateHandler(10, 5, 10));
        channel.pipeline().addLast("encoder", new NettyMessageEncoder());
        channel.pipeline().addLast("decoder", new NettyMessageDecoder());
        channel.pipeline().addLast("message", new MessageHandler());
        channel.pipeline().addLast("heartbeat", new HeartbeatHandler());
    }
});

...

HeartbeatHandler 為 心跳處理 Handler

1.2 心跳處理 Handler

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatHandler.class);

    private final AttributeKey<Integer> counterAttr = AttributeKey.valueOf(ChannelSupervise.COUNTER_ATTR);;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);

        if(evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    NettyMessage<String> nettyMessage = new NettyMessage<>();
                    nettyMessage.setSessionId(0L);
                    nettyMessage.setType(NettyMessageTypeEnum.HEARTBEAT);

                    ctx.writeAndFlush(nettyMessage).addListener(future -> {
//                        if(future.isSuccess()) {
//                            ctx.channel().attr(counterAttr).set(0);
//                        }else {
                            Integer counter = ctx.channel().attr(counterAttr).get();
                            counter = counter + 1;
                            LOGGER.info(ctx.channel().id().asShortText() + ",發送心跳: " + counter);
                            if(counter >= 3) {
                                ctx.close();
                            } else {
                                ctx.channel().attr(counterAttr).set(counter);
                            }
//                        }
                    });
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().attr(counterAttr).set(0);
        ChannelSupervise.addChannel(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ChannelSupervise.removeChannel(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ctx.channel().attr(counterAttr).set(0);
        ctx.fireChannelRead(msg);
    }



    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("斷開連接", cause);
        ctx.close();
    }
}

本例中,如果服務端連續發送三次心跳包,則認為客戶端斷開連接,使用 Netty 內置的 AttributeKey 計數 (本例中為方便測試注釋掉部分代碼,正常來說如果發送消息成功則證明客戶端還在線,需要把計數重置為 0)。

二、客戶端

2.1 編解碼及 Handler

...

.handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().addLast("ping", new IdleStateHandler(5, 5, 3));
        channel.pipeline().addLast("encoder", new NettyMessageEncoder());
        channel.pipeline().addLast("decoder", new NettyMessageDecoder());
        channel.pipeline().addLast("heartbeat", new HeartbeatHandler());
        channel.pipeline().addLast("logger", new LoggingHandler(LogLevel.INFO));
    }
});

...

HeartbeatHandler 為 心跳處理 Handler

2.2 心跳處理 Handler

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

            switch (idleStateEvent.state()){
                case WRITER_IDLE:
                    LOGGER.info("發送心跳包");
                    NettyMessage<String> nettyMessage = new NettyMessage<>();
                    nettyMessage.setSessionId(0L);
                    nettyMessage.setType(NettyMessageTypeEnum.HEARTBEAT);
                    ctx.writeAndFlush(nettyMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    break;
                default:
                    break;
            }
        }

        super.userEventTriggered(ctx, evt);
    }

    ...
}

本例中,如果客戶端發送心跳消息失敗則斷開連接。

參考

  1. Netty(一) SpringBoot 整合長連接心跳機制TCP-Heartbeat/)
  2. 微言Netty:分布式服務框架

完整代碼:GitHub


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM