一.心跳介紹
網絡中的接收和發送數據都是使用操作系統中的SOCKET進行實現。但是如果此套接字已經斷開,那發送數據和接收數據的時候就一定會有問題。
1.心跳機制:
是服務端和客戶端定時的發送一個心跳包(自定義的數據結構體),讓對方知道自己還活着,處於在線狀態,以確保連接真實有效的一種機制。
2.心跳檢查:
心跳檢查是查看服務端和客戶端是否定時的在正常的發送心跳包。
在java的定時線程任務中,我們也可以去實現定時的一些輪詢任務,但是netty給我們提供了一些自身封裝實現好的一些心跳檢查機制,我們可以利用netty來實現高效的心跳檢查機制。
二.netty 提供的心跳
netty4.x中為我們提供了IdleStateHandler來檢查服務端和客戶端的心跳。
IdleStateHandler 類中是這樣描述的:
triggers an {@link IdleStateEvent} when a {@link Channel} has not performed read, write, or both operation for a while. 解釋:在一段時間內,如果有讀、寫、讀寫空閑時發生時,會觸發這個這個事件 IdleStateHandler會記錄IdleStateEvent事件(讀空閑、寫空閑、讀寫空閑)交給下一個handler處理 IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) 參數說明: 1. long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連接 2. long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連接 3. long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連接 4.TimeUnit unit:時間大小
三.自定義心跳實現
下面我們利用netty的IdleStateHandler來實現一個斷開重連的心跳檢查機制
1.心跳實現思路:
服務端: 服務端正常配置啟動,並利用IdleStateHandler中的IdleStateEvent事件,在發生5秒后沒有讀事件發生時,就會觸發userEventTrigger事件,如果服務端在5秒內沒有發生讀的事件,說明客戶端已經斷開。 服務端正常編寫,只不過是多了一個IdleStateHandler事件處理的handler而已。 客戶端: 客戶端需要考慮2件事,第1是怎么定時的去向服務端發送數據,第2是如果失敗時怎樣去嘗試再次連接。好在netty的handler都已提供了相應的處理機制和方法。 1.定時發送數據問題: 客戶端利用IdleStateHandler的事件特性在發生IdleStateEvent后,會記錄下觸發的事件,然后交給下一下handler處理,我們可以通過ChannelInboundHandlerAdapter的userEventTriggered方法來向服務端寫數據,也就是說如果4秒內沒有發生寫事件,就會觸發userEventTrigger方法,我們可以在該方法中向服務端寫數據。 2.重連問題: 當服務端發生異常斷開時,我們可以利用ChannelInboundHandlerAdapter的channelInactive方法進行重連。在這里需要注意,由於netty每次進行重連時會使用的Bootstrap是不共享的,因此需要通過設置@Sharable標簽讓bootstrap數據共享,這樣當每次嘗試重連時就可以把之前設置的一些綁定信息可以共享使用。
2 .UML類圖


3.實現代碼:
3.1 服務端代碼實現
服務端代碼實現沒什么難度,一共是3個類組成:
HeartBeatServer : 服務端綁定啟動項參數配置
HeartBeatServerInitHandler : 服務端創建時加載netty的channelhandler
HeartBeatServerHandler : 服務端創建時加載自定義的channelhandler
3.2客戶端代碼實現
客戶端代碼稍微復雜一點,但其本質上和普通的客戶端都一樣
HeartBeatClient : 客戶端綁定啟動項參數配置
ClientUserEventTriggeredHandler : 客戶端心跳事件發生時觸發此類中的方法
ClientReconnectHandler : 客戶端斷開連接后,嘗試重連的自定義handler,該類是個抽象類,需要在調用時傳入相應的參數,具體情況在該類上有解釋
FireChannelHandlers : 客戶端在嘗試重連時,需要透傳的參數
HeartBeatServer
package com.zpb.netty.heartbeat.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @Desc: com.zpb.netty.heare1 * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ public class HeartBeatServer { private int port; public HeartBeatServer(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.option(ChannelOption.SO_BACKLOG, 128); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); serverBootstrap.childHandler(new ServerInitHandler()); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); System.out.println("Server start listen at... " + port); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new HeartBeatServer(8888).start(); } }
HeartBeatServerInitHandler
package com.zpb.netty.heartbeat.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * @Desc: com.zpb.netty.demo * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ public class HeartBeatServerInitHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //添加心跳檢查包 pipeline.addLast(new IdleStateHandler(5,0,0,TimeUnit.SECONDS)); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new HeartBeatServerHandler()); } }
HeartBeatServerHandler
package com.zpb.netty.heartbeat.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * @Desc: com.zpb.netty.demo * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { //當服務器5秒內沒有發生讀的事件時,會觸發這個事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { //當事件為讀事件觸發時發生異常,或者中斷 throw new Exception("idle exception");//將通道進行關閉 } }else { super.userEventTriggered(ctx, evt); } } //當通道有讀事件時 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead.."); System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString()); } //當通道發生異常時 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server happend exception ,server close channel :"+cause.getMessage()); ctx.close(); } }
HeartBeatClient
package com.zpb.netty.heartbeat.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @Desc: com.zpb.netty.demo.client * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ public class HeartBeatClient { public void start(String host,int port){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); //設置線程組 bootstrap.channel(NioSocketChannel.class); //設置管道 final ClientReconnectHandler clientReconnectHandler = new ClientReconnectHandler(bootstrap, host, port) { @Override public ChannelHandler[] channelHandlers() { return new ChannelHandler[]{ this, //重連的handler new LoggingHandler(LogLevel.INFO), //日志handler new StringDecoder(), //編碼handler new StringEncoder(), //解碼handler new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS), //心跳檢查handler new ClientUserEventTriggeredHandler() //心跳檢查失敗handler }; } }; System.err.println("client is ready......"); ChannelFuture channelFuture = null; try { synchronized (bootstrap) { bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(clientReconnectHandler.channelHandlers());//正常情況時的連接綁定 } }); channelFuture = bootstrap.connect(host,port).sync(); } } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); new HeartBeatClient().start("127.0.0.1",8888); executorService.scheduleAtFixedRate(()->{ System.out.println("客戶端獲取服務端是否在線的狀態:"+ClientReconnectHandler.CONNECTION_STATE); },800,800,TimeUnit.MILLISECONDS); } }
ClientUserEventTriggeredHandler
package com.zpb.netty.heartbeat.client; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; /** * 客戶端的寫事件 * @Desc: com.zpb.netty.demo.client * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ public class ClientUserEventTriggeredHandler extends ChannelInboundHandlerAdapter{ //當超過n秒沒有寫時會觸發該事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { ctx.writeAndFlush(Unpooled.copiedBuffer("ping",CharsetUtil.UTF_8)); } } else { super.userEventTriggered(ctx, evt); } } }
ClientReconnectHandler
package com.zpb.netty.heartbeat.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; /**
* 該類繼承了ChannelInboundHandlerAdapter方法,目的是為了重寫channelActive 和channelInactive 2個方法
* channelActive 方法是: 在通道建立時,可以知道此時的客戶端和服務端已經建立了連接
* channelInactive 方法是: 在通道斷開后,可以知道此時的客戶端已經和服務斷斷開了連接,需要在這個方法中設置重連客戶端方法
*
* 該類實現了netty的接口TimerTask,目的是為了重寫run()方法
* run(TimeOut timeout) 方法是:寫具體的重連方案
*
* 該類實現了RireChannelHandlers 這個接口,目的是為了重寫channelHandlers()方法
* channelHandlers() 方法是: 獲得所有的通道配置處理的channelHandler,包括netty提供的和自定義的實現的,重點是該類並沒有實現這個接口,因為關於客戶端的一些啟動項配置參數,我們在這里是並不知道客戶端要怎樣配置的,所以這才是把該類定義抽象類的關鍵
* 讓子類去實現這個方法更為合理。
*
* @Sharabel 標簽
* 該注解的目的是在每次重連時,可以讓此類中的的channelhandler可以共享,多次使用
* * @Date: 2019/11/30 * @Auther: pengbo.zhao * @version: 1.0 */ @Sharable public abstract class ClientReconnectHandler extends ChannelInboundHandlerAdapter implements TimerTask,FireChannelHandlers { public static volatile boolean CONNECTION_STATE = false;//對外提供連接標志 protected final HashedWheelTimer timer = new HashedWheelTimer(); private int reconnectCount; private final Bootstrap bootstrap; private final String host; private final int port; public ClientReconnectHandler(Bootstrap bootstrap, String host, int port) { this.bootstrap = bootstrap; this.host = host; this.port = port; } //當通道建立時 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為0"); reconnectCount = 0; CONNECTION_STATE = true; ctx.fireChannelActive(); } //通道關閉時啟動重連 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("通道關閉,將再次進行重連"); CONNECTION_STATE = false; if (reconnectCount < 12) { reconnectCount++; System.out.println("重連第"+reconnectCount+"次"); int timeout = 2 << reconnectCount; timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); } ctx.fireChannelInactive(); } @Override public void run(Timeout timeout) throws Exception { ChannelFuture channelFuture; synchronized (bootstrap) { bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(channelHandlers()); } }); channelFuture = bootstrap.connect(host,port); } //添加重連監聽 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { boolean success = channelFuture.isSuccess(); if(!success){ System.out.println("重連失敗"); channelFuture.channel().pipeline().fireChannelInactive(); }else{ CONNECTION_STATE = true; System.out.println("重連成功"); } } }); } }
FireChannelHandlers
package com.zpb.netty.heartbeat.client; import io.netty.channel.ChannelHandler; /** * 透傳handler列表 * @Desc: com.zpb.netty.demo.client * @Date: 2019/12/1 * @Auther: pengbo.zhao * @version: 1.0 */ public interface FireChannelHandlers { ChannelHandler [] channelHandlers(); }
服務端啟動:

客戶端啟動:

當服務端接收到客戶端發送的數據后:

當服務端斷開連接后:

客戶端斷開重連時:

