一款基於Netty開發的WebSocket服務器


代碼地址如下:
http://www.demodashi.com/demo/13577.html

一款基於Netty開發的WebSocket服務器

這是一款基於Netty框架開發的服務端,通信協議為WebSocket。主要用於Java后台服務器向瀏覽器進行消息推送。

需求

對於一個Web項目而言,客戶端一般均為各種各樣的瀏覽器,如何從后端服務器向瀏覽器客戶端進行消息推送,便成了一個棘手的問題,好在在HTTP1.1之后,HTTP可以支持長連接,由此,我在Netty框架的基礎上開發了這個WebSocket服務端。

當然,你依舊可以下載源碼進行測試,集成,二次開發等等。

環境

  • Intellij IDEA2018
  • JDK 1.8
  • 插件:Simple WebSocket Client 0.1.3
    • FireFox Quantum 60.0.1 (64 位)
    • Google Chrome 67.0.3396.99(正式版本)(64 位)

運行結果

請看下圖:

推送結果圖

實現步驟及源碼

WebSocket

WebSocket是HTML5開始提供的一種瀏覽器與服務器間進行全雙工通訊的網絡技術。 WebSocket通信協議於2011年被IETF定為標准RFC 6455,WebSocketAPI被W3C定為標准。 在WebSocket API中,瀏覽器和服務器只需要要做一個握手的動作,然后,瀏覽器和服務器之間就形成了一條快速通道。兩者之間就直接可以數據互相傳送。

目錄結構

目錄結構

核心類講解

  • WebSocketChildHandler
  • WebSocketServerHandler

WebSocketChildHandler

這個類的主要作用就是為Netty的通道注冊事件。其核心代碼見下,其中webSocketUrl就是客戶端與服務端進行連接的請求路徑,我將其寫入了配置文件,交由Spring管理,以注入的方式傳遞到WebSocketServerHandler中。

ChannelPipeline pipeline = socketChannel.pipeline();
        // 將請求與應答消息編碼或者解碼為HTTP消息
        pipeline.addLast("http-codec", new HttpServerCodec());
        // 將http消息的多個部分組合成一條完整的HTTP消息
        pipeline.addLast("aggregator", new HttpObjectAggregator(HttpObjectConstant.MAX_CONTENT_LENGTH));
        // 向客戶端發送HTML5文件。主要用於支持瀏覽器和服務端進行WebSocket通信
        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
        // 服務端Handler
        pipeline.addLast("handler", new WebSocketServerHandler(webSocketUrl));

WebSocketServerHandler

這個類是真正的核心類,這個類的主要功能為:

  • 進行第一次握手
  • 對消息進行處理
    • 可以實現點對點通信
    • 可以實現廣播功能
    • 可以實現點對端通信
/**
     * 接收客戶端發送的消息
     *
     * @param channelHandlerContext ChannelHandlerContext
     * @param receiveMessage        消息
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object receiveMessage) throws Exception {
        // 傳統http接入 第一次需要使用http建立握手
        if (receiveMessage instanceof FullHttpRequest) {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) receiveMessage;
            LOGGER.info("├ [握手]: {}", fullHttpRequest.uri());
            // 握手
            handlerHttpRequest(channelHandlerContext, fullHttpRequest);
            // 發送連接成功給客戶端
            channelHandlerContext.channel().write(new TextWebSocketFrame("連接成功"));
        }
        // WebSocket接入
        else if (receiveMessage instanceof WebSocketFrame) {
            WebSocketFrame webSocketFrame = (WebSocketFrame) receiveMessage;
            handlerWebSocketFrame(channelHandlerContext, webSocketFrame);
        }
    }

    /**
     * 第一次握手
     *
     * @param channelHandlerContext channelHandlerContext
     * @param req                   請求
     */
    private void handlerHttpRequest(ChannelHandlerContext channelHandlerContext, FullHttpRequest req) {
        // 構造握手響應返回,本機測試
        WebSocketServerHandshakerFactory wsFactory
                = new WebSocketServerHandshakerFactory(webSocketUrl, Constant.NULL, Constant.FALSE);
        // region 從連接路徑中截取連接用戶名
        String uri = req.uri();
        int i = uri.lastIndexOf("/");
        String userName = uri.substring(i + 1, uri.length());
        // endregion
        Channel connectChannel = channelHandlerContext.channel();
        // 加入在線用戶
        WebSocketUsers.put(userName, connectChannel);
        socketServerHandShaker = wsFactory.newHandshaker(req);
        if (socketServerHandShaker == null) {
            // 發送版本錯誤
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(connectChannel);
        } else {
            // 握手響應
            socketServerHandShaker.handshake(connectChannel, req);
        }
    }

    /**
     * webSocket處理邏輯
     *
     * @param channelHandlerContext channelHandlerContext
     * @param frame                 webSocketFrame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame) throws IOException {
        Channel channel = channelHandlerContext.channel();
        // region 判斷是否是關閉鏈路的指令
        if (frame instanceof CloseWebSocketFrame) {
            LOGGER.info("├ 關閉與客戶端[{}]鏈接", channel.remoteAddress());
            socketServerHandShaker.close(channel, (CloseWebSocketFrame) frame.retain());
            return;
        }
        // endregion
        // region 判斷是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            LOGGER.info("├ [Ping消息]");
            channel.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // endregion
        // region 純文本消息
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            LOGGER.info("├ [{} 接收到客戶端的消息]: {}", new Date(), text);
            channel.write(new TextWebSocketFrame(new Date() + " 服務器將你發的消息原樣返回:" + text));
        }
        // endregion
        // region 二進制消息 此處使用了MessagePack編解碼方式
        if (frame instanceof BinaryWebSocketFrame) {
            BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) frame;
            ByteBuf content = binaryWebSocketFrame.content();
            LOGGER.info("├ [二進制數據]:{}", content);
            final int length = content.readableBytes();
            final byte[] array = new byte[length];
            content.getBytes(content.readerIndex(), array, 0, length);
            MessagePack messagePack = new MessagePack();
            WebSocketMessageEntity webSocketMessageEntity = messagePack.read(array, WebSocketMessageEntity.class);
            LOGGER.info("├ [解碼數據]: {}", webSocketMessageEntity);
            WebSocketUsers.sendMessageToUser(webSocketMessageEntity.getAcceptName(), webSocketMessageEntity.getContent());
        }
        // endregion
    }

至此,服務端算是開發完成。但可以看出,服務端中仍有很大的發展空間,細心的同學可以發現我在第一次握手時,將Channel存儲了起來,對於上述的三種情況也有簡易的實現方案。

如果有必要,我也會將非瀏覽器客戶端代碼(非Js客戶端)寫成例子,共享出來。

一款基於Netty開發的WebSocket服務器

代碼地址如下:
http://www.demodashi.com/demo/13577.html

注:本文著作權歸作者,由demo大師代發,拒絕轉載,轉載需要作者授權


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM