netty學習第5章 netty整合websocket實現服務端與客戶端消息推送


   在學完netty基礎部分后,你可能很難想到它的使用場景,本章就介紹一個netty的使用場景--websocket協議的應用。

    WebSocket是一種在單個TCP連接上進行全雙工通信的協議。WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。

  其次websocke支持很多種數據傳輸,如:二進制流、文件、文本信息等等。

一、服務端模塊

   1.引入maven與啟動類

    本文是基於spring boot 2.0,netty4.0開發的,這里只展示netty的包

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

    因為是服務端,所以需要在spring boot自帶的啟動類中同時啟動netty服務

/**
 * 聲明CommandLineRunner接口,實現run方法,就能給啟動項目同時啟動netty服務
 */
@SpringBootApplication
public class ThemApplication implements CommandLineRunner {

    /**
     * netty服務
     */
    @Autowired
    ServerByNetty serverByNetty;

    public static void main(String[] args) {
        SpringApplication.run(ThemApplication.class, args);
    }
    
    @Override
    public void run(String... args) throws Exception {
        serverByNetty.startServer();
    }
}

  2.netty服務端類

/**
 * 基於websocket的服務端代碼
 */
@Configuration
public class ServerByNetty {

    /**
     * 服務端啟動類
     * @throws Exception
     */
    public void startServer() throws Exception {
        // netty基本操作,兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();
        try{
            //netty的啟動類
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
                    //記錄日志的handler,netty自帶的
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.SO_BACKLOG,1024*1024*10)
                    //設置handler
                    .childHandler(new ChannelInitializer< SocketChannel >(){
                        @Override
      protected void initChannel(SocketChannel socketChannel) throws Exception {
         ChannelPipeline pipeline = socketChannel.pipeline();
   //websocket協議本身是基於Http協議的,所以需要Http解碼器
  pipeline.addLast("http-codec",new HttpServerCodec());
  //以塊的方式來寫的處理器
  pipeline.addLast("http-chunked",new ChunkedWriteHandler());
  //netty是基於分段請求的,HttpObjectAggregator的作用是將請求分段再聚合,參數是聚合字節的最大長度
  pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024));
  //這個是websocket的handler,是netty提供的,也可以自定義,建議就用默認的
  pipeline.addLast(new WebSocketServerProtocolHandler("/hello",null,true,65535));
  //自定義的handler,處理服務端傳來的消息
  pipeline.addLast(new WebSocketHandle());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }

    }
}

  在服務端類中需要注意的是使用的handler,一共有5個,前面三個都是HTTP的編解碼器,WebSocketServerProtocolHandler則是websocket的handler,這個的作用主要是用來解決HTTP握手等問題。雖然可以自己實現,但是推薦采用這個默認的handler,它能夠解決很多未知的問題。

  3.自定義的業務處理handler

  這里最主要的地方就是消息推送,其實只要你把IP存起來,發送消息就會非常簡單。  

/**
 * 自定義的handler類
 */
@Configuration
public class WebSocketHandle extends SimpleChannelInboundHandler<Object> {

    //客戶端組
    public  static ChannelGroup channelGroup;

    static {
        channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
    //存儲ip和channel的容器
    private static ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>();



    /**
     * Handler活躍狀態,表示連接成功
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("與客戶端連接成功");
        channelGroup.add(ctx.channel());
    }

    /**
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //文本消息
        if (msg instanceof TextWebSocketFrame) {
            //第一次連接成功后,給客戶端發送消息
            sendMessageAll();
            //獲取當前channel綁定的IP地址
            InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
            String address = ipSocket.getAddress().getHostAddress();
            System.out.println("address為:"+address);
            //將IP和channel的關系保存
            if (!channelMap.containsKey(address)){
                channelMap.put(address,ctx.channel());
            }

        }
        //二進制消息
        if (msg instanceof BinaryWebSocketFrame) {
            System.out.println("收到二進制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes());
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes()));
            //給客戶端發送的消息
            ctx.channel().writeAndFlush(binaryWebSocketFrame);
        }
        //ping消息
        if (msg instanceof PongWebSocketFrame) {
            System.out.println("客戶端ping成功");
        }
        //關閉消息
        if (msg instanceof CloseWebSocketFrame) {
            System.out.println("客戶端關閉,通道關閉");
            Channel channel = ctx.channel();
            channel.close();
        }
    }

    /**
     * 未注冊狀態
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("等待連接");
    }

    /**
     * 非活躍狀態,沒有連接遠程主機的時候。
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端關閉");
        channelGroup.remove(ctx.channel());
    }


    /**
     * 異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接異常:"+cause.getMessage());
        ctx.close();
    }


    /**
     * 給指定用戶發內容
     * 后續可以掉這個方法推送消息給客戶端
     */
    public void  sendMessage(String address){
        Channel channel=channelMap.get(address);
        String message="你好,這是指定消息發送";
        channel.writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * 群發消息
     */
    public void  sendMessageAll(){
        String meesage="這是群發信息";
        channelGroup.writeAndFlush(new TextWebSocketFrame(meesage));
    }

}

 

 

  因為我們采用了websocket自帶的handler,所以不需要我自己再去解決HTTP握手的問題,我們只需要對客戶端發送過來的數據進行轉換和業務處理。

  至此,服務端的代碼就已經完成了。

二、客戶端模塊

  客戶端模塊中,就可以有多種實現了。可以采用JS實現網頁版的聊天工具,也可以在安卓端實現客戶端。這里使用的java實現一個客戶端。

  1.客戶端類

/**
 * 基於websocket的netty客戶端
 *
 */
public class ClientByNetty {

    public static void main(String[] args) throws Exception {
        //netty基本操作,線程組
        EventLoopGroup group = new NioEventLoopGroup();
        //netty基本操作,啟動類
        Bootstrap boot = new Bootstrap();
        boot.option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .handler(new LoggingHandler(LogLevel.INFO))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast("http-codec",new HttpClientCodec());
                        pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*10));
                        pipeline.addLast("hookedHandler", new WebSocketClientHandler());
                    }
                });
        //websocke連接的地址,/hello是因為在服務端的websockethandler設置的
        URI websocketURI = new URI("ws://localhost:8899/hello");
        HttpHeaders httpHeaders = new DefaultHttpHeaders();
        //進行握手
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
       //客戶端與服務端連接的通道,final修飾表示只會有一個
        final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
        WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler");
        handler.setHandshaker(handshaker);
        handshaker.handshake(channel);
        //阻塞等待是否握手成功
        handler.handshakeFuture().sync();
        System.out.println("握手成功");
        //給服務端發送的內容,如果客戶端與服務端連接成功后,可以多次掉用這個方法發送消息
        sengMessage(channel);
    }

    public static void sengMessage(Channel channel){
        //發送的內容,是一個文本格式的內容
        String putMessage="你好,我是客戶端";
        TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
        channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("消息發送成功,發送的消息是:"+putMessage);
                } else {
                    System.out.println("消息發送失敗 " + channelFuture.cause().getMessage());
                }
            }
        });
    }

}

  客戶端代碼中需要注意很多地方:

  (1) 客戶端的http編解碼器是HttpClientCodec與服務端是不一樣的,服務端是HttpServerCodec

  (2) URI中的地址用的websocket的協議 ws:,而/hello則是服務端設置的通道地址,類似於HTTP的接口地址

  (3) 當客戶端與服務端連接成功后,就可以通過調用sengMessage方法給服務端發送消息,只要這個連接沒有斷開就能夠一直發

  (4) 調用發送消息的方法,一定要等待握手成功后發送

  2.客戶端的業務handler

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    //握手的狀態信息
    WebSocketClientHandshaker handshaker;
    //netty自帶的異步處理
    ChannelPromise handshakeFuture;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("當前握手的狀態"+this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        //進行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse)msg;
                //握手協議返回,設置結束握手
                this.handshaker.finishHandshake(ch, response);
                //設置成功
                this.handshakeFuture.setSuccess();
                System.out.println("服務端的消息"+response.headers());
            } catch (WebSocketHandshakeException var7) {
                FullHttpResponse res = (FullHttpResponse)msg;
                String errorMsg = String.format("握手失敗,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse)msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else {
            //接收服務端的消息
            WebSocketFrame frame = (WebSocketFrame)msg;
            //文本信息
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                System.out.println("客戶端接收的消息是:"+textFrame.text());
            }
            //二進制信息
            if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
                System.out.println("BinaryWebSocketFrame");
            }
            //ping信息
            if (frame instanceof PongWebSocketFrame) {
                System.out.println("WebSocket Client received pong");
            }
            //關閉消息
            if (frame instanceof CloseWebSocketFrame) {
                System.out.println("receive close frame");
                ch.close();
            }

        }
    }

    /**
     * Handler活躍狀態,表示連接成功
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("與服務端連接成功");
    }

    /**
     * 非活躍狀態,沒有連接遠程主機的時候。
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("主機關閉");
    }

    /**
     * 異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("連接異常:"+cause.getMessage());
        ctx.close();
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    public WebSocketClientHandshaker getHandshaker() {
        return handshaker;
    }

    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }

    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }
}

 

  在handler中,我們可以驗證握手是否成功,就使用handshaker.isHandshakeComplete()的方法,如果false就表示握手失敗。如果是握手失敗客戶端就無法接收服務端的消息,所以如果當你要驗證消息是否成功到達客戶端的時候,可以采用這個方法。

  運行結果,先運行服務端,再運行客戶端:

  服務端界面

 

 

  客戶端界面:

 

 

 

 

  

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM