概述
詳細
詳細
本篇demo實現的功能是基於netty的心跳機制和長連接以及重連機制,最關鍵的就是通過netty中的 IdleStateHandler
的超時機制來實現心跳和重連 ,然后通過org.msgpack
編碼器來實現跨平台數據傳輸,
實現的功能就是通過Scanner來輸入消息得到服務端的回應,超過設定的超時時間就觸發超時事件來進行心跳傳輸,如果服務端宕機客戶端就會一直發起重連。
一、運行效果
服務端:
客戶端:
二、實現過程
-
在maven pom文件添加依賴:
-
<!-- 解碼and編碼器 --> <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency> <!-- netty 核心依賴 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.33.Final</version> </dependency>
-
導入以上依賴 ↓ 創建配置模型model(模型類) , TypeData(參數配置類) ↓ 創建解碼and編碼器MsgPckDecode(解碼器) ,MsgPckEncode(編碼器) ↓ 創建各自的控制器 AbstractClientChannelInboundHandleAdapter,AbstractServerChannelInboundHandleAdapter ↓ 創建客戶端及客戶端控制器Client(客戶端啟動類) , ClientHandler(客戶端控制器) ↓ 創建服務端以及控制器Server(客戶端啟動類) , ServerHandler(客戶端控制器) ps:本demo使用了msgpack , It’s like JSON. but fast and small.
-
package com.zxh.demo.model; import java.io.Serializable; import org.msgpack.annotation.Message; /** * 消息類型分離器 * @author Administrator * */ @Message public class Model implements Serializable{ private static final long serialVersionUID = 1L; //類型 private int type; //內容 private String body; public String getBody() { return body; } public void setBody(String body) { this.body = body; } public int getType() { return type; } public void setType(int type) { this.type = type; } @Override public String toString() { return "Model [type=" + type + ", body=" + body + "]"; } }
-
編寫一個配置類接口,用於控制心跳包和應用消息的處理
-
package com.zxh.demo.model; /** * 配置項 * @author Administrator * */ public interface TypeData { byte PING = 1; byte PONG = 2; //顧客 byte CUSTOMER = 3; }
創建MsgPckDecode(解碼器)
-
package com.zxh.demo.model; import java.util.List; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; /** * 解碼器 * @author Administrator * */ public class MsgPckDecode extends MessageToMessageDecoder<ByteBuf>{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { final byte[] array; final int length = msg.readableBytes(); array = new byte[length]; msg.getBytes(msg.readerIndex(), array, 0, length); MessagePack pack = new MessagePack(); out.add(pack.read(array, Model.class)); } }
-
創建MsgPckEncode(編碼器)
-
package com.zxh.demo.model; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 編碼器 * @author Administrator * */ public class MsgPckEncode extends MessageToByteEncoder<Object>{ @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) throws Exception { // TODO Auto-generated method stub MessagePack pack = new MessagePack(); byte[] write = pack.write(msg); buf.writeBytes(write); } }
-
創建client客戶端:
-
package com.zxh.demo.client; import java.util.Scanner; import java.util.concurrent.TimeUnit; import com.zxh.demo.model.Model; import com.zxh.demo.model.MsgPckDecode; import com.zxh.demo.model.MsgPckEncode; import com.zxh.demo.model.TypeData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; public class Client { private NioEventLoopGroup worker = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; public static void main(String[] args) { Client client = new Client(); client.start(); client.sendData(); } private void start() { bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0,0,5)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new ClientHandler(Client.this)); } }); doConnect(); } /** * 連接服務端 and 重連 */ protected void doConnect() { if (channel != null && channel.isActive()){ return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081); //實現監聽通道連接的方法 connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ channel = channelFuture.channel(); System.out.println("連接服務端成功"); }else{ System.out.println("每隔2s重連...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } },2,TimeUnit.SECONDS); } } }); } /** * 向服務端發送消息 */ private void sendData() { Scanner sc= new Scanner(System.in); for (int i = 0; i < 1000; i++) { if(channel != null && channel.isActive()){ //獲取一個鍵盤掃描器 String nextLine = sc.nextLine(); Model model = new Model(); model.setType(TypeData.CUSTOMER); model.setBody(nextLine); channel.writeAndFlush(model); } } } }
-
創建Server服務端:
-
package com.zxh.demo.server; import com.zxh.demo.model.MsgPckDecode; import com.zxh.demo.model.MsgPckEncode; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; public class Server { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8081) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(10,0,0)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new ServerHandler()); } }); System.out.println("start server by port 8081 --"); ChannelFuture sync = serverBootstrap.bind().sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ //優雅的關閉資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
先運行服務端,然后再啟動客戶端 會根據設置的端口連接服務端,在客戶端輸入消息就會得到服務端的回應,如果超過5秒沒有進行讀寫就會觸發IdleStateHandler
類超時事件 來進行心跳包的傳輸 ,服務端未檢測到客戶端的讀寫或者心跳就會主動關閉channel通道
三、項目結構圖
四、補充
所謂的心跳, 即在 TCP 長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 TCP 連接的有效性.因為網絡的不可靠性, 有可能在 TCP 保持長連接的過程中, 由於某些突發情況, 例如網線被拔出, 突然掉電等, 會造成服務器和客戶端的連接中斷. 在這些突發情況下, 如果恰好服務器和客戶端之間沒有交互的話, 那么它們是不能在短時間內發現對方已經掉線的. 為了解決這個問題, 我們就需要引入 心跳 機制. 心跳機制的工作原理是: 在服務器和客戶端之間一定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互. 自然地, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性