Netty 搭建 WebSocket 服務端


一、編碼器、解碼器

... ...

// Handler beans injected by Spring. The same two instances are added to the
// pipeline of every accepted channel in initChannel below.
@Autowired
private HttpRequestHandler httpRequestHandler;
@Autowired
private TextWebSocketFrameHandler textWebSocketFrameHandler;

... ...

.childHandler(new ChannelInitializer<SocketChannel> () {

    /**
     * Assembles the handler chain for a newly accepted connection.
     * Handlers are appended in the order inbound data travels through them.
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                // WebSocket begins as an HTTP exchange, so the HTTP codec comes first.
                .addLast("http-codec", new HttpServerCodec())
                // Allows large data streams to be written in chunks.
                .addLast("http-chunked", new ChunkedWriteHandler())
                // Combines the parts of an HTTP message into a single FullHttpRequest.
                .addLast("aggregator", new HttpObjectAggregator(65536))
                // Inspects the one HTTP request (the Upgrade: websocket request) that
                // precedes the WebSocket session.
                .addLast("http-request", httpRequestHandler)
                // Performs the handshake and handles the control frames; the argument is
                // the context_path part of ws://server:port/context_path.
                .addLast("websocket-server", new WebSocketServerProtocolHandler(socketUri))
                // Of the six frame types defined by the WebSocket RFC, text frames are
                // the only ones the application handles itself.
                .addLast("text-frame", textWebSocketFrameHandler);
    }
});

... ...

其中 HttpRequestHandler 和 TextWebSocketFrameHandler 是自定義 Handler

1.1 HttpRequestHandler

/**
 * Screens the HTTP upgrade request that precedes a WebSocket session.
 *
 * <p>A request is accepted only when its URI starts with the configured socket
 * URI and carries a {@code userId} query parameter; any other request causes
 * the connection to be closed.
 */
@Component
@ChannelHandler.Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestHandler.class);

    @Value("${server.socket-uri}")
    private String socketUri;

    /**
     * Registers the channel under the caller's user id and passes the request on,
     * or closes the connection when the request is not acceptable.
     *
     * @param ctx context of this handler in the channel's pipeline
     * @param msg the aggregated HTTP request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // NOTE(review): UriUtil.getParam presumably returns null when the parameter is absent - confirm.
        final String userId = msg.uri().startsWith(socketUri)
                ? UriUtil.getParam(msg.uri(), "userId")
                : null;

        if (userId == null) {
            ctx.close();
            return;
        }

        // TODO: validate the user and detect duplicate logins.
        ChannelSupervise.addChannel(userId, ctx.channel());

        // Strip the query string so the next handler sees exactly the configured path.
        msg.setUri(socketUri);
        // SimpleChannelInboundHandler releases msg once this method returns, so an
        // extra reference is taken for the handler that receives it next.
        ctx.fireChannelRead(msg.retain());
    }

}

1.2 TextWebSocketFrameHandler

/**
 * Handles text frames of an established WebSocket session and keeps
 * {@link ChannelSupervise} informed when a connection goes away.
 */
@Component
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TextWebSocketFrameHandler.class);

    /**
     * Removes the HTTP request handler from the pipeline once the WebSocket
     * handshake has completed, then forwards the event.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            ctx.pipeline().remove(HttpRequestHandler.class);
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Replies to every text frame with a frame that repeats the received text.
     *
     * @param ctx context of this handler in the channel's pipeline
     * @param msg the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String requestMsg = msg.text();
        String responseMsg = "服務端接收客戶端消息:" + requestMsg;
        // The reply is a brand-new frame whose single reference is consumed by the
        // write; retaining it as well would leave its buffer unreleased.
        ctx.writeAndFlush(new TextWebSocketFrame(responseMsg));
    }

    /** Closes the connection and logs the failure under the channel's short id. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        LOGGER.error(ctx.channel().id().asShortText(), cause);
    }

    /** Unregisters the channel, logs the disconnect and forwards the event. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ChannelSupervise.removeChannel(ctx.channel());
        // SLF4J substitutes "{}" placeholders; "%s" would be printed literally.
        LOGGER.info("[{}]斷開連接", ctx.channel().id().asShortText());
        super.channelInactive(ctx);
    }
}

二、主動向客戶端推送消息

2.1 推送工具類

/**
 * Static registry of connected WebSocket channels, indexed by user id, used to
 * push messages to one user or to every connected client.
 */
public class ChannelSupervise {

    /** Every registered channel. */
    private static final ChannelGroup GLOBAL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** User id to the id of the channel registered for that user. */
    private static final ConcurrentMap<String, ChannelId> USER_CHANNEL_MAP = new ConcurrentHashMap<>();
    /**
     * Channel id to user id. Keyed by the ChannelId itself because the short
     * text form of an id is not guaranteed to be unique.
     */
    private static final ConcurrentMap<ChannelId, String> CHANNEL_USER_MAP = new ConcurrentHashMap<>();

    /**
     * Registers {@code channel} as the connection of {@code userId}, replacing
     * any channel previously registered for that user.
     *
     * @param userId  id of the connecting user; must not be null
     * @param channel the user's channel; must not be null
     */
    public static void addChannel(String userId, Channel channel) {
        GLOBAL_GROUP.add(channel);
        USER_CHANNEL_MAP.put(userId, channel.id());
        CHANNEL_USER_MAP.put(channel.id(), userId);
    }

    /**
     * Unregisters {@code channel}. Does nothing beyond leaving the group when
     * the channel was never registered through {@link #addChannel}.
     *
     * @param channel the channel that went away
     */
    public static void removeChannel(Channel channel) {
        GLOBAL_GROUP.remove(channel);
        String userId = CHANNEL_USER_MAP.remove(channel.id());
        if (userId == null) {
            // Never registered (e.g. the upgrade request was rejected); a null key
            // must not reach ConcurrentHashMap, which throws on it.
            return;
        }
        // Remove the user entry only while it still points at this channel, so a
        // late disconnect of an old connection cannot unregister a newer one.
        USER_CHANNEL_MAP.remove(userId, channel.id());
    }

    /**
     * Sends {@code msg} as a text frame to the channel registered for
     * {@code userId}. The call is a no-op when the user has no live channel.
     *
     * @param userId id of the target user
     * @param msg    text to send
     */
    public static void sendToUser(String userId, String msg) {
        if (userId == null) {
            return;
        }
        ChannelId channelId = USER_CHANNEL_MAP.get(userId);
        if (channelId == null) {
            return;
        }
        Channel channel = GLOBAL_GROUP.find(channelId);
        if (channel == null) {
            return;
        }
        // Build the frame only once a recipient is known, so no frame is
        // allocated and then dropped without being written.
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * Sends {@code msg} as a text frame to every registered channel.
     *
     * @param msg text to send
     */
    public static void sendToAll(String msg) {
        GLOBAL_GROUP.writeAndFlush(new TextWebSocketFrame(msg));
    }
}

支持向具體某個客戶端發送消息,或者群發消息

2.2 推送接口

/**
 * HTTP entry points that trigger server-initiated WebSocket pushes.
 * Both endpoints delegate directly to {@link ChannelSupervise}.
 */
@RestController
public class WebsocketController {

    /**
     * Pushes a message to a single user.
     *
     * @param userId id of the user to push to
     * @param msg    text to push
     */
    @RequestMapping("sendToUser")
    public void sendToUser(final String userId, final String msg) {
        ChannelSupervise.sendToUser(userId, msg);
    }

    /**
     * Pushes a message to every connected client.
     *
     * @param msg text to push
     */
    @RequestMapping("sendToAll")
    public void sendToAll(final String msg) {
        ChannelSupervise.sendToAll(msg);
    }
}

三、測試

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>WebSocket客戶端</title>
    </head>
    <body>

        <script type="text/javascript">

            var socket;

            /**
             * Opens a WebSocket for the user id typed into #userId and installs
             * the message / open / close callbacks on it.
             */
            function connect(){
                var status = document.getElementById('status');

                if(!window.WebSocket){
                    status.innerHTML = '<p style="color:Red;">瀏覽器不支持 WebSocket</p>';
                    return;
                }

                var userId = document.getElementById('userId').value;
                // Encode the id so characters such as '&', '#' or spaces cannot
                // break the query string.
                // NOTE(review): assumes the server decodes the query parameter - confirm.
                socket = new WebSocket('ws://localhost:8081/ws?userId=' + encodeURIComponent(userId));

                // Called whenever the server sends a message.
                socket.onmessage = function (event) {
                    // The payload is set as text, never parsed as HTML, so a
                    // message containing markup cannot inject elements or scripts.
                    var line = document.createElement('p');
                    line.style.color = 'LimeGreen';
                    line.textContent = ' 接收:' + event.data;
                    document.getElementById('response').appendChild(line);
                };

                // Called once the connection is established.
                socket.onopen = function (event) {
                    status.innerHTML = '<p style="color:YellowGreen;">WebSocket 連接開啟</p>';
                };

                // Called when the connection is closed.
                socket.onclose = function (event) {
                    status.innerHTML = '<p style="color:Red;">WebSocket 連接關閉</p>';
                };
            }

            // 發送數據
            /**
             * Sends `message` over the open socket and echoes it into #response.
             * When no connection is open, #response shows an error instead.
             */
            function send(message){
                if(!window.WebSocket){
                    return;
                }

                var response = document.getElementById('response');

                // `socket` stays undefined until connect() has been called, so it
                // must be checked before readyState is read.
                if(!socket || socket.readyState !== WebSocket.OPEN){
                    response.innerHTML = '<p style="color:Red;">連接沒有開啟</p>';
                    return;
                }

                // The typed message is set as text, never parsed as HTML.
                var line = document.createElement('p');
                line.style.color = 'SkyBlue';
                line.textContent = ' 發送:' + message;
                response.appendChild(line);

                socket.send(message);
            }
        </script>

        <!-- Connection form: the user id is sent as the userId query parameter. -->
        <form onsubmit="return false">
            <label for="userId">用戶ID:</label>
            <input type="text" name="userId" id="userId" />
            <input type="button" value="連接服務器" onclick="connect();">
        </form>

        <!-- Connection state written by connect(). -->
        <div id="status"></div>

        <!-- Message form. <input> is a void element and takes no end tag. -->
        <form onsubmit="return false">
            <input type="text" name="message" style="width: 200px;">
            <input type="button" value="發送消息" onclick="send(this.form.message.value);">
        </form>

        <!-- Log of sent and received messages. -->
        <div id="response"></div>

        <input type="button" onclick="document.getElementById('response').innerHTML=''" value="清空消息">
    </body>
</html>

注意

因為自定義 Handler 是由 Spring 依賴注入的 Bean,同一個實例會被加入到每一個 Channel 的 pipeline 中,所以需要添加 @ChannelHandler.Sharable 注解,否則同一個實例被再次加入 pipeline 時會報錯:「is not a @Sharable handler, so can't be added or removed multiple times.」

參考

  1. 微言Netty:分布式服務框架
  2. 基於netty搭建websocket,實現消息的主動推送
  3. Netty筆記之六:Netty對websocket的支持
  4. 使用Netty做WebSocket服務端

完整代碼:GitHub


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM