SpringBoot下WebSocket+React例子


1、Java端

1.1、引入SpringBoot的WebSocket包,Maven配置:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

1.2、增加WebSocket配置類

package com.tfe.sell.common.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;


/**
 * 開啟WebSocket支持
 */
@Configuration  
public class WebSocketConfig {  
    
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
} 

1.3、添加WebSocket的服務類

package com.tfe.sell.common.config;

import com.alibaba.fastjson.JSON;
import com.tfe.sell.common.entity.JsonResult;
import com.tfe.sell.common.entity.WebSocketJsonResult;
import com.tfe.sell.common.enumeration.WebSocketMessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/admin/websocket/{userid}")
@Component
public class WebSocketServer {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。
    private static int onlineCount = 0;
    //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //與某個客戶端的連接會話,需要通過它來給客戶端發送數據
    private Session session;

    //接收sid
    private String userid = "";
    /**
     * 連接建立成功調用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("userid") String userid) {
        if (!WebSocketServer.contains(userid)){
            this.session = session;
            webSocketSet.add(this);     //加入set中
            addOnlineCount();           //在線數加1
            logger.info("有新窗口開始監聽:" + userid + ",當前在線人數為" + getOnlineCount());
            this.userid = userid;
        }
    }

    public static Boolean contains(String userid){
        for (WebSocketServer item : webSocketSet) {
            if (item.userid.equals(userid)) {
                return true;
            }
        }
        return false;
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();           //在線數減1
        logger.info("有一連接關閉["+ this.userid +"],當前在線人數為" + getOnlineCount());
    }

    /**
     * 收到客戶端消息后調用的方法
     *
     * @param message 客戶端發送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("收到來自窗口" + userid + "的信息:" + message);
        WebSocketJsonResult jsonResult = JSON.parseObject(message,WebSocketJsonResult.class);
        handleMessage(jsonResult);
    }

    public void handleMessage(WebSocketJsonResult jsonResult){
        WebSocketMessageType messageType = WebSocketMessageType.getByCode(jsonResult.getCode());
        if (messageType == null){
            logger.error("傳入的類型不正確",jsonResult.getCode());
            return;
        }
        switch (messageType){
            case clientHeartbeat:
                WebSocketJsonResult newMessage = new WebSocketJsonResult(null,
                        WebSocketMessageType.clientHeartbeatReply.getCode(),
                        WebSocketMessageType.clientHeartbeatReply.getName());
                String jsonString = JSON.toJSONString(newMessage);
                sendInfo(jsonString,userid);
                break;
            default:
                //不做任何事情
                logger.error("傳入的類型沒有對應的處理");
                break;
        }
    }

    /**
     * 
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("發生錯誤" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 實現服務器主動推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 群發自定義消息
     * */
    public static void sendInfo(String message,String userid) {
        logger.info("推送消息到窗口" + userid + ",推送內容:" + message);
        for (WebSocketServer item : webSocketSet) {
            try {
                //這里可以設定只推送給這個sid的,為null則全部推送
                if(userid == null) {
                    item.sendMessage(message);
                }else if(item.userid.equals(userid)){
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                logger.error("error:" + e.getMessage());
                continue;
            }
        }
    }

    public static Integer getSize(){
        return webSocketSet.size();
    }

    public static String getUserIdList(){
        StringBuilder sb = new StringBuilder();
        for (WebSocketServer item : webSocketSet) {
            sb.append(item.userid);
            sb.append(",");
        }
        return sb.toString();
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

這個服務類用到一些實體和枚舉:

WebSocketJsonResult:

package com.tfe.sell.common.entity;

/**
 * 
 * JSON模型
 * 
 * 用戶后台向前台返回的JSON對象
 *
 * 
 */
public class WebSocketJsonResult implements java.io.Serializable {

    private static final long serialVersionUID = -1118025395225258944L;

    private String message = "";

    private int code = 1;

    private Object result = null;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object obj) {
        this.result = obj;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public WebSocketJsonResult(){}

    public WebSocketJsonResult(Object result, int code, String message){
        this.result = result;
        this.code = code;
        this.message = message;
    }

    public static WebSocketJsonResult successInstance(){
        return new WebSocketJsonResult(null,1,"成功");
    }

    public static WebSocketJsonResult failInstance(String message){
        return new WebSocketJsonResult(null,-1,message);
    }

}

 

2.React端

2.1、添加WebSocket組件

新建WebSocket目錄,添加Index.js,代碼如下:

/**
 * 參數:[socketOpen|socketClose|socketMessage|socketError] = func,[socket連接成功時觸發|連接關閉|發送消息|連接錯誤]
 * timeout:連接超時時間
 * @type {module.webSocket}
 */
module.exports =  class webSocket {
  constructor(param = {}) {
    this.param = param;
    this.reconnectCount = 0;
    this.socket = null;
    this.taskRemindInterval = null;
    this.isSucces=true;
  }
  connection = () => {
    let {socketUrl, timeout = 0} = this.param;
    // 檢測當前瀏覽器是什么瀏覽器來決定用什么socket
    if ('WebSocket' in window) {
      console.log('WebSocket');

      this.socket = new WebSocket(socketUrl);
    }
    else if ('MozWebSocket' in window) {
      console.log('MozWebSocket');

      this.socket = new MozWebSocket(socketUrl);
    }
    else {
      console.log('SockJS');

      this.socket = new SockJS(socketUrl);
    }
    this.socket.onopen = this.onopen;
    this.socket.onmessage = this.onmessage;
    this.socket.onclose = this.onclose;
    this.socket.onerror = this.onerror;
    this.socket.sendMessage = this.sendMessage;
    this.socket.closeSocket = this.closeSocket;
  };
  // 連接成功觸發
  onopen = () => {
    let {socketOpen} = this.param;
    this.isSucces=false;  //連接成功將標識符改為false
    socketOpen && socketOpen();
  };
  // 后端向前端推得數據
  onmessage = (msg) => {
    let {socketMessage} = this.param;
    socketMessage && socketMessage(msg);
  };
  // 關閉連接觸發
  onclose = (e) => {
    this.isSucces = true;   //關閉將標識符改為true
    this.socket.close();
    let {socketClose} = this.param;
    socketClose && socketClose(e);
  };
  onerror = (e) => {
    // socket連接報錯觸發
    let {socketError} = this.param;
    this.socket = null;
    socketError && socketError(e);
  };
  sendMessage = (value) => {
    // 向后端發送數據
    if(this.socket) {
      this.socket.send(JSON.stringify(value));
    }
  };
  closeSocket = () => {
    this.socket.close();
  };
  //獲得狀態
  readyState = () => {
    return this.socket.readyState;
  }
};

2.2 在需要觸發服務的頁面,例如訂單管理頁面增加WebSocket引用代碼

import WebSocket from "components/WebSocket";
 
         
socket = null;
@observable
webSockethadOpen = false;
//重試次數
websocketConnectedCount = 0;


openWebSocket = () => { let head = constantStore.ROOT_API_URL.replace("https","wss").replace("http","ws"); let url = head + "/admin/websocket/" + userStore.id; this.socket = new WebSocket({ socketUrl: url, socketMessage: (msg) => { var myMessage = msg; var data = myMessage.data; if (data != undefined){ let jsonResult = JSON.parse(data); if (jsonResult.code == 101) { console.info("收到新訂單消息",jsonResult); let orderGroupId = jsonResult.result; this.soundNotice(); Modal.confirm({ title: "新訂單", content: "有新訂單,是否顯示?", okText: "確認", cancelText: "取消", onOk: () => { if(this != undefined){ this.refs["DetailModal"].showModal(orderGroupId); } else{ message.error("請先打開【訂單管理】頁面"); } } }); } else if (jsonResult.code == 401){ //401是后端收到心跳包,並返回的消息類型 console.info("收到后台回復心跳包",jsonResult); this.heartCheck.reset().start(); // 如果獲取到消息,說明連接是正常的,重置心跳檢測 } } }, socketClose: (msg) => { this.webSockethadOpen = false; message.success("訂單監控已關閉"); console.info("訂單監控已關閉"); }, socketError: () => { console.info("連接建立失敗"); this.webSockethadOpen = false; this.websocketConnectedCount++; if(this.websocketConnectedCount <= 5){ message.error('連接建立失敗,嘗試重連'); this.openWebSocket(); } else{ message.error('連接建立失敗,請聯系管理員'); } }, socketOpen: () => { message.success("開始訂單監控"); console.info("開始訂單監控"); this.heartCheck.reset().start(); // 成功建立連接后,重置心跳檢測 } }); //重試創建socket連接 try { this.socket.connection(); } catch (e) { // 捕獲異常,防止js error } this.webSockethadOpen = true; }; closeWebSocket = () => { if (this.webSockethadOpen == true){ this.socket.closeSocket(); } }; soundNotice = () => { var url = "/new_order.mp3"; var audio = new Audio(url); audio.src = url; audio.play(); }; componentDidMount() { //初始化心跳包對象 this.initHeartCheck(); //進入自動開啟監控 this.openWebSocket(); } //初始化心跳包對象 initHeartCheck = () => { let _this = this; // 心跳檢測, 每隔一段時間檢測連接狀態,如果處於連接中,就向server端主動發送消息,來重置server端與客戶端的最大連接時間, // 如果已經斷開了,發起重連。 this.heartCheck = { timeout: 10 * 60 * 1000, //10分鍾發送一次心跳包,輸入毫秒數 serverTimeoutObj: null, reset: function () { if (this.serverTimeoutObj != undefined){ clearTimeout(this.serverTimeoutObj); } return this; }, start: function () { this.serverTimeoutObj = setInterval(function () { if (_this.webSockethadOpen) { if (_this.socket.readyState() == 1) { var message = { "code":301, "message":"發送心跳包" }; console.log("發送心跳包消息",message); _this.socket.sendMessage(message); _this.heartCheck.reset().start(); // 如果獲取到消息,說明連接是正常的,重置心跳檢測 } else { console.log("斷開狀態,嘗試重連"); _this.openWebSocket(); } } }, this.timeout) } }; }; componentWillUnmount(){ console.info("componentWillUnmount"); this.closeWebSocket(); }

<Button type="primary" disabled={this.webSockethadOpen} onClick={() => this.openWebSocket()} >
監聽訂單
</Button>
<Button type="primary" disabled={!this.webSockethadOpen} onClick={() => this.closeWebSocket()}>
關閉監聽
</Button>
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM