Netty生產級的心跳和重連機制


今天研究的是,心跳和重連,雖然這次是大神寫的代碼,但是萬變不離其宗,我們先回顧一下Netty應用心跳和重連的整個過程:

1)客戶端連接服務端

2)在客戶端的的ChannelPipeline中加入一個比較特殊的IdleStateHandler,設置一下客戶端的寫空閑時間,例如5s

3)當客戶端的所有ChannelHandler中4s內沒有write事件,則會觸發userEventTriggered方法(上文介紹過)

4)我們在客戶端的userEventTriggered中對應的觸發事件下發送一個心跳包給服務端,檢測服務端是否還存活,防止服務端已經宕機,客戶端還不知道

5)同樣,服務端要對心跳包做出響應,其實給客戶端最好的回復就是“不回復”,這樣可以服務端的壓力,假如有10w個空閑Idle的連接,那么服務端光發送心跳回復,則也是費事的事情,那么怎么才能告訴客戶端它還活着呢,其實很簡單,因為5s服務端都會收到來自客戶端的心跳信息,那么如果10秒內收不到,服務端可以認為客戶端掛了,可以close鏈路

6)加入服務端因為什么因素導致宕機的話,就會關閉所有的鏈路鏈接,所以作為客戶端要做的事情就是短線重連

 

以上描述的就是整個心跳和重連的整個過程,雖然很簡單,上一篇blog也寫了一個Demo,簡單地做了一下上述功能

 

要寫工業級的Netty心跳重連的代碼,需要解決一下幾個問題:

1)ChannelPipeline中的ChannelHandlers的維護,首次連接和重連都需要對ChannelHandlers進行管理

2)重連對象的管理,也就是bootstrap對象的管理

3)重連機制編寫

 

完整的代碼:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle

 

下面我們就看大神是如何解決這些問題的,首先先定義一個接口ChannelHandlerHolder,用來保管ChannelPipeline中的Handlers的

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.channel.ChannelHandler;  
  4.   
  5. /** 
  6.  *  
  7.  * 客戶端的ChannelHandler集合,由子類實現,這樣做的好處: 
  8.  * 繼承這個接口的所有子類可以很方便地獲取ChannelPipeline中的Handlers 
  9.  * 獲取到handlers之后方便ChannelPipeline中的handler的初始化和在重連的時候也能很方便 
  10.  * 地獲取所有的handlers 
  11.  */  
  12. public interface ChannelHandlerHolder {  
  13.   
  14.     ChannelHandler[] handlers();  
  15. }  

我們再來編寫我們熟悉的服務端的ServerBootstrap的編寫:

 

HeartBeatServer.java

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.bootstrap.ServerBootstrap;  
  4. import io.netty.channel.ChannelFuture;  
  5. import io.netty.channel.ChannelInitializer;  
  6. import io.netty.channel.ChannelOption;  
  7. import io.netty.channel.EventLoopGroup;  
  8. import io.netty.channel.nio.NioEventLoopGroup;  
  9. import io.netty.channel.socket.SocketChannel;  
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  11. import io.netty.handler.codec.string.StringDecoder;  
  12. import io.netty.handler.codec.string.StringEncoder;  
  13. import io.netty.handler.logging.LogLevel;  
  14. import io.netty.handler.logging.LoggingHandler;  
  15. import io.netty.handler.timeout.IdleStateHandler;  
  16.   
  17. import java.net.InetSocketAddress;  
  18. import java.util.concurrent.TimeUnit;  
  19.   
  20. public class HeartBeatServer {  
  21.       
  22.     private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();  
  23.       
  24.     private int port;  
  25.   
  26.     public HeartBeatServer(int port) {  
  27.         this.port = port;  
  28.     }  
  29.   
  30.     public void start() {  
  31.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);  
  32.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  33.         try {  
  34.             ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)  
  35.                     .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))  
  36.                     .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {  
  37.                         protected void initChannel(SocketChannel ch) throws Exception {  
  38.                             ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));  
  39.                             ch.pipeline().addLast(idleStateTrigger);  
  40.                             ch.pipeline().addLast("decoder", new StringDecoder());  
  41.                             ch.pipeline().addLast("encoder", new StringEncoder());  
  42.                             ch.pipeline().addLast(new HeartBeatServerHandler());  
  43.                         };  
  44.   
  45.                     }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);  
  46.             // 綁定端口,開始接收進來的連接  
  47.             ChannelFuture future = sbs.bind(port).sync();  
  48.   
  49.             System.out.println("Server start listen at " + port);  
  50.             future.channel().closeFuture().sync();  
  51.         } catch (Exception e) {  
  52.             bossGroup.shutdownGracefully();  
  53.             workerGroup.shutdownGracefully();  
  54.         }  
  55.     }  
  56.   
  57.     public static void main(String[] args) throws Exception {  
  58.         int port;  
  59.         if (args.length > 0) {  
  60.             port = Integer.parseInt(args[0]);  
  61.         } else {  
  62.             port = 8080;  
  63.         }  
  64.         new HeartBeatServer(port).start();  
  65.     }  
  66.   
  67. }  

單獨寫一個AcceptorIdleStateTrigger,其實也是繼承ChannelInboundHandlerAdapter,重寫userEventTriggered方法,因為客戶端是write,那么服務端自然是read,設置的狀態就是IdleState.READER_IDLE,源碼如下:

 

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.channel.ChannelHandler;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.ChannelInboundHandlerAdapter;  
  6. import io.netty.handler.timeout.IdleState;  
  7. import io.netty.handler.timeout.IdleStateEvent;  
  8.   
  9.   
  10. @ChannelHandler.Sharable  
  11. public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {  
  12.   
  13.     @Override  
  14.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  15.         if (evt instanceof IdleStateEvent) {  
  16.             IdleState state = ((IdleStateEvent) evt).state();  
  17.             if (state == IdleState.READER_IDLE) {  
  18.                 throw new Exception("idle exception");  
  19.             }  
  20.         } else {  
  21.             super.userEventTriggered(ctx, evt);  
  22.         }  
  23.     }  
  24. }  

HeartBeatServerHandler就是一個很簡單的自定義的Handler,不是重點:

 

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.channel.ChannelHandlerContext;  
  4. import io.netty.channel.ChannelInboundHandlerAdapter;  
  5.   
  6. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {  
  7.   
  8.   
  9.     @Override  
  10.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
  11.         System.out.println("server channelRead..");  
  12.         System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());  
  13.     }  
  14.   
  15.     @Override  
  16.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  17.         cause.printStackTrace();  
  18.         ctx.close();  
  19.     }  
  20.   
  21. }  

接下來就是重點,我們需要寫一個類,這個類可以去觀察鏈路是否斷了,如果斷了,進行循環的斷線重連操作,ConnectionWatchdog,顧名思義,鏈路檢測狗,我們先看完整代碼:

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.bootstrap.Bootstrap;  
  4. import io.netty.channel.Channel;  
  5. import io.netty.channel.ChannelFuture;  
  6. import io.netty.channel.ChannelFutureListener;  
  7. import io.netty.channel.ChannelHandler.Sharable;  
  8. import io.netty.channel.ChannelHandlerContext;  
  9. import io.netty.channel.ChannelInboundHandlerAdapter;  
  10. import io.netty.channel.ChannelInitializer;  
  11. import io.netty.util.Timeout;  
  12. import io.netty.util.Timer;  
  13. import io.netty.util.TimerTask;  
  14.   
  15. import java.util.concurrent.TimeUnit;  
  16.   
  17. /** 
  18.  *  
  19.  * 重連檢測狗,當發現當前的鏈路不穩定關閉之后,進行12次重連 
  20.  */  
  21. @Sharable  
  22. public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{  
  23.       
  24.       
  25.       
  26.     private final Bootstrap bootstrap;  
  27.     private final Timer timer;  
  28.     private final int port;  
  29.       
  30.     private final String host;  
  31.   
  32.     private volatile boolean reconnect = true;  
  33.     private int attempts;  
  34.       
  35.       
  36.     public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {  
  37.         this.bootstrap = bootstrap;  
  38.         this.timer = timer;  
  39.         this.port = port;  
  40.         this.host = host;  
  41.         this.reconnect = reconnect;  
  42.     }  
  43.       
  44.     /** 
  45.      * channel鏈路每次active的時候,將其連接的次數重新☞ 0 
  46.      */  
  47.     @Override  
  48.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  49.           
  50.         System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為0");  
  51.           
  52.         attempts = 0;  
  53.         ctx.fireChannelActive();  
  54.     }  
  55.       
  56.     @Override  
  57.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
  58.         System.out.println("鏈接關閉");  
  59.         if(reconnect){  
  60.             System.out.println("鏈接關閉,將進行重連");  
  61.             if (attempts < 12) {  
  62.                 attempts++;  
  63.                 //重連的間隔時間會越來越長  
  64.                 int timeout = 2 << attempts;  
  65.                 timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);  
  66.             }  
  67.         }  
  68.         ctx.fireChannelInactive();  
  69.     }  
  70.       
  71.   
  72.     public void run(Timeout timeout) throws Exception {  
  73.           
  74.         ChannelFuture future;  
  75.         //bootstrap已經初始化好了,只需要將handler填入就可以了  
  76.         synchronized (bootstrap) {  
  77.             bootstrap.handler(new ChannelInitializer<Channel>() {  
  78.   
  79.                 @Override  
  80.                 protected void initChannel(Channel ch) throws Exception {  
  81.                       
  82.                     ch.pipeline().addLast(handlers());  
  83.                 }  
  84.             });  
  85.             future = bootstrap.connect(host,port);  
  86.         }  
  87.         //future對象  
  88.         future.addListener(new ChannelFutureListener() {  
  89.   
  90.             public void operationComplete(ChannelFuture f) throws Exception {  
  91.                 boolean succeed = f.isSuccess();  
  92.   
  93.                 //如果重連失敗,則調用ChannelInactive方法,再次出發重連事件,一直嘗試12次,如果失敗則不再重連  
  94.                 if (!succeed) {  
  95.                     System.out.println("重連失敗");  
  96.                     f.channel().pipeline().fireChannelInactive();  
  97.                 }else{  
  98.                     System.out.println("重連成功");  
  99.                 }  
  100.             }  
  101.         });  
  102.           
  103.     }  
  104.   
  105. }  



 

 
        

稍微分析一下:

 

1)繼承了ChannelInboundHandlerAdapter,說明它也是Handler,也對,作為一個檢測對象,肯定會放在鏈路中,否則怎么檢測

2)實現了2個接口,TimeTask,ChannelHandlerHolder

   ①TimeTask,我們就要寫run方法,這應該是一個定時任務,這個定時任務做的事情應該是重連的工作

   ②ChannelHandlerHolder的接口,這個接口我們剛才說過是維護的所有的Handlers,因為在重連的時候需要獲取Handlers

3)bootstrap對象,重連的時候依舊需要這個對象

4)當鏈路斷開的時候會觸發channelInactive這個方法,也就說觸發重連的導火索是從這邊開始的

 

好了,我們這邊再寫次核心的HeartBeatsClient的代碼:

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.bootstrap.Bootstrap;  
  4. import io.netty.channel.Channel;  
  5. import io.netty.channel.ChannelFuture;  
  6. import io.netty.channel.ChannelHandler;  
  7. import io.netty.channel.ChannelInitializer;  
  8. import io.netty.channel.EventLoopGroup;  
  9. import io.netty.channel.nio.NioEventLoopGroup;  
  10. import io.netty.channel.socket.nio.NioSocketChannel;  
  11. import io.netty.handler.codec.string.StringDecoder;  
  12. import io.netty.handler.codec.string.StringEncoder;  
  13. import io.netty.handler.logging.LogLevel;  
  14. import io.netty.handler.logging.LoggingHandler;  
  15. import io.netty.handler.timeout.IdleStateHandler;  
  16. import io.netty.util.HashedWheelTimer;  
  17.   
  18. import java.util.concurrent.TimeUnit;  
  19.   
  20. public class HeartBeatsClient {  
  21.       
  22.     protected final HashedWheelTimer timer = new HashedWheelTimer();  
  23.       
  24.     private Bootstrap boot;  
  25.       
  26.     private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();  
  27.   
  28.     public void connect(int port, String host) throws Exception {  
  29.           
  30.         EventLoopGroup group = new NioEventLoopGroup();    
  31.           
  32.         boot = new Bootstrap();  
  33.         boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));  
  34.               
  35.         final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {  
  36.   
  37.                 public ChannelHandler[] handlers() {  
  38.                     return new ChannelHandler[] {  
  39.                             this,  
  40.                             new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),  
  41.                             idleStateTrigger,  
  42.                             new StringDecoder(),  
  43.                             new StringEncoder(),  
  44.                             new HeartBeatClientHandler()  
  45.                     };  
  46.                 }  
  47.             };  
  48.               
  49.             ChannelFuture future;  
  50.             //進行連接  
  51.             try {  
  52.                 synchronized (boot) {  
  53.                     boot.handler(new ChannelInitializer<Channel>() {  
  54.   
  55.                         //初始化channel  
  56.                         @Override  
  57.                         protected void initChannel(Channel ch) throws Exception {  
  58.                             ch.pipeline().addLast(watchdog.handlers());  
  59.                         }  
  60.                     });  
  61.   
  62.                     future = boot.connect(host,port);  
  63.                 }  
  64.   
  65.                 // 以下代碼在synchronized同步塊外面是安全的  
  66.                 future.sync();  
  67.             } catch (Throwable t) {  
  68.                 throw new Exception("connects to  fails", t);  
  69.             }  
  70.     }  
  71.   
  72.     /** 
  73.      * @param args 
  74.      * @throws Exception 
  75.      */  
  76.     public static void main(String[] args) throws Exception {  
  77.         int port = 8080;  
  78.         if (args != null && args.length > 0) {  
  79.             try {  
  80.                 port = Integer.valueOf(args[0]);  
  81.             } catch (NumberFormatException e) {  
  82.                 // 采用默認值  
  83.             }  
  84.         }  
  85.         new HeartBeatsClient().connect(port, "127.0.0.1");  
  86.     }  
  87.   
  88. }  

也稍微說明一下:

 

1)創建了ConnectionWatchdog對象,自然要實現handlers方法

2)初始化好bootstrap對象

3)4秒內沒有寫操作,進行心跳觸發,也就是IdleStateHandler這個方法

 

最后ConnectorIdleStateTrigger這個類

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.buffer.Unpooled;  
  5. import io.netty.channel.ChannelHandler.Sharable;  
  6. import io.netty.channel.ChannelHandlerContext;  
  7. import io.netty.channel.ChannelInboundHandlerAdapter;  
  8. import io.netty.handler.timeout.IdleState;  
  9. import io.netty.handler.timeout.IdleStateEvent;  
  10. import io.netty.util.CharsetUtil;  
  11.   
  12. @Sharable  
  13. public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {  
  14.       
  15.     private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",  
  16.             CharsetUtil.UTF_8));  
  17.   
  18.     @Override  
  19.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  20.         if (evt instanceof IdleStateEvent) {  
  21.             IdleState state = ((IdleStateEvent) evt).state();  
  22.             if (state == IdleState.WRITER_IDLE) {  
  23.                 // write heartbeat to server  
  24.                 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());  
  25.             }  
  26.         } else {  
  27.             super.userEventTriggered(ctx, evt);  
  28.         }  
  29.     }  
  30. }  

HeartBeatClientHandler.java(不是重點)

 

 

[java]  view plain  copy
 
  1. package com.lyncc.netty.idle;  
  2.   
  3. import io.netty.channel.ChannelHandler.Sharable;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.ChannelInboundHandlerAdapter;  
  6. import io.netty.util.ReferenceCountUtil;  
  7.   
  8. import java.util.Date;  
  9.   
  10. @Sharable  
  11. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {  
  12.   
  13.       
  14.     @Override  
  15.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  16.         System.out.println("激活時間是:"+new Date());  
  17.         System.out.println("HeartBeatClientHandler channelActive");  
  18.         ctx.fireChannelActive();  
  19.     }  
  20.   
  21.     @Override  
  22.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
  23.         System.out.println("停止時間是:"+new Date());  
  24.         System.out.println("HeartBeatClientHandler channelInactive");  
  25.     }  
  26.   
  27.   
  28.     @Override  
  29.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
  30.         String message = (String) msg;  
  31.         System.out.println(message);  
  32.         if (message.equals("Heartbeat")) {  
  33.             ctx.write("has read message from server");  
  34.             ctx.flush();  
  35.         }  
  36.         ReferenceCountUtil.release(msg);  
  37.     }  
  38. }  

 

 

好了,到此為止,所有的代碼都貼完了,我們做一個簡單的測試,按照常理,如果不出任何狀況的話,客戶端4秒發送心跳,服務端5秒才驗證是不會斷連的,所以我們在啟動之后,關閉服務端,然后再次重啟服務端

首先啟動服務端,控制台如下:

啟動客戶端,控制台如下:

客戶端啟動之后,服務端的控制台:

關閉服務端后,客戶端控制台:

重啟啟動服務端:

重連成功~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM