JAVA使用netty建立websocket連接


 

依賴

  <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>

 

也用了

 <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

 

 

WebsocketNettyServerBootstrap.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 服務啟動類
 */
public class WebsocketNettyServerBootstrap {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //創建服務器端的啟動對象,配置參數
            ServerBootstrap bootstrap = new ServerBootstrap();
            //設置兩個線程組
            bootstrap.group(bossGroup, workerGroup)
                    //使用NioServerSocketChannel 作為服務器的通道實現
                    .channel(NioServerSocketChannel.class)
                    //設置線程隊列得到連接個數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //設置保持活動連接狀態
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //給workerGroup 的 EventLoop 對應的管道設置處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            /**
                             * 因為基於http協議,使用http的編解碼器
                             */
                            pipeline.addLast(new HttpServerCodec());

                            /**
                             * 是以塊方式寫,添加 ChunkedWriteHandler 處理器
                             */
                            pipeline.addLast(new ChunkedWriteHandler());
                            /**
                             * http數據傳輸過程是分段的,HttpObjectAggregator,就是可以將多段聚合
                             * 當瀏覽器發送大量數據時,就會發出多次http請求
                             */
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /**
                             * 對於websocket數據是以 幀 形式傳遞
                             *  瀏覽器請求時 ws://localhost:7000/hello 其中 hello會與下面的對應
                             *  WebSocketServerProtocolHandler 核心功能是將http協議升級為ws協議,保持長鏈接
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));


                            //自定義handler
                            pipeline.addLast(new TextWebsocketFrameHandler());
                        }
                    });

            //啟動服務器並綁定一個端口並且同步生成一個 ChannelFuture 對象
            ChannelFuture cf = bootstrap.bind(7000).sync();
            if (cf.isSuccess()) {
                System.out.println("websocket server start---------------");
            }

            //對關閉通道進行監聽
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //發送異常關閉
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

        }
    }
}

 

 

TextWebsocketFrameHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * TextWebSocketFrame 表示一個文本幀
 */
@Slf4j
public class TextWebsocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info(">>>>>>>>>>>服務端收到消息:{}", msg.text());


        /**
         * 回復消息
         */
        ctx.writeAndFlush(new TextWebSocketFrame("服務器收到了,並返回:"+msg.text()));
    }


    /**
     * 當web客戶端連接后,觸發方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        /**
         * 這個ID是唯一的
         */
        log.info(">>>>>>>>>>>> channelId:{} 連接",ctx.channel().id().asLongText());

        /**
         *  這個ID不是唯一的
         */
        log.info(">>>>>>>>>>>> channelId:{} 連接",ctx.channel().id().asShortText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>>>> channelId:{} 關閉了",ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error(">>>>>>>>發送異常:{}",cause.getMessage());
        ctx.close();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM