今天研究的是,心跳和重連,雖然這次是大神寫的代碼,但是萬變不離其宗,我們先回顧一下Netty應用心跳和重連的整個過程:
1)客戶端連接服務端
2)在客戶端的的ChannelPipeline中加入一個比較特殊的IdleStateHandler,設置一下客戶端的寫空閑時間,例如5s
3)當客戶端的所有ChannelHandler中4s內沒有write事件,則會觸發userEventTriggered方法(上文介紹過)
4)我們在客戶端的userEventTriggered中對應的觸發事件下發送一個心跳包給服務端,檢測服務端是否還存活,防止服務端已經宕機,客戶端還不知道
5)同樣,服務端要對心跳包做出響應,其實給客戶端最好的回復就是“不回復”,這樣可以服務端的壓力,假如有10w個空閑Idle的連接,那么服務端光發送心跳回復,則也是費事的事情,那么怎么才能告訴客戶端它還活着呢,其實很簡單,因為5s服務端都會收到來自客戶端的心跳信息,那么如果10秒內收不到,服務端可以認為客戶端掛了,可以close鏈路
6)加入服務端因為什么因素導致宕機的話,就會關閉所有的鏈路鏈接,所以作為客戶端要做的事情就是短線重連
以上描述的就是整個心跳和重連的整個過程,雖然很簡單,上一篇blog也寫了一個Demo,簡單地做了一下上述功能
要寫工業級的Netty心跳重連的代碼,需要解決一下幾個問題:
1)ChannelPipeline中的ChannelHandlers的維護,首次連接和重連都需要對ChannelHandlers進行管理
2)重連對象的管理,也就是bootstrap對象的管理
3)重連機制編寫
完整的代碼:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle
下面我們就看大神是如何解決這些問題的,首先先定義一個接口ChannelHandlerHolder,用來保管ChannelPipeline中的Handlers的
- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandler;
- /**
- *
- * 客戶端的ChannelHandler集合,由子類實現,這樣做的好處:
- * 繼承這個接口的所有子類可以很方便地獲取ChannelPipeline中的Handlers
- * 獲取到handlers之后方便ChannelPipeline中的handler的初始化和在重連的時候也能很方便
- * 地獲取所有的handlers
- */
- public interface ChannelHandlerHolder {
- ChannelHandler[] handlers();
- }
我們再來編寫我們熟悉的服務端的ServerBootstrap的編寫:
HeartBeatServer.java
- package com.lyncc.netty.idle;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.net.InetSocketAddress;
- import java.util.concurrent.TimeUnit;
- public class HeartBeatServer {
- private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
- private int port;
- public HeartBeatServer(int port) {
- this.port = port;
- }
- public void start() {
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
- .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
- ch.pipeline().addLast(idleStateTrigger);
- ch.pipeline().addLast("decoder", new StringDecoder());
- ch.pipeline().addLast("encoder", new StringEncoder());
- ch.pipeline().addLast(new HeartBeatServerHandler());
- };
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- // 綁定端口,開始接收進來的連接
- ChannelFuture future = sbs.bind(port).sync();
- System.out.println("Server start listen at " + port);
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new HeartBeatServer(port).start();
- }
- }
單獨寫一個AcceptorIdleStateTrigger,其實也是繼承ChannelInboundHandlerAdapter,重寫userEventTriggered方法,因為客戶端是write,那么服務端自然是read,設置的狀態就是IdleState.READER_IDLE,源碼如下:
- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- @ChannelHandler.Sharable
- public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleState state = ((IdleStateEvent) evt).state();
- if (state == IdleState.READER_IDLE) {
- throw new Exception("idle exception");
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
- }
HeartBeatServerHandler就是一個很簡單的自定義的Handler,不是重點:
- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("server channelRead..");
- System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
接下來就是重點,我們需要寫一個類,這個類可以去觀察鏈路是否斷了,如果斷了,進行循環的斷線重連操作,ConnectionWatchdog,顧名思義,鏈路檢測狗,我們先看完整代碼:
- package com.lyncc.netty.idle;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.ChannelInitializer;
- import io.netty.util.Timeout;
- import io.netty.util.Timer;
- import io.netty.util.TimerTask;
- import java.util.concurrent.TimeUnit;
- /**
- *
- * 重連檢測狗,當發現當前的鏈路不穩定關閉之后,進行12次重連
- */
- @Sharable
- public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
- private final Bootstrap bootstrap;
- private final Timer timer;
- private final int port;
- private final String host;
- private volatile boolean reconnect = true;
- private int attempts;
- public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
- this.bootstrap = bootstrap;
- this.timer = timer;
- this.port = port;
- this.host = host;
- this.reconnect = reconnect;
- }
- /**
- * channel鏈路每次active的時候,將其連接的次數重新☞ 0
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為0");
- attempts = 0;
- ctx.fireChannelActive();
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("鏈接關閉");
- if(reconnect){
- System.out.println("鏈接關閉,將進行重連");
- if (attempts < 12) {
- attempts++;
- //重連的間隔時間會越來越長
- int timeout = 2 << attempts;
- timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
- }
- }
- ctx.fireChannelInactive();
- }
- public void run(Timeout timeout) throws Exception {
- ChannelFuture future;
- //bootstrap已經初始化好了,只需要將handler填入就可以了
- synchronized (bootstrap) {
- bootstrap.handler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(handlers());
- }
- });
- future = bootstrap.connect(host,port);
- }
- //future對象
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture f) throws Exception {
- boolean succeed = f.isSuccess();
- //如果重連失敗,則調用ChannelInactive方法,再次出發重連事件,一直嘗試12次,如果失敗則不再重連
- if (!succeed) {
- System.out.println("重連失敗");
- f.channel().pipeline().fireChannelInactive();
- }else{
- System.out.println("重連成功");
- }
- }
- });
- }
- }
稍微分析一下:
1)繼承了ChannelInboundHandlerAdapter,說明它也是Handler,也對,作為一個檢測對象,肯定會放在鏈路中,否則怎么檢測
2)實現了2個接口,TimeTask,ChannelHandlerHolder
①TimeTask,我們就要寫run方法,這應該是一個定時任務,這個定時任務做的事情應該是重連的工作
②ChannelHandlerHolder的接口,這個接口我們剛才說過是維護的所有的Handlers,因為在重連的時候需要獲取Handlers
3)bootstrap對象,重連的時候依舊需要這個對象
4)當鏈路斷開的時候會觸發channelInactive這個方法,也就說觸發重連的導火索是從這邊開始的
好了,我們這邊再寫次核心的HeartBeatsClient的代碼:
- package com.lyncc.netty.idle;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import io.netty.util.HashedWheelTimer;
- import java.util.concurrent.TimeUnit;
- public class HeartBeatsClient {
- protected final HashedWheelTimer timer = new HashedWheelTimer();
- private Bootstrap boot;
- private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();
- public void connect(int port, String host) throws Exception {
- EventLoopGroup group = new NioEventLoopGroup();
- boot = new Bootstrap();
- boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
- final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {
- public ChannelHandler[] handlers() {
- return new ChannelHandler[] {
- this,
- new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
- idleStateTrigger,
- new StringDecoder(),
- new StringEncoder(),
- new HeartBeatClientHandler()
- };
- }
- };
- ChannelFuture future;
- //進行連接
- try {
- synchronized (boot) {
- boot.handler(new ChannelInitializer<Channel>() {
- //初始化channel
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(watchdog.handlers());
- }
- });
- future = boot.connect(host,port);
- }
- // 以下代碼在synchronized同步塊外面是安全的
- future.sync();
- } catch (Throwable t) {
- throw new Exception("connects to fails", t);
- }
- }
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // 采用默認值
- }
- }
- new HeartBeatsClient().connect(port, "127.0.0.1");
- }
- }
也稍微說明一下:
1)創建了ConnectionWatchdog對象,自然要實現handlers方法
2)初始化好bootstrap對象
3)4秒內沒有寫操作,進行心跳觸發,也就是IdleStateHandler這個方法
最后ConnectorIdleStateTrigger這個類
- package com.lyncc.netty.idle;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import io.netty.util.CharsetUtil;
- @Sharable
- public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
- private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
- CharsetUtil.UTF_8));
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleState state = ((IdleStateEvent) evt).state();
- if (state == IdleState.WRITER_IDLE) {
- // write heartbeat to server
- ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
- }
- } else {
- super.userEventTriggered(ctx, evt);
- }
- }
- }
HeartBeatClientHandler.java(不是重點)
- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandler.Sharable;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.util.ReferenceCountUtil;
- import java.util.Date;
- @Sharable
- public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("激活時間是:"+new Date());
- System.out.println("HeartBeatClientHandler channelActive");
- ctx.fireChannelActive();
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("停止時間是:"+new Date());
- System.out.println("HeartBeatClientHandler channelInactive");
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String message = (String) msg;
- System.out.println(message);
- if (message.equals("Heartbeat")) {
- ctx.write("has read message from server");
- ctx.flush();
- }
- ReferenceCountUtil.release(msg);
- }
- }
好了,到此為止,所有的代碼都貼完了,我們做一個簡單的測試,按照常理,如果不出任何狀況的話,客戶端4秒發送心跳,服務端5秒才驗證是不會斷連的,所以我們在啟動之后,關閉服務端,然后再次重啟服務端
首先啟動服務端,控制台如下:
啟動客戶端,控制台如下:
客戶端啟動之后,服務端的控制台:
關閉服務端后,客戶端控制台:
重啟啟動服務端:
重連成功~