Netty實現WebSocket通信


/*
* Netty Reactor模型 -
* 1.單線程模型:一個用戶一個線程來處理,線程有極限
* 2.多線程模型:加入線程池,線程池線程輪詢執行任務
* 3.主從多線程模型:倆個線程池,一個線程池接收請求,一個線程池處理IO(推薦,適用高並發環境)
*
* 以下代碼為主從多線程模型
* */

映入坐標:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
</dependencies>


服務器端:
package cn.web.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/*
* Netty Reactor模型 -
* 1.單線程模型:一個用戶一個線程來處理,線程有極限
* 2.多線程模型:加入線程池,線程池線程輪詢執行任務
* 3.主從多線程模型:倆個線程池,一個線程池接收請求,一個線程池處理IO(推薦,適用高並發環境)
*
* 以下代碼為主從多線程模型
* */
public class WebSocketServer {
    public static void main(String[] args) {
        NioEventLoopGroup mainGrp=new NioEventLoopGroup();//主線程池
        NioEventLoopGroup subGrp=new NioEventLoopGroup();//從線程池

        try {
            //1.創建netty服務器啟動對象
            ServerBootstrap serverBootstrap=new ServerBootstrap();

            //2.初始化
            serverBootstrap
                    //指定使用的線程池
                    .group(mainGrp,subGrp)
                    //指定netty通道類型
                    .channel(NioServerSocketChannel.class)
                    // 指定通道初始化器用來加載當Channel收到事件消息后,
                    // 如何進行業務處理
                    .childHandler(new WebSocketChannelInitializer());

            //3.綁定端口,以同步的方式啟動
            ChannelFuture future=serverBootstrap.bind(9090).sync();

            //4.等待服務關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();

            //5.異常關閉服務器
            mainGrp.shutdownGracefully();
            subGrp.shutdownGracefully();
        }

    }
}
 
         
package cn.web.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;


/**
* 通道初始化器
* 用來加載通道處理器(ChannelHandler)
*/
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

// 初始化通道
// 在這個方法中去加載對應的ChannelHandler
protected void initChannel(SocketChannel socketChannel) throws Exception {

// 獲取管道,將一個一個的ChannelHandler添加到管道中
ChannelPipeline pipeline = socketChannel.pipeline();

// 添加一個http的編解碼器
pipeline.addLast(new HttpServerCodec());
// 添加一個用於支持大數據流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加一個聚合器,這個聚合器主要是將HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(1024*64));
// 需要指定接收請求的路由
// 必須使用以ws后綴結尾的url才能訪問
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加自定義的Handler
pipeline.addLast(new ChatHandler());

// 增加心跳事件支持
// 第一個參數: 讀空閑4秒
// 第二個參數: 寫空閑8秒
// 第三個參數: 讀寫空閑12秒
pipeline.addLast(new IdleStateHandler(4,8,12));
pipeline.addLast(new HearBeatHandler());
}
}
 
package cn.web.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;


public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    //存儲用戶連接
    private static ChannelGroup clients=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private SimpleDateFormat sdf=new SimpleDateFormat("yyyy-mm-dd hh:MM:ss");

    // 當Channel中有新的事件消息會自動調用
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        String text=textWebSocketFrame.text();
        System.out.println("接收到消息:"+text);

        // 獲取客戶端發送過來的文本消息
        for (Channel client :clients) {
            // 將消息發送到所有的客戶端
            client.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date())+":"+text));
        }
    }

    // 當有新的客戶端連接服務器之后,會自動調用這個方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

    // 端口連接處理
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("斷開連接");
        clients.remove(ctx.channel());
    }
}

 

package cn.web.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class HearBeatHandler extends ChannelInboundHandlerAdapter {
    // 客戶端在一定的時間沒有動作就會觸發這個事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event=(IdleStateEvent)evt;

            if (event.state()== IdleState.READER_IDLE){
                System.out.println("讀空閑");
            }else if(event.state()==IdleState.WRITER_IDLE){
                System.out.println("寫空閑");
            }else if(event.state()==IdleState.ALL_IDLE){
                System.out.println("讀寫都空閑你,關閉通道");
                ctx.channel().close();
            }
        }
    }
}

 

HTML頁面連接:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>在線聊天室</title>
</head>
<body>
    <input type="text" id="message">
    <input type="button" value="發送消息" onclick="sendMsg()">

    接收到的消息:
    <p id="server_message" style="background-color: #AAAAAA"></p>

    <script>

        var websocket = null;

        // 判斷當前瀏覽器是否支持websocket
        if(window.WebSocket) {
            websocket = new WebSocket("ws://127.0.0.1:9090/ws");

            websocket.onopen = function() {
                console.log("建立連接.");
            }
            websocket.onclose = function() {
                console.log("斷開連接");
            }
            websocket.onmessage = function(e) {
                console.log("接收到服務器消息:" + e.data);
                var server_message = document.getElementById("server_message");
                server_message.innerHTML += e.data + "<br/>";
            }
        }
        else {
            alert("當前瀏覽器不支持web socket");
        }

        function sendMsg() {
            var message = document.getElementById("message");
            websocket.send(message.value);
        }
    </script>
</body>
</html>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM