SpringBoot2.0集成WebSocket,實現后台向前端推送信息


鏈接:https://blog.csdn.net/moshowgame/article/details/80275084
SpringBoot+WebSocket集成

什么是WebSocket?

這里寫圖片描述
WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。

為什么需要 WebSocket?

初次接觸 WebSocket 的人,都會問同樣的問題:我們已經有了 HTTP 協議,為什么還需要另一個協議?它能帶來什么好處?

  • 答案很簡單,因為 HTTP 協議有一個缺陷:通信只能由客戶端發起,HTTP 協議做不到服務器主動向客戶端推送信息。
    這里寫圖片描述
    舉例來說,我們想要查詢當前的排隊情況,只能是頁面輪詢向服務器發出請求,服務器返回查詢結果。輪詢的效率低,非常浪費資源(因為必須不停連接,或者 HTTP 連接始終打開)。因此WebSocket 就是這樣發明的。
  • 前言

    2020-10-20 教程補充:

    • 補充關於@Component@ServerEndpoint關於是否單例模式等的解答,感謝大家熱心提問和研究。
    • Vue版本的websocket連接方法

    2020-01-05 教程補充:

    感謝大家的支持和留言,14W訪問量是滿滿的動力!接下來還會有websocket+redis集群優化篇針對多ws服務器做簡單優化處理,敬請期待!

    話不多說,馬上進入干貨時刻。

    maven依賴

    SpringBoot2.0對WebSocket的支持簡直太棒了,直接就有包可以引入

    <dependency>  
               <groupId>org.springframework.boot</groupId>  
               <artifactId>spring-boot-starter-websocket</artifactId>  
           </dependency>

    WebSocketConfig

    啟用WebSocket的支持也是很簡單,幾句代碼搞定

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * 開啟WebSocket支持
     * @author zhengkai.blog.csdn.net
     */
    @Configuration  
    public class WebSocketConfig {  
        
        @Bean  
        public ServerEndpointExporter serverEndpointExporter() {  
            return new ServerEndpointExporter();  
        }  
      
    }

    WebSocketServer

    這就是重點了,核心都在這里。

    1. 因為WebSocket是類似客戶端服務端的形式(采用ws協議),那么這里的WebSocketServer其實就相當於一個ws協議的Controller

    2. 直接@ServerEndpoint("/imserver/{userId}")@Component啟用即可,然后在里面實現@OnOpen開啟連接,@onClose關閉連接,@onMessage接收消息等方法。

    3. 新建一個ConcurrentHashMap webSocketMap 用於接收當前userId的WebSocket,方便IM之間對userId進行推送消息。單機版實現到這里就可以。

    4. 集群版(多個ws節點)還需要借助mysql或者redis等進行處理,改造對應的sendMessage方法即可。

    package com.softdev.system.demo.config;
    
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Component;
    import cn.hutool.log.Log;
    import cn.hutool.log.LogFactory;
    
    
    /**
     * @author zhengkai.blog.csdn.net
     */
    @ServerEndpoint("/imserver/{userId}")
    @Component
    public class WebSocketServer {
    
        static Log log=LogFactory.get(WebSocketServer.class);
        /**靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。*/
        private static int onlineCount = 0;
        /**concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。*/
        private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
        /**與某個客戶端的連接會話,需要通過它來給客戶端發送數據*/
        private Session session;
        /**接收userId*/
        private String userId="";
    
        /**
         * 連接建立成功調用的方法*/
        @OnOpen
        public void onOpen(Session session,@PathParam("userId") String userId) {
            this.session = session;
            this.userId=userId;
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                webSocketMap.put(userId,this);
                //加入set中
            }else{
                webSocketMap.put(userId,this);
                //加入set中
                addOnlineCount();
                //在線數加1
            }
    
            log.info("用戶連接:"+userId+",當前在線人數為:" + getOnlineCount());
    
            try {
                sendMessage("連接成功");
            } catch (IOException e) {
                log.error("用戶:"+userId+",網絡異常!!!!!!");
            }
        }
    
        /**
         * 連接關閉調用的方法
         */
        @OnClose
        public void onClose() {
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                //從set中刪除
                subOnlineCount();
            }
            log.info("用戶退出:"+userId+",當前在線人數為:" + getOnlineCount());
        }
    
        /**
         * 收到客戶端消息后調用的方法
         *
         * @param message 客戶端發送過來的消息*/
        @OnMessage
        public void onMessage(String message, Session session) {
            log.info("用戶消息:"+userId+",報文:"+message);
            //可以群發消息
            //消息保存到數據庫、redis
            if(StringUtils.isNotBlank(message)){
                try {
                    //解析發送的報文
                    JSONObject jsonObject = JSON.parseObject(message);
                    //追加發送人(防止串改)
                    jsonObject.put("fromUserId",this.userId);
                    String toUserId=jsonObject.getString("toUserId");
                    //傳送給對應toUserId用戶的websocket
                    if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                        webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                    }else{
                        log.error("請求的userId:"+toUserId+"不在該服務器上");
                        //否則不在這個服務器上,發送到mysql或者redis
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        /**
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("用戶錯誤:"+this.userId+",原因:"+error.getMessage());
            error.printStackTrace();
        }
        /**
         * 實現服務器主動推送
         */
        public void sendMessage(String message) throws IOException {
            this.session.getBasicRemote().sendText(message);
        }
    
    
        /**
         * 發送自定義消息
         * */
        public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
            log.info("發送消息到:"+userId+",報文:"+message);
            if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
                webSocketMap.get(userId).sendMessage(message);
            }else{
                log.error("用戶"+userId+",不在線!");
            }
        }
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        public static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        public static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }

    消息推送

    至於推送新信息,可以再自己的Controller寫個方法調用WebSocketServer.sendInfo();即可

    import com.softdev.system.demo.config.WebSocketServer;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;
    import java.io.IOException;
    
    /**
     * WebSocketController
     * @author zhengkai.blog.csdn.net
     */
    @RestController
    public class DemoController {
    
        @GetMapping("index")
        public ResponseEntity<String> index(){
            return ResponseEntity.ok("請求成功");
        }
    
        @GetMapping("page")
        public ModelAndView page(){
            return new ModelAndView("websocket");
        }
    
        @RequestMapping("/push/{toUserId}")
        public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
            WebSocketServer.sendInfo(message,toUserId);
            return ResponseEntity.ok("MSG SEND SUCCESS");
        }
    }

    頁面發起

    頁面用js代碼調用websocket,當然,太古老的瀏覽器是不行的,一般新的瀏覽器或者谷歌瀏覽器是沒問題的。還有一點,記得協議是ws的,如果使用了一些路徑類,可以replace(“http”,“ws”)來替換協議。

    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>websocket通訊</title>
    </head>
    <script src="js/jquery-3.3.1.min.js" type="text/javascript"></script>
    <script>
        var socket;
        var contentText = "";
        function openSocket() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的瀏覽器不支持WebSocket");
            }else{
                console.log("您的瀏覽器支持WebSocket");
                //實現化WebSocket對象,指定要連接的服務器地址與端口  建立連接
                //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
                //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
                var socketUrl="http://localhost:8082/3dserver/imserver/"+$("#userId").val();
                socketUrl=socketUrl.replace("https","ws").replace("http","ws");
                console.log(socketUrl);
                if(socket!=null){
                    socket.close();
                    socket=null;
                }
                socket = new WebSocket(socketUrl);
                //打開事件
                socket.onopen = function() {
                    console.log("websocket已打開");
                    alert("websocket已連接");
                    //socket.send("這是來自客戶端的消息" + location.href + new Date());
                };
                //獲得消息事件
                socket.onmessage = function(msg) {
                    console.log(msg.data);
                    contentText = contentText + $.parseJSON(msg.data).contentText + "\n";
                    //發現消息進入    開始處理前端觸發邏輯
                    $("#textareaId").val(contentText);
                };
                //關閉事件
                socket.onclose = function() {
                    console.log("websocket已關閉");
                };
                //發生了錯誤事件
                socket.onerror = function() {
                    console.log("websocket發生了錯誤");
                }
            }
        }
        function sendMessage() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的瀏覽器不支持WebSocket");
            }else {
                console.log("您的瀏覽器支持WebSocket");
                console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
                socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            }
        }
    </script>
    <body>
        <p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
        <p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
        <p>【發送內容】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
        <p><textarea id="textareaId" rows="10" cols="30" ></textarea></p>
        <p>【操作】:<input type="submit" onclick="openSocket()" value="開啟socket"></p>
        <p>【操作】:<input type="submit" onclick="sendMessage()" value="發送消息"></p>
    </body>
    
    </html>

    運行效果

    • v20200105,加入開源項目spring-cloud-study-websocket,更新運行效果,更方便理解。
    • v1.1的效果,剛剛修復了日志,並且支持指定監聽某個端口,代碼已經全部更新,現在是這樣的效果
    1. 打開兩個頁面,按F12調出控控制台查看測試效果:


    image

    分別開啟socket,再發送消息

    在這里插入圖片描述

    在這里插入圖片描述

    向前端推送數據:

    在這里插入圖片描述


    通過調用push api,可以向指定的userId推送信息,當然報文這里亂寫,建議規定好格式。

    后續

    針對簡單IM的業務場景,進行了一些優化,可以看后續的文章SpringBoot2+WebSocket之聊天應用實戰(優化版本)(v20201005已整合)

    主要變動是CopyOnWriteArraySet改為ConcurrentHashMap,保證多線程安全同時方便利用map.get(userId)進行推送到指定端口。

    相比之前的Set,Set遍歷是費事且麻煩的事情,而Map的get是簡單便捷的,當WebSocket數量大的時候,這個小小的消耗就會聚少成多,影響體驗,所以需要優化。在IM的場景下,指定userId進行推送消息更加方便。

    Websocker注入Bean問題

    關於這個問題,可以看最新發表的這篇文章,在參考和研究了網上一些攻略后,項目已經通過該方法注入成功,大家可以參考。
    關於controller調用controller/service調用service/util調用service/websocket中autowired的解決方法

    netty-websocket-spring-boot-starter

    Springboot2構建基於Netty的高性能Websocket服務器(netty-websocket-spring-boot-starter)
    只需要換個starter即可實現高性能websocket,趕緊使用吧

    Springboot2+Netty+Websocket

    Springboot2+Netty實現Websocket,使用官方的netty-all的包,比原生的websocket更加穩定更加高性能,同等配置情況下可以handle更多的連接。

    代碼樣式全部已經更正,也支持websocket連接url帶參數功能,另外也感謝大家的閱讀和評論,一起進步,謝謝!~~

    ServerEndpointExporter錯誤

    org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘serverEndpointExporter’ defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

    感謝@來了老弟兒 的反饋:

    如果tomcat部署一直報這個錯,請移除 WebSocketConfig@Bean ServerEndpointExporter 的注入 。

    ServerEndpointExporter 是由Spring官方提供的標准實現,用於掃描ServerEndpointConfig配置類和@ServerEndpoint注解實例。使用規則也很簡單:

    1. 如果使用默認的嵌入式容器 比如Tomcat 則必須手工在上下文提供ServerEndpointExporter
    2. 如果使用外部容器部署war包,則不需要提供提供ServerEndpointExporter,因為此時SpringBoot默認將掃描服務端的行為交給外部容器處理,所以線上部署的時候要把WebSocketConfig中這段注入bean的代碼注掉。

    正式項目的前端WebSocket框架 GoEasy

    感謝kkatrina的補充,正式的項目中,一般是用第三方websocket框架來做,穩定性、實時性有保證的多,也會包括一些心跳、重連機制。

    GoEasy專注於服務器與瀏覽器,瀏覽器與瀏覽器之間消息推送,完美兼容世界上的絕大多數瀏覽器,包括IE6, IE7之類的非常古老的瀏覽器。支持Uniapp,各種小程序,react,vue等所有主流Web前端技術。
    GoEasy采用 發布/訂閱 的消息模式,幫助您非常輕松的實現一對一,一對多的通信。
    https://www.goeasy.io/cn/doc/

    @Component@ServerEndpoint關於是否單例模式,能否使用static Map等一些問題的解答

    看到大家都在熱心的討論關於是否單例模式這個問題,請大家相信自己的直接,如果websocket是單例模式,還怎么服務這么多session呢。

    1. websocket是原型模式@ServerEndpoint每次建立雙向通信的時候都會創建一個實例,區別於spring的單例模式。
    2. Spring的@Component默認是單例模式,請注意,默認 而已,是可以被改變的。
    3. 這里的@Component僅僅為了支持@Autowired依賴注入使用,如果不加則不能注入任何東西,為了方便。
    4. 什么是prototype 原型模式? 基本就是你需要從A的實例得到一份與A內容相同,但是又互不干擾的實例B的話,就需要使用原型模式。
    5. 關於在原型模式下使用static 的webSocketMap,請注意這是ConcurrentHashMap ,也就是線程安全/線程同步的,而且已經是靜態變量作為全局調用,這種情況下是ok的,或者大家如果有顧慮或者更好的想法的化,可以進行改進。 例如使用一個中間類來接收和存放session。
    6. 為什么每次都@OnOpen都要檢查webSocketMap.containsKey(userId) ,首先了為了代碼強壯性考慮,假設代碼以及機制沒有問題,那么肯定這個邏輯是廢的對吧。但是實際使用的時候發現偶爾會出現重連失敗或者其他原因導致之前的session還存在,這里就做了一個清除舊session,迎接新session的功能。

    Vue版本的websocket連接

    感謝**@GzrStudy**的貢獻,供大家參考。

    <script>
    export default {
        data() {
            return {
                socket:null,
                userId:localStorage.getItem("ms_uuid"),
                toUserId:'2',
                content:'3'
            }
        },
      methods: {
        openSocket() {
          if (typeof WebSocket == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
          } else {
            console.log("您的瀏覽器支持WebSocket");
            //實現化WebSocket對象,指定要連接的服務器地址與端口  建立連接
            //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
            //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
            var socketUrl =
              "http://localhost:8081/imserver/" + this.userId;
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            console.log(socketUrl);
            if (this.socket != null) {
              this.socket.close();
              this.socket = null;
            }
            this.socket = new WebSocket(socketUrl);
            //打開事件
            this.socket = new WebSocket(socketUrl);
            //打開事件
            this.socket.onopen = function() {
              console.log("websocket已打開");
              //socket.send("這是來自客戶端的消息" + location.href + new Date());
            };
            //獲得消息事件
            this.socket.onmessage = function(msg) {
              console.log(msg.data);
              //發現消息進入    開始處理前端觸發邏輯
            };
            //關閉事件
            this.socket.onclose = function() {
              console.log("websocket已關閉");
            };
            //發生了錯誤事件
            this.socket.onerror = function() {
              console.log("websocket發生了錯誤");
            };
          }
        },
        sendMessage() {
          if (typeof WebSocket == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
          } else {
            console.log("您的瀏覽器支持WebSocket");
            console.log(
              '{"toUserId":"' +
                 this.toUserId +
                '","contentText":"' +
                 this.content +
                '"}'
            );
            this.socket.send(
              '{"toUserId":"' +
                 this.toUserId +
                '","contentText":"' +
                 this.content +
                '"}'
             );
        
        }
    }



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM