springboot整合netty實戰


1、依賴

       <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.22.Final</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
        </dependency>

2、構建服務端

@Slf4j
@Component
public class NettyServer {
    private EventLoopGroup boss = new NioEventLoopGroup();
    private EventLoopGroup work = new NioEventLoopGroup();

    @Value("${server.port}")
    private Integer port;

    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                //服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數
                .option(ChannelOption.SO_BACKLOG, 1024)
                //設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //將小的數據包包裝成更大的幀進行傳送,提高網絡的負載,即TCP延遲傳輸
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new NettyServerHandlerInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("Netty Server啟動成功");
        }
    }

    @PreDestroy
    public void destory() {
        boss.shutdownGracefully();
        work.shutdownGracefully();
        log.info("Netty Server關閉");
    }
}

3、客戶端

@Slf4j
@Component
public class NettyClient {
    private NioEventLoopGroup group = new NioEventLoopGroup();
    @Value("${client.port}")
    private int port;
    @Value("${client.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("連接Netty服務端成功");
            } else {
                log.info("連接失敗,進行斷線重連");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}

4、Protobuf消息

syntax="proto3";
option java_package="com.smart.protocol.protobuf";
option java_outer_classname="MessageBase";

message Message{
    string requestId=1;
    CommandType cmd=2;
    string content=3;
    enum CommandType {
      NORMAL =0;//常規業務消息
      HEARTBEAT_REQUEST=1;//客戶端心跳消息
      HEARTBEAT_RESPONSE=2;//服務端心跳消息
    }
}

5、protobuf的編解碼器

 Netty 為了支持 protobuf提供了針對 protobuf的編解碼器

 

 

   將其加入客戶端和服務端的 ChannelPipeline中以用於對消息進行編解碼

public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                .addLast(new ServerIdleStateHandler())
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(new NettyServerHandler());
    }
}

6、心跳機制

  心跳是在TCP長連接中,客戶端與服務端之間定期發送的一種特殊的數據包,通知對方在線以確保TCP連接的有效性。

     兩種方式實現心跳機制:

  • TCP協議層面的 keepalive 機制
  • 在應用層上自定義的心跳機制

     TCP層面的 keepalive 機制:在之前構建 Netty服務端和客戶端啟動過程中也有定義,需要手動開啟:

// 設置TCP的長連接,默認的 keepalive的心跳時間是兩個小時
childOption(ChannelOption.SO_KEEPALIVE, true)

  Netty也提供了 IdleStateHandler 來實現心跳機制

@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("已經10s沒有發送消息給服務器");
                MessageBase.Message heartbeart = MessageBase.Message.newBuilder().setCmd(
                        MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heart").build();
                ctx.writeAndFlush(heartbeart).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

  當連接空閑時間太長時,將會觸發一個 IdleStateEvent事件,然后我們調用 userEventTriggered來處理該 IdleStateEvent事件。

  對於長連接而言,一種方案是兩邊都發送心跳消息,另一種是服務端作為被動接收一方,如果一段時間內服務端沒有收到心跳包那么就直接斷開連接。這里采用第二種方案,只需要客戶端發送心跳消息,然后服務端被動接收,然后設置一段時間,在這段時間內如果服務端沒有收到任何消息,那么就主動斷開連接

7、Netty 客戶端斷線重連

  Netty 客戶端需要重連服務端:

  • Netty 客戶端啟動時,服務端掛掉,連不上服務端
  • 在程序運行過程中,服務端突然掛掉
  第一種情況實現  ChannelFutureListener用來監測連接是否成功,不成功就進行斷連重試機制
  
@PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("連接Netty服務端成功");
            } else {
                log.info("連接失敗,進行斷線重連");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }

  第二種情況是運行過程中 服務端突然掛掉了,這種情況我們在處理數據讀寫的Handler中實現

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //如果運行過程中服務端掛了,執行重連機制
        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }

  channelInactive()方法是指如果當前Channel沒有連接到遠程節點,那么該方法將會被調用

8、服務端空閑檢測

  空閑檢測是什么?實際上空閑檢測是每隔一段時間,檢測這段時間內是否有數據讀寫。比如,服務端檢測一段時間內,是否收到客戶端發送來的數據,如果沒有,就及時釋放資源,關閉連接。

@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
    private static final int READER_IDLE_TIME = 30;

    public ServerIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        log.info("{} 秒內沒有讀取到數據,關閉連接", READER_IDLE_TIME);
        ctx.channel().close();
    }
}

 

參考:
https://juejin.im/post/5bd584bc518825292865395d


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM