1、依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.22.Final</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.5.1</version> </dependency>
2、構建服務端
@Slf4j @Component public class NettyServer { private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); @Value("${server.port}") private Integer port; @PostConstruct public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) //服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數 .option(ChannelOption.SO_BACKLOG, 1024) //設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文 .childOption(ChannelOption.SO_KEEPALIVE, true) //將小的數據包包裝成更大的幀進行傳送,提高網絡的負載,即TCP延遲傳輸 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new NettyServerHandlerInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("Netty Server啟動成功"); } } @PreDestroy public void destory() { boss.shutdownGracefully(); work.shutdownGracefully(); log.info("Netty Server關閉"); } }
3、客戶端
@Slf4j @Component public class NettyClient { private NioEventLoopGroup group = new NioEventLoopGroup(); @Value("${client.port}") private int port; @Value("${client.host}") private String host; private SocketChannel socketChannel; public void sendMsg(MessageBase.Message message) { socketChannel.writeAndFlush(message); } @PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientHandlerInitilizer()); ChannelFuture future = bootstrap.connect(); future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { log.info("連接Netty服務端成功"); } else { log.info("連接失敗,進行斷線重連"); future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); } }
4、Protobuf消息
syntax="proto3"; option java_package="com.smart.protocol.protobuf"; option java_outer_classname="MessageBase"; message Message{ string requestId=1; CommandType cmd=2; string content=3; enum CommandType { NORMAL =0;//常規業務消息 HEARTBEAT_REQUEST=1;//客戶端心跳消息 HEARTBEAT_RESPONSE=2;//服務端心跳消息 } }
5、protobuf的編解碼器
Netty 為了支持 protobuf提供了針對 protobuf的編解碼器
將其加入客戶端和服務端的 ChannelPipeline中以用於對消息進行編解碼
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new ServerIdleStateHandler()) .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new NettyServerHandler()); } }
6、心跳機制
心跳是在TCP長連接中,客戶端與服務端之間定期發送的一種特殊的數據包,通知對方在線以確保TCP連接的有效性。
兩種方式實現心跳機制:
- TCP協議層面的 keepalive 機制
- 在應用層上自定義的心跳機制
TCP層面的 keepalive 機制:在之前構建 Netty服務端和客戶端啟動過程中也有定義,需要手動開啟:
// 設置TCP的長連接,默認的 keepalive的心跳時間是兩個小時 childOption(ChannelOption.SO_KEEPALIVE, true)
Netty也提供了 IdleStateHandler 來實現心跳機制
@Slf4j public class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { log.info("已經10s沒有發送消息給服務器"); MessageBase.Message heartbeart = MessageBase.Message.newBuilder().setCmd( MessageBase.Message.CommandType.HEARTBEAT_REQUEST) .setRequestId(UUID.randomUUID().toString()) .setContent("heart").build(); ctx.writeAndFlush(heartbeart).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
當連接空閑時間太長時,將會觸發一個 IdleStateEvent
事件,然后我們調用 userEventTriggered
來處理該 IdleStateEvent
事件。
對於長連接而言,一種方案是兩邊都發送心跳消息,另一種是服務端作為被動接收一方,如果一段時間內服務端沒有收到心跳包那么就直接斷開連接。這里采用第二種方案,只需要客戶端發送心跳消息,然后服務端被動接收,然后設置一段時間,在這段時間內如果服務端沒有收到任何消息,那么就主動斷開連接
7、Netty 客戶端斷線重連
Netty 客戶端需要重連服務端:
- Netty 客戶端啟動時,服務端掛掉,連不上服務端
- 在程序運行過程中,服務端突然掛掉
ChannelFutureListener
用來監測連接是否成功,不成功就進行斷連重試機制
@PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientHandlerInitilizer()); ChannelFuture future = bootstrap.connect(); future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { log.info("連接Netty服務端成功"); } else { log.info("連接失敗,進行斷線重連"); future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); }
第二種情況是運行過程中 服務端突然掛掉了,這種情況我們在處理數據讀寫的Handler中實現
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //如果運行過程中服務端掛了,執行重連機制 EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS); super.channelInactive(ctx); }
channelInactive()
方法是指如果當前Channel沒有連接到遠程節點,那么該方法將會被調用
8、服務端空閑檢測
空閑檢測是什么?實際上空閑檢測是每隔一段時間,檢測這段時間內是否有數據讀寫。比如,服務端檢測一段時間內,是否收到客戶端發送來的數據,如果沒有,就及時釋放資源,關閉連接。
@Slf4j public class ServerIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 30; public ServerIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { log.info("{} 秒內沒有讀取到數據,關閉連接", READER_IDLE_TIME); ctx.channel().close(); } }
https://juejin.im/post/5bd584bc518825292865395d