1、Java端
1.1、引入SpringBoot的WebSocket包,Maven配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.1.0.RELEASE</version> </dependency>
1.2、增加WebSocket配置類
package com.tfe.sell.common.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * 開啟WebSocket支持 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
1.3、添加WebSocket的服務類
package com.tfe.sell.common.config; import com.alibaba.fastjson.JSON; import com.tfe.sell.common.entity.JsonResult; import com.tfe.sell.common.entity.WebSocketJsonResult; import com.tfe.sell.common.enumeration.WebSocketMessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/admin/websocket/{userid}") @Component public class WebSocketServer { private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); //與某個客戶端的連接會話,需要通過它來給客戶端發送數據 private Session session; //接收sid private String userid = ""; /** * 連接建立成功調用的方法*/ @OnOpen public void onOpen(Session session,@PathParam("userid") String userid) { if (!WebSocketServer.contains(userid)){ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在線數加1 logger.info("有新窗口開始監聽:" + userid + ",當前在線人數為" + getOnlineCount()); this.userid = userid; } } public static Boolean contains(String userid){ for (WebSocketServer item : webSocketSet) { if (item.userid.equals(userid)) { return true; } } return false; } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 logger.info("有一連接關閉["+ this.userid +"],當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) { logger.info("收到來自窗口" + userid + "的信息:" + message); WebSocketJsonResult jsonResult = JSON.parseObject(message,WebSocketJsonResult.class); handleMessage(jsonResult); } public void handleMessage(WebSocketJsonResult jsonResult){ WebSocketMessageType messageType = WebSocketMessageType.getByCode(jsonResult.getCode()); if (messageType == null){ logger.error("傳入的類型不正確",jsonResult.getCode()); return; } switch (messageType){ case clientHeartbeat: WebSocketJsonResult newMessage = new WebSocketJsonResult(null, WebSocketMessageType.clientHeartbeatReply.getCode(), WebSocketMessageType.clientHeartbeatReply.getName()); String jsonString = JSON.toJSONString(newMessage); sendInfo(jsonString,userid); break; default: //不做任何事情 logger.error("傳入的類型沒有對應的處理"); break; } } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { logger.error("發生錯誤" + error.getMessage()); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message,String userid) { logger.info("推送消息到窗口" + userid + ",推送內容:" + message); for (WebSocketServer item : webSocketSet) { try { //這里可以設定只推送給這個sid的,為null則全部推送 if(userid == null) { item.sendMessage(message); }else if(item.userid.equals(userid)){ item.sendMessage(message); } } catch (IOException e) { logger.error("error:" + e.getMessage()); continue; } } } public static Integer getSize(){ return webSocketSet.size(); } public static String getUserIdList(){ StringBuilder sb = new StringBuilder(); for (WebSocketServer item : webSocketSet) { sb.append(item.userid); sb.append(","); } return sb.toString(); } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
這個服務類用到一些實體和枚舉:
WebSocketJsonResult:
package com.tfe.sell.common.entity; /** * * JSON模型 * * 用戶后台向前台返回的JSON對象 * * */ public class WebSocketJsonResult implements java.io.Serializable { private static final long serialVersionUID = -1118025395225258944L; private String message = ""; private int code = 1; private Object result = null; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public Object getResult() { return result; } public void setResult(Object obj) { this.result = obj; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public WebSocketJsonResult(){} public WebSocketJsonResult(Object result, int code, String message){ this.result = result; this.code = code; this.message = message; } public static WebSocketJsonResult successInstance(){ return new WebSocketJsonResult(null,1,"成功"); } public static WebSocketJsonResult failInstance(String message){ return new WebSocketJsonResult(null,-1,message); } }
2.React端
2.1、添加WebSocket組件
新建WebSocket目錄,添加Index.js,代碼如下:

/** * 參數:[socketOpen|socketClose|socketMessage|socketError] = func,[socket連接成功時觸發|連接關閉|發送消息|連接錯誤] * timeout:連接超時時間 * @type {module.webSocket} */ module.exports = class webSocket { constructor(param = {}) { this.param = param; this.reconnectCount = 0; this.socket = null; this.taskRemindInterval = null; this.isSucces=true; } connection = () => { let {socketUrl, timeout = 0} = this.param; // 檢測當前瀏覽器是什么瀏覽器來決定用什么socket if ('WebSocket' in window) { console.log('WebSocket'); this.socket = new WebSocket(socketUrl); } else if ('MozWebSocket' in window) { console.log('MozWebSocket'); this.socket = new MozWebSocket(socketUrl); } else { console.log('SockJS'); this.socket = new SockJS(socketUrl); } this.socket.onopen = this.onopen; this.socket.onmessage = this.onmessage; this.socket.onclose = this.onclose; this.socket.onerror = this.onerror; this.socket.sendMessage = this.sendMessage; this.socket.closeSocket = this.closeSocket; }; // 連接成功觸發 onopen = () => { let {socketOpen} = this.param; this.isSucces=false; //連接成功將標識符改為false socketOpen && socketOpen(); }; // 后端向前端推得數據 onmessage = (msg) => { let {socketMessage} = this.param; socketMessage && socketMessage(msg); }; // 關閉連接觸發 onclose = (e) => { this.isSucces = true; //關閉將標識符改為true this.socket.close(); let {socketClose} = this.param; socketClose && socketClose(e); }; onerror = (e) => { // socket連接報錯觸發 let {socketError} = this.param; this.socket = null; socketError && socketError(e); }; sendMessage = (value) => { // 向后端發送數據 if(this.socket) { this.socket.send(JSON.stringify(value)); } }; closeSocket = () => { this.socket.close(); }; //獲得狀態 readyState = () => { return this.socket.readyState; } };
2.2 在需要觸發服務的頁面,例如訂單管理頁面增加WebSocket引用代碼
import WebSocket from "components/WebSocket";
socket = null;
@observable
webSockethadOpen = false;
//重試次數
websocketConnectedCount = 0;
openWebSocket = () => { let head = constantStore.ROOT_API_URL.replace("https","wss").replace("http","ws"); let url = head + "/admin/websocket/" + userStore.id; this.socket = new WebSocket({ socketUrl: url, socketMessage: (msg) => { var myMessage = msg; var data = myMessage.data; if (data != undefined){ let jsonResult = JSON.parse(data); if (jsonResult.code == 101) { console.info("收到新訂單消息",jsonResult); let orderGroupId = jsonResult.result; this.soundNotice(); Modal.confirm({ title: "新訂單", content: "有新訂單,是否顯示?", okText: "確認", cancelText: "取消", onOk: () => { if(this != undefined){ this.refs["DetailModal"].showModal(orderGroupId); } else{ message.error("請先打開【訂單管理】頁面"); } } }); } else if (jsonResult.code == 401){ //401是后端收到心跳包,並返回的消息類型 console.info("收到后台回復心跳包",jsonResult); this.heartCheck.reset().start(); // 如果獲取到消息,說明連接是正常的,重置心跳檢測 } } }, socketClose: (msg) => { this.webSockethadOpen = false; message.success("訂單監控已關閉"); console.info("訂單監控已關閉"); }, socketError: () => { console.info("連接建立失敗"); this.webSockethadOpen = false; this.websocketConnectedCount++; if(this.websocketConnectedCount <= 5){ message.error('連接建立失敗,嘗試重連'); this.openWebSocket(); } else{ message.error('連接建立失敗,請聯系管理員'); } }, socketOpen: () => { message.success("開始訂單監控"); console.info("開始訂單監控"); this.heartCheck.reset().start(); // 成功建立連接后,重置心跳檢測 } }); //重試創建socket連接 try { this.socket.connection(); } catch (e) { // 捕獲異常,防止js error } this.webSockethadOpen = true; }; closeWebSocket = () => { if (this.webSockethadOpen == true){ this.socket.closeSocket(); } }; soundNotice = () => { var url = "/new_order.mp3"; var audio = new Audio(url); audio.src = url; audio.play(); }; componentDidMount() { //初始化心跳包對象 this.initHeartCheck(); //進入自動開啟監控 this.openWebSocket(); } //初始化心跳包對象 initHeartCheck = () => { let _this = this; // 心跳檢測, 每隔一段時間檢測連接狀態,如果處於連接中,就向server端主動發送消息,來重置server端與客戶端的最大連接時間, // 如果已經斷開了,發起重連。 this.heartCheck = { timeout: 10 * 60 * 1000, //10分鍾發送一次心跳包,輸入毫秒數 serverTimeoutObj: null, reset: function () { if (this.serverTimeoutObj != undefined){ clearTimeout(this.serverTimeoutObj); } return this; }, start: function () { this.serverTimeoutObj = setInterval(function () { if (_this.webSockethadOpen) { if (_this.socket.readyState() == 1) { var message = { "code":301, "message":"發送心跳包" }; console.log("發送心跳包消息",message); _this.socket.sendMessage(message); _this.heartCheck.reset().start(); // 如果獲取到消息,說明連接是正常的,重置心跳檢測 } else { console.log("斷開狀態,嘗試重連"); _this.openWebSocket(); } } }, this.timeout) } }; }; componentWillUnmount(){ console.info("componentWillUnmount"); this.closeWebSocket(); }
<Button type="primary" disabled={this.webSockethadOpen} onClick={() => this.openWebSocket()} >
監聽訂單
</Button>
<Button type="primary" disabled={!this.webSockethadOpen} onClick={() => this.closeWebSocket()}>
關閉監聽
</Button>

