一、前言
心跳機制是定時發送一個自定義的結構體(心跳包),讓對方知道自己還活着,以確保連接的有效性的機制。
我們用到的很多框架都用到了心跳檢測,比如服務注冊到 Eureka Server 之后會維護一個心跳連接,告訴 Eureka Server 自己還活着。本文就是利用 Netty 來實現心跳檢測,以及客戶端重連。
二、設計思路
- 分為客戶端和服務端
- 建立連接后,客戶端先發送一個消息詢問服務端是否可以進行通信了。
- 客戶端收到服務端 Yes 的應答后,主動發送心跳消息,服務端接收到心跳消息后,返回心跳應答,周而復始。
- 心跳超時利用 Netty 的 ReadTimeOutHandler 機制,當一定周期內(默認值50s)沒有讀取到對方任何消息時,需要主動關閉鏈路。如果是客戶端,重新發起連接。
- 為了避免出現粘/拆包問題,使用 DelimiterBasedFrameDecoder 和 StringDecoder 來處理消息。
三、編碼
- 先編寫客戶端 NettyClient
- public class NettyClient {
- private static final String HOST = "127.0.0.1";
- private static final int PORT = 9911;
- private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- EventLoopGroup group = new NioEventLoopGroup();
- private void connect(String host,int port){
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY,true)
- .remoteAddress(new InetSocketAddress(host,port))
- .handler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);
- ch.pipeline()
- .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
- .addLast(new StringDecoder())
- // 當一定周期內(默認50s)沒有收到對方任何消息時,需要主動關閉鏈接
- .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
- .addLast("heartBeatHandler",new HeartBeatReqHandler());
- }
- });
- // 發起異步連接操作
- ChannelFuture future = b.connect().sync();
- future.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- // 所有資源釋放完之后,清空資源,再次發起重連操作
- executor.execute(()->{
- try {
- TimeUnit.SECONDS.sleep(5);
- //發起重連操作
- connect(NettyClient.HOST,NettyClient.PORT);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- }
- public static void main(String[] args) {
- new NettyClient().connect(NettyClient.HOST,NettyClient.PORT);
- }
- }
這里稍微復雜點的就是38行開始的重連部分。
2. 心跳消息發送類 HeartBeatReqHandler
- package cn.sp.heartbeat;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
- /**
- * Created by 2YSP on 2019/5/23.
- */
- .Sharable
- public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String> {
- private volatile ScheduledFuture<?> heartBeat;
- private static final String hello = "start notify with server$_";
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes()));
- System.out.println("================");
- }
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (heartBeat != null){
- heartBeat.cancel(true);
- heartBeat = null;
- }
- ctx.fireExceptionCaught(cause);
- }
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- if ("ok".equalsIgnoreCase(msg)){
- //服務端返回ok開始心跳
- heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS);
- }else {
- System.out.println("Client receive server heart beat message : --->"+msg);
- }
- }
- private class HeartBeatTask implements Runnable{
- private final ChannelHandlerContext ctx;
- public HeartBeatTask(ChannelHandlerContext ctx){
- this.ctx = ctx;
- }
- public void run() {
- String heartBeat = "I am ok";
- System.out.println("Client send heart beat message to server: ----->"+heartBeat);
- ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes()));
- }
- }
- }
channelActive()方法在首次建立連接后向服務端問好,如果服務端返回了 "ok" 就創建一個線程每隔5秒發送一次心跳消息。如果發生了異常,就取消定時任務並將其設置為 null,等待 GC 回收。
3. 服務端 NettyServer
- public class NettyServer {
- public static void main(String[] args) {
- new NettyServer().bind(9911);
- }
- private void bind(int port){
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(group)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
- ch.pipeline()
- .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
- .addLast(new StringDecoder())
- .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
- .addLast("HeartBeatHandler",new HeartBeatRespHandler());
- }
- });
- // 綁定端口,同步等待成功
- b.bind(port).sync();
- System.out.println("Netty Server start ok ....");
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- 心跳響應類 HeartBeatRespHandler
- package cn.sp.heartbeat;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * Created by 2YSP on 2019/5/23.
- */
- .Sharable
- public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String> {
- private static final String resp = "I have received successfully$_";
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- if (msg.equals("start notify with server")){
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes()));
- }else {
- //返回心跳應答信息
- System.out.println("Receive client heart beat message: ---->"+ msg);
- ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
- }
- }
- }
第一次告訴客戶端我已經准備好了,后面打印客戶端發過來的信息並告訴客戶端我已經收到你的消息了。
四、測試
啟動服務端再啟動客戶端,可以看到心跳檢測正常,如下圖。

服務端控制台

客戶端控制台
現在讓服務端宕機一段時間,看客戶端能否重連並開始正常工作。
關閉服務端后,客戶端周期性的連接失敗,控制台輸出如圖:

連接失敗
重新啟動服務端,過一會兒發現重連成功了。

成功重連
五、總結
總得來說,使用 Netty 實現心跳檢測還是比較簡單的,這里比較懶沒有使用其他序列化協議(如 ProtoBuf 等),如果感興趣的話大家可以自己試試。
代碼地址,點擊這里。
有篇SpringBoot 整合長連接心跳機制的文章寫的也很不錯,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/