SpringBoot集成Netty實現websocket通訊


實現websocket通訊,和廣播消息

添加依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.1.Final</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>${commons.lang.version}</version>
</dependency>

排除tomcat的依賴

Netty Http服務端編寫

handler 處理類

@Component
@Slf4j
@ChannelHandler.Sharable //@Sharable 注解用來說明ChannelHandler是否可以在多個channel直接共享使用
@ConditionalOnProperty(  //配置文件屬性是否為true
        value = {"netty.ws.enabled"},
        matchIfMissing = false
)
public class WsServerHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    NettyWsProperties nettyWsProperties;

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private WebSocketServerHandshaker handshaker;
    //websocket握手升級綁定頁面
    String wsFactoryUri = "";

    @Value("${netty.ws.endPoint:/ws}")
    private String wsUri;
    //static Set<Channel> channelSet = new HashSet<>();

    /*
     * 握手建立
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.add(incoming);
    }

    /*
     * 握手取消
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.remove(incoming);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    //websocket消息處理(只支持文本)
    public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {

        // 關閉請求
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // ping請求
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 只支持文本格式,不支持二進制消息
        if (frame instanceof TextWebSocketFrame) {
            //接收到的消息
            String requestmsg = ((TextWebSocketFrame) frame).text();
            TextWebSocketFrame tws = new TextWebSocketFrame(requestmsg);
            channels.writeAndFlush(tws);
        }

    }

    // 第一次請求是http請求,請求頭包括ws的信息
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request)
            throws Exception {
        // 如果HTTP解碼失敗,返回HTTP異常
        if (request instanceof HttpRequest) {
            HttpMethod method = request.getMethod();
            // 如果是websocket請求就握手升級
            if (wsUri.equalsIgnoreCase(request.getUri())) {
                System.out.println(" req instanceof HttpRequest");
                WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        wsFactoryUri, null, false);
                handshaker = wsFactory.newHandshaker(request);
                if (handshaker == null) {
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
                }
                handshaker.handshake(ctx.channel(), request);
            }
        }
    }
    
    // 異常處理,netty默認是關閉channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
        
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {

            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // 讀數據超時
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // 寫數據超時
            } else if (event.state() == IdleState.ALL_IDLE) {
                // 通道長時間沒有讀寫,服務端主動斷開鏈接
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

ChannelPipeline 實現

@Component
@ConditionalOnProperty(  //配置文件屬性是否為true
        value = {"netty.ws.enabled"},
        matchIfMissing = false
)
public class WsPipeline  extends ChannelInitializer<SocketChannel>{
    
    @Autowired
    WsServerHandler wsServerHandler;
    
    private static final int READ_IDEL_TIME_OUT = 3; // 讀超時
    private static final int WRITE_IDEL_TIME_OUT = 4;// 寫超時
    private static final int ALL_IDEL_TIME_OUT = 5; // 所有超時
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline p = ch.pipeline();
        p.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.MINUTES));
        p.addLast("http-codec", new HttpServerCodec());
        p.addLast("aggregator", new HttpObjectAggregator(65536));
        p.addLast("http-chunked", new ChunkedWriteHandler());
        p.addLast("handler",wsServerHandler);
    }

}

服務實現

@Configuration
@EnableConfigurationProperties({NettyWsProperties.class})
@ConditionalOnProperty(  //配置文件屬性是否為true
        value = {"netty.ws.enabled"},
        matchIfMissing = false
)
@Slf4j
public class WsServer {
    
        @Autowired
        WsPipeline wsPipeline;

        @Autowired
        NettyWsProperties nettyWsProperties;
        
        @Bean("starWsServer")
        public String start() {
            // 准備配置
            // HttpConfiguration.me().setContextPath(contextPath).setWebDir(webDir).config();
            // 啟動服務器
           Thread thread =  new Thread(() -> {
                NioEventLoopGroup bossGroup = new NioEventLoopGroup(nettyWsProperties.getBossThreads());
                NioEventLoopGroup workerGroup = new NioEventLoopGroup(nettyWsProperties.getWorkThreads());
                try {
                    log.info("start netty [WebSocket] server ,port: " + nettyWsProperties.getPort());
                    ServerBootstrap boot = new ServerBootstrap();
                    options(boot).group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .handler(new LoggingHandler(LogLevel.INFO))
                            .childHandler(wsPipeline);
                    Channel ch = null;
                    //是否綁定IP
                    if(StringUtils.isNotEmpty(nettyWsProperties.getBindIp())){
                        ch = boot.bind(nettyWsProperties.getBindIp(),nettyWsProperties.getPort()).sync().channel();
                    }else{
                        ch = boot.bind(nettyWsProperties.getPort()).sync().channel();
                    }
                    ch.closeFuture().sync();
                } catch (InterruptedException e) {
                    log.error("啟動NettyServer錯誤", e);
                } finally {
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
            });
            thread.setName("Ws_Server");
            thread.start();
            return "ws start";
        }
        
        
        private ServerBootstrap options(ServerBootstrap boot) {
            boot.option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            return boot;
        }

}

啟動配置

---application.yml
spring.profiles.active: ws

---application-ws.yml
netty:
   ws:
     enabled: true
     port: 9988
     endPoint: /ws

測試

在瀏覽器打開多個http://127.0.0.1:8080/socket.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM