1、依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.22.Final</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.5.1</version> </dependency>
2、构建服务端
@Slf4j @Component public class NettyServer { private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); @Value("${server.port}") private Integer port; @PostConstruct public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true) //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new NettyServerHandlerInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("Netty Server启动成功"); } } @PreDestroy public void destory() { boss.shutdownGracefully(); work.shutdownGracefully(); log.info("Netty Server关闭"); } }
3、客户端
@Slf4j @Component public class NettyClient { private NioEventLoopGroup group = new NioEventLoopGroup(); @Value("${client.port}") private int port; @Value("${client.host}") private String host; private SocketChannel socketChannel; public void sendMsg(MessageBase.Message message) { socketChannel.writeAndFlush(message); } @PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientHandlerInitilizer()); ChannelFuture future = bootstrap.connect(); future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { log.info("连接Netty服务端成功"); } else { log.info("连接失败,进行断线重连"); future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); } }
4、Protobuf消息
syntax="proto3"; option java_package="com.smart.protocol.protobuf"; option java_outer_classname="MessageBase"; message Message{ string requestId=1; CommandType cmd=2; string content=3; enum CommandType { NORMAL =0;//常规业务消息 HEARTBEAT_REQUEST=1;//客户端心跳消息 HEARTBEAT_RESPONSE=2;//服务端心跳消息 } }
5、protobuf的编解码器
Netty 为了支持 protobuf提供了针对 protobuf的编解码器
将其加入客户端和服务端的 ChannelPipeline中以用于对消息进行编解码
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new ServerIdleStateHandler()) .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new NettyServerHandler()); } }
6、心跳机制
心跳是在TCP长连接中,客户端与服务端之间定期发送的一种特殊的数据包,通知对方在线以确保TCP连接的有效性。
两种方式实现心跳机制:
- TCP协议层面的 keepalive 机制
- 在应用层上自定义的心跳机制
TCP层面的 keepalive 机制:在之前构建 Netty服务端和客户端启动过程中也有定义,需要手动开启:
// 设置TCP的长连接,默认的 keepalive的心跳时间是两个小时 childOption(ChannelOption.SO_KEEPALIVE, true)
Netty也提供了 IdleStateHandler 来实现心跳机制
@Slf4j public class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { log.info("已经10s没有发送消息给服务器"); MessageBase.Message heartbeart = MessageBase.Message.newBuilder().setCmd( MessageBase.Message.CommandType.HEARTBEAT_REQUEST) .setRequestId(UUID.randomUUID().toString()) .setContent("heart").build(); ctx.writeAndFlush(heartbeart).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
当连接空闲时间太长时,将会触发一个 IdleStateEvent
事件,然后我们调用 userEventTriggered
来处理该 IdleStateEvent
事件。
对于长连接而言,一种方案是两边都发送心跳消息,另一种是服务端作为被动接收一方,如果一段时间内服务端没有收到心跳包那么就直接断开连接。这里采用第二种方案,只需要客户端发送心跳消息,然后服务端被动接收,然后设置一段时间,在这段时间内如果服务端没有收到任何消息,那么就主动断开连接
7、Netty 客户端断线重连
Netty 客户端需要重连服务端:
- Netty 客户端启动时,服务端挂掉,连不上服务端
- 在程序运行过程中,服务端突然挂掉
ChannelFutureListener
用来监测连接是否成功,不成功就进行断连重试机制
@PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientHandlerInitilizer()); ChannelFuture future = bootstrap.connect(); future.addListener((ChannelFutureListener) future1 -> { if (future1.isSuccess()) { log.info("连接Netty服务端成功"); } else { log.info("连接失败,进行断线重连"); future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS); } }); socketChannel = (SocketChannel) future.channel(); }
第二种情况是运行过程中 服务端突然挂掉了,这种情况我们在处理数据读写的Handler中实现
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //如果运行过程中服务端挂了,执行重连机制 EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS); super.channelInactive(ctx); }
channelInactive()
方法是指如果当前Channel没有连接到远程节点,那么该方法将会被调用
8、服务端空闲检测
空闲检测是什么?实际上空闲检测是每隔一段时间,检测这段时间内是否有数据读写。比如,服务端检测一段时间内,是否收到客户端发送来的数据,如果没有,就及时释放资源,关闭连接。
@Slf4j public class ServerIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 30; public ServerIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME); ctx.channel().close(); } }
https://juejin.im/post/5bd584bc518825292865395d