何為心跳
顧名思義, 所謂心跳, 即在TCP長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 TCP 連接的有效性。
為什么需要心跳
因為網絡的不可靠性, 有可能在 TCP 保持長連接的過程中, 由於某些突發情況, 例如網線被拔出, 突然掉電等, 會造成服務器和客戶端的連接中斷. 在這些突發情況下, 如果恰好服務器和客戶端之間沒有交互的話, 那么它們是不能在短時間內發現對方已經掉線的. 為了解決這個問題, 我們就需要引入心跳機制。 心跳機制的工作原理是: 在服務器和客戶端之間一定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互。自然地, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性。
如何實現心跳
我們可以通過兩種方式實現心跳機制:
-
使用 TCP 協議層面的 keepalive 機制.
-
在應用層上實現自定義的心跳機制.
雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 但是使用它有幾個缺點:
-
它不是 TCP 的標准協議, 並且是默認關閉的.
-
TCP keepalive 機制依賴於操作系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 並且對 keepalive 的修改需要系統調用(或者修改系統配置), 靈活性不夠.
-
TCP keepalive 與 TCP 協議綁定, 因此如果需要更換為 UDP 協議時, keepalive 機制就失效了.
雖然使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量, 但是基於上面的幾點缺點, 一般的實踐中, 人們大多數都是選擇在應用層上實現自定義的心跳。
既然如此, 那么我們就來大致看看在在 Netty 中是怎么實現心跳的吧。
在這之前,先大致看看Netty的連接管理
Netty的連接管理,用於空閑連接以及超時的ChannelHandler
- IdleStateHandler當連接空閑時間太長時,將會觸發一個IdleStateEvent事件。然后,你可以通過在你的ChannelInboundHandler中重寫userEventTriggered()方法來處理該IdleStateEvent事件
- ReadTimeoutHandler如果在指定的時間間隔內沒有收到任何的入站數據,則拋出一個ReadTimeoutException並關閉對應的Channel。可以通過重寫你的ChannelHandler中的exceptionCaught()方法來檢測該ReadTimeoutException
- WriteTimeoutHandler如果在指定的時間間隔內沒有任何出站數據寫入,則拋出一個WriteTimeoutException並關閉對應的Channel。可以通過重寫你的ChannelHandler 的exceptionCaught()方法檢測該WriteTimeoutException
讓我們仔細看看在實踐中使用得最多的IdleStateHandler吧。以下代碼清單展示了當使用通常的發送心跳消息到遠程節點的方法時,如果在60秒之內沒有接收或者發送任何的數據,我們將如何得到通知,如果沒有響應,則連接會被關閉。
發送心跳
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));// IdleStateHandler將在被觸發時發送一個IdleStateEvent事件 pipeline.addLast(new HeartbeatHandler());// 將一個HeartbeatHandler添加到ChannelPipeline中 } } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {// 實現userEventTriggered()方法以發送心跳消息 private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));// 發送到遠程節點的心跳消息 @Override public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception { if (evt instanceof IdleStateEvent) {// 發送心跳消息,並在發送失敗時關閉該連接 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt);// 不是IdleStateEvent事件,所以將它傳遞給下一個ChannelInboundHandler } }
這個示例演示了如何使用IdleStateHandler來測試遠程節點是否仍然還活着,並且在它失活時通過關閉連接來釋放資源。如果連接超過60秒沒有接收或者發送任何的數據,那么IdleStateHandler將會使用一個IdleStateEvent事件來調用fireUserEventTriggered()方法。HeartbeatHandler實現了userEventTriggered()方法,如果這個方法檢測到IdleStateEvent事件,它將會發送心跳消息,並且添加一個將在發送操作失敗時關閉該連接的ChannelFutureListener。
使用 Netty 實現心跳
通過上面的解釋,我們知道連接管理是首先需要關注的,檢測空閑連接以及超時對於及時釋放資源來說是至關重要的。由於這是一項常見的任務,Netty特地為它提供了幾個ChannelHandler實現。
在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 它可以對一個 Channel 的 讀/寫設置定時器, 當 Channel 在一定事件間隔內沒有數據交互時(即處於 idle 狀態), 就會觸發指定的事件。
那么這個 Handler 如何使用呢? 我們來看看它的構造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); }
實例化一個 IdleStateHandler 需要提供三個參數:
-
readerIdleTimeSeconds, 讀超時. 即當在指定的事件間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
-
writerIdleTimeSeconds, 寫超時. 即當在指定的事件間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
-
allIdleTimeSeconds, 讀/寫超時. 即當在指定的事件間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
為了展示具體的 IdleStateHandler 實現的心跳機制, 下面我們來構造一個具體的EchoServer 的例子, 這個例子的行為如下:
-
在這個例子中, 客戶端和服務器通過 TCP 長連接進行通信.
-
TCP 通信的報文格式是:
+--------+-----+---------------+ | Length |Type | Content | | 17 | 1 |"HELLO, WORLD" | +--------+-----+---------------+
-
客戶端每隔一個隨機的時間后, 向服務器發送消息, 服務器收到消息后, 立即將收到的消息原封不動地回復給客戶端.
-
若客戶端在指定的時間間隔內沒有讀/寫操作, 則客戶端會自動向服務器發送一個 PING 心跳, 服務器收到 PING 心跳消息時, 需要回復一個 PONG 消息.
下面所使用的代碼例子可以在我的 Github github.com/yongshun/some_java_code 上找到.
通用部分
根據上面定義的行為, 我們接下來實現心跳的通用部分 CustomHeartbeatHandler:
/** * @author xiongyongshun * @version 1.0 * @email yongshun1228@gmail.com * @created 16/9/18 13:02 */ public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final byte PING_MSG = 1; public static final byte PONG_MSG = 2; public static final byte CUSTOM_MSG = 3; protected String name; private int heartbeatCount = 0; public CustomHeartbeatHandler(String name) { this.name = name; } @Override protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception { if (byteBuf.getByte(4) == PING_MSG) { sendPongMsg(context); } else if (byteBuf.getByte(4) == PONG_MSG){ System.out.println(name + " get pong msg from " + context.channel().remoteAddress()); } else { handleData(context, byteBuf); } } protected void sendPingMsg(ChannelHandlerContext context) { ByteBuf buf = context.alloc().buffer(5); buf.writeInt(5); buf.writeByte(PING_MSG); buf.retain(); context.writeAndFlush(buf); heartbeatCount++; System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount); } private void sendPongMsg(ChannelHandlerContext context) { ByteBuf buf = context.alloc().buffer(5); buf.writeInt(5); buf.writeByte(PONG_MSG); context.channel().writeAndFlush(buf); heartbeatCount++; System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount); } protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // IdleStateHandler 所產生的 IdleStateEvent 的處理邏輯. if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: handleReaderIdle(ctx); break; case WRITER_IDLE: handleWriterIdle(ctx); break; case ALL_IDLE: handleAllIdle(ctx); break; default: break; } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("---" + ctx.channel().remoteAddress() + " is active---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---"); } protected void handleReaderIdle(ChannelHandlerContext ctx) { System.err.println("---READER_IDLE---"); } protected void handleWriterIdle(ChannelHandlerContext ctx) { System.err.println("---WRITER_IDLE---"); } protected void handleAllIdle(ChannelHandlerContext ctx) { System.err.println("---ALL_IDLE---"); } }
類 CustomHeartbeatHandler 負責心跳的發送和接收, 我們接下來詳細地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實現心跳的關鍵, 它會根據不同的 IO idle 類型來產生不同的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實現的.
我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實現:
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: handleReaderIdle(ctx); break; case WRITER_IDLE: handleWriterIdle(ctx); break; case ALL_IDLE: handleAllIdle(ctx); break; default: break; } } }
在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不同, 而進行不同的處理. 例如如果是讀取數據 idle, 則 e.state() == READER_IDLE, 因此就調用 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個方法目前只有默認的實現, 它需要在子類中進行重寫, 現在我們暫時略過它們, 在具體的客戶端和服務器的實現部分時再來看它們.
知道了這一點后, 我們接下來看看數據處理部分:
@Override protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception { if (byteBuf.getByte(4) == PING_MSG) { sendPongMsg(context); } else if (byteBuf.getByte(4) == PONG_MSG){ System.out.println(name + " get pong msg from " + context.channel().remoteAddress()); } else { handleData(context, byteBuf); } }
在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據報文協議:
+--------+-----+---------------+ | Length |Type | Content | | 17 | 1 |"HELLO, WORLD" | +--------+-----+---------------+
來判斷當前的報文類型, 如果是 PING_MSG 則表示是服務器收到客戶端的 PING 消息, 此時服務器需要回復一個 PONG 消息, 其消息類型是 PONG_MSG.扔報文類型是 PONG_MSG, 則表示是客戶端收到服務器發送的 PONG 消息, 此時打印一個 log 即可.
客戶端初始化
public class Client { public static void main(String[] args) { NioEventLoopGroup workGroup = new NioEventLoopGroup(4); Random random = new Random(System.currentTimeMillis()); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(0, 0, 5)); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new ClientHandler()); } }); Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel(); for (int i = 0; i < 10; i++) { String content = "client msg " + i; ByteBuf buf = ch.alloc().buffer(); buf.writeInt(5 + content.getBytes().length); buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG); buf.writeBytes(content.getBytes()); ch.writeAndFlush(buf); Thread.sleep(random.nextInt(20000)); } } catch (Exception e) { throw new RuntimeException(e); } finally { workGroup.shutdownGracefully(); } } }
上面的代碼是 Netty 的客戶端端的初始化代碼, 使用過 Netty 的朋友對這個代碼應該不會陌生. 別的部分我們就不再贅述, 我們來看看 ChannelInitializer.initChannel 部分即可:
.handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(0, 0, 5)); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new ClientHandler()); } });
我們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 我們為客戶端端設置了讀寫 idle 超時, 時間間隔是5s, 即如果客戶端在間隔 5s 后都沒有收到服務器的消息或向服務器發送消息, 則產生 ALL_IDLE 事件.
接下來我們添加了 LengthFieldBasedFrameDecoder, 它是負責解析我們的 TCP 報文, 因為和本文的目的無關, 因此這里不詳細展開.
最后一個 Handler 是 ClientHandler, 它繼承於 CustomHeartbeatHandler, 是我們處理業務邏輯部分.
客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler { public ClientHandler() { super("client"); } @Override protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { byte[] data = new byte[byteBuf.readableBytes() - 5]; byteBuf.skipBytes(5); byteBuf.readBytes(data); String content = new String(data); System.out.println(name + " get content: " + content); } @Override protected void handleAllIdle(ChannelHandlerContext ctx) { super.handleAllIdle(ctx); sendPingMsg(ctx); } }
ClientHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這里面實現 僅僅打印收到的消息.
第二個重寫的方法是 handleAllIdle. 我們在前面提到, 客戶端負責發送心跳的 PING 消息, 當客戶端產生一個 ALL_IDLE 事件后, 會導致父類的 CustomHeartbeatHandler.userEventTriggered 調用, 而 userEventTriggered 中會根據 e.state() 來調用不同的方法, 因此最后調用的是 ClientHandler.handleAllIdle, 在這個方法中, 客戶端調用 sendPingMsg 向服務器發送一個 PING 消息.
服務器初始化
public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(4); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(10, 0, 0)); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new ServerHandler()); } }); Channel ch = bootstrap.bind(12345).sync().channel(); ch.closeFuture().sync(); } catch (Exception e) { throw new RuntimeException(e); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
服務器的初始化部分也沒有什么好說的, 它也和客戶端的初始化一樣, 為 pipeline 添加了三個 Handler.
服務器 Handler
public class ServerHandler extends CustomHeartbeatHandler { public ServerHandler() { super("server"); } @Override protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) { byte[] data = new byte[buf.readableBytes() - 5]; ByteBuf responseBuf = Unpooled.copiedBuffer(buf); buf.skipBytes(5); buf.readBytes(data); String content = new String(data); System.out.println(name + " get content: " + content); channelHandlerContext.write(responseBuf); } @Override protected void handleReaderIdle(ChannelHandlerContext ctx) { super.handleReaderIdle(ctx); System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---"); ctx.close(); } }
ServerHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這里面實現 EchoServer 的功能: 即收到客戶端的消息后, 立即原封不動地將消息回復給客戶端.
第二個重寫的方法是 handleReaderIdle, 因為服務器僅僅對客戶端的讀 idle 感興趣, 因此只重新了這個方法. 若服務器在指定時間后沒有收到客戶端的消息, 則會觸發 READER_IDLE 消息, 進而會調用 handleReaderIdle 這個方法. 我們在前面提到, 客戶端負責發送心跳的 PING 消息, 並且服務器的 READER_IDLE 的超時時間是客戶端發送 PING 消息的間隔的兩倍, 因此當服務器 READER_IDLE 觸發時, 就可以確定是客戶端已經掉線了, 因此服務器直接關閉客戶端連接即可.
總結
-
使用 Netty 實現心跳機制的關鍵就是利用 IdleStateHandler 來產生對應的 idle 事件.
-
一般是客戶端負責發送心跳的 PING 消息, 因此客戶端注意關注 ALL_IDLE 事件, 在這個事件觸發后, 客戶端需要向服務器發送 PING 消息, 告訴服務器"我還存活着".
-
服務器是接收客戶端的 PING 消息的, 因此服務器關注的是 READER_IDLE 事件, 並且服務器的 READER_IDLE 間隔需要比客戶端的 ALL_IDLE 事件間隔大(例如客戶端ALL_IDLE 是5s 沒有讀寫時觸發, 因此服務器的 READER_IDLE 可以設置為10s)
-
當服務器收到客戶端的 PING 消息時, 會發送一個 PONG 消息作為回復. 一個 PING-PONG 消息對就是一個心跳交互.
斷線重連
客戶端斷線重連機制。
客戶端數量多,且需要傳遞的數據量級較大。可以周期性的發送數據的時候,使用。要求對數據的即時性不高的時候,才可使用。
優點: 可以使用數據緩存。不是每條數據進行一次數據交互。可以定時回收資源,對資源利用率高。相對來說,即時性可以通過其他方式保證。如: 120秒自動斷線。數據變化1000次請求服務器一次。300秒中自動發送不足1000次的變化數據。
既然可以在客戶端做判斷連接是否斷開了,那么我們就可以重新調用連接服務端的代碼。
當然,我們重連的動作肯定是發生在斷連之后發生的,我們可以在上篇的心跳機制的基礎上,簡單地修改一下客戶端的啟動代碼就可以了。
public class Client { private NioEventLoopGroup workGroup = new NioEventLoopGroup(4); private Channel channel; private Bootstrap bootstrap; public static void main(String[] args) throws Exception { Client client = new Client(); client.start(); client.sendData(); } public void sendData() throws Exception { Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < 10000; i++) { if (channel != null && channel.isActive()) { String content = "client msg " + i; ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length); buf.writeInt(5 + content.getBytes().length); buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG); buf.writeBytes(content.getBytes()); channel.writeAndFlush(buf); } Thread.sleep(random.nextInt(20000)); } } public void start() { try { bootstrap = new Bootstrap(); bootstrap .group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(0, 0, 5)); p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new ClientHandler(Client.this)); } }); doConnect(); } catch (Exception e) { throw new RuntimeException(e); } } protected void doConnect() { if (channel != null && channel.isActive()) { return; } ChannelFuture future = bootstrap.connect("127.0.0.1", 12345); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture futureListener) throws Exception { if (futureListener.isSuccess()) { channel = futureListener.channel(); System.out.println("Connect to server successfully!"); } else { System.out.println("Failed to connect to server, try connect after 10s"); futureListener.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } }, 10, TimeUnit.SECONDS); } } }); } }
上面的代碼中, 我們抽象出 doConnect 方法, 它負責客戶端和服務器的 TCP 連接的建立, 並且當 TCP 連接失敗時, doConnect 會 通過 "channel().eventLoop().schedule" 來延時10s 后嘗試重新連接。
客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler { private Client client; public ClientHandler(Client client) { super("client"); this.client = client; } @Override protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { byte[] data = new byte[byteBuf.readableBytes() - 5]; byteBuf.skipBytes(5); byteBuf.readBytes(data); String content = new String(data); System.out.println(name + " get content: " + content); } @Override protected void handleAllIdle(ChannelHandlerContext ctx) { super.handleAllIdle(ctx); sendPingMsg(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); client.doConnect(); } }
斷線重連的關鍵一點是檢測連接是否已經斷開. 因此我們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當 TCP 連接斷開時, 會回調 channelInactive 方法, 因此我們在這個方法中調用 client.doConnect() 來進行重連。
參考文章:https://blog.csdn.net/z69183787/article/details/52671543