https://www.jianshu.com/p/60799f1356c5
https://blog.csdn.net/Ouyzc/article/details/79994401
maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
項目實戰
1,設置webSocket終端服務
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration for the WebSocket endpoint service.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * @return a newly created exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
2,消息處理
import com.chitic.module.core.enums.ChiticResponseCode; import com.chitic.module.core.exception.ChiticException; import com.chitic.module.core.util.SpringUtils; import com.chitic.module.sso.api.common.UserCacheInfo; import com.chitic.module.sso.config.TokenService; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint(value = "/socketServer/{clientId}") @Component public class SocketServer { private Session session; public static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>(); public static ConcurrentHashMap<String, String> sessionIds = new ConcurrentHashMap<>(); public void setSession(Session session) { this.session = session; } private String token(String clientId) { if (StringUtils.isNotBlank(clientId)) { String[] split = clientId.split("_"); return split[0]; } return null; } @OnOpen public void open(Session session, @PathParam(value = "clientId") String clientId) { this.session = session; TokenService tokenService = SpringUtils.getBean(TokenService.class); String token = token(clientId); if (null == token) { throw ChiticException.of(ChiticResponseCode.ACCESS_DENY); } UserCacheInfo userCacheInfo = tokenService.getByToken(token); if (null == userCacheInfo) { throw ChiticException.of(ChiticResponseCode.ACCESS_DENY); } sessionPool.put(clientId, session); sessionIds.put(session.getId(), clientId); } @OnMessage public void onMessage(String message) { String sessionId = session.getId(); sendMessage(message, sessionIds.get(sessionId)); } @OnClose public void onClose() { sessionPool.remove(sessionIds.get(session.getId())); sessionIds.remove(session.getId()); } @OnError public void onError(Session session, Throwable error) { error.printStackTrace(); } public synchronized static void sendMessage(String 
message, String clientId) { Session s = sessionPool.get(clientId); if (s != null) { try { s.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } } }
3,發送消息(可以在MQ如kafka生產消息)
/* Service that pushes data to WebSocket clients held in SocketServer.sessionPool. houseId(clientId) returns the last "_"-separated segment of a client id (null when blank). pushWs walks every pooled client id and, for those whose house segment matches, sends two serialized ChiticWsResponse payloads (POINT and PUMPS). NOTE(review): the snippet looks incomplete - `houseId` is used as a value in `"" + houseId`, and `wsPointResponse` / `dataDevNowWsResponsePumps` are not declared in the visible code; the `data` parameter is never read here. Confirm against the full source. */
import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.websocket.Session; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /* * @Author * @Description WebSocket push * @Date 8:46 2019/7/17 * @Param * @return **/ @Service @AllArgsConstructor @Slf4j public class DataDevNowWsBizService { private final WaterUnitRepository waterUnitRepository;private String houseId(String clientId) { if (StringUtils.isNotBlank(clientId)) { String[] split = clientId.split("_"); return split[split.length - 1]; } return null; } public void pushWs(HubDataDevKafkaResponse data) { ConcurrentHashMap<String, Session> sessionPool = SocketServer.sessionPool; if (CollectionUtils.isEmpty(sessionPool.keySet())) { return; } sessionPool.keySet().forEach(r -> { String house = houseId(r); if (null != house && house.equals("" + houseId)) { Session session = sessionPool.get(r);
// Bind the pooled session to a fresh SocketServer instance, then hand each payload to its onMessage(...).
SocketServer socketServer = new SocketServer();
socketServer.setSession(session); socketServer.onMessage(JacksonUtil.toString(ChiticWsResponse.of(r, WsDataType.POINT.getCode(), wsPointResponse))); socketServer.onMessage(JacksonUtil.toString(ChiticWsResponse.of(r, WsDataType.PUMPS.getCode(), dataDevNowWsResponsePumps)));
} }); } }
第一種
常量
/**
 * WebSocket-related constants.
 *
 * @author GX
 * @version V1.0, 2019/6/12 11:16
 */
public interface SocketConstant {

    /** Connection address: prefix for destinations handled by the application. */
    String WEBSOCKETPATHPERFIX = "/ws-push";

    /** Connection address: path of the WebSocket endpoint. */
    String WEBSOCKETPATH = "/endpointWisely";

    /** Message broker path. */
    String WEBSOCKETBROADCASTPATH = "/topic";

    /** Address the front end sends its requests to. */
    String FORETOSERVERPATH = "/welcome";

    /** Address the server publishes to; clients subscribe to it to receive server messages. */
    String PRODUCERPATH = "/topic/getResponse";

    /** Prefix of the point-to-point push address. */
    String P2PPUSHBASEPATH = "/user";

    /** Suffix of the point-to-point push address; the full address is /user/{user id}/msg. */
    String P2PPUSHPATH = "/msg";
}
接收前端消息實體
import lombok.Data;

/**
 * Message entity received from the front end.
 *
 * @author GX
 * @version V1.0, 2019/6/12 11:21
 */
@Data
public class WiselyMessage {

    /** Name carried by the incoming message. */
    private String name;
}
后台發送消息實體
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Message entity sent from the back end to clients.
 *
 * @author GX
 * @version V1.0, 2019/6/12 11:21
 */
@Data
@AllArgsConstructor
public class WiselyResponse {

    /** Text of the response. */
    private String responseMessage;
}
配置websocket
import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * @EnableWebSocketMessageBroker注解用於開啟使用STOMP協議來傳輸基於代理(MessageBroker)的消息, * 這時候控制器(controller)開始支持@MessageMapping,就像是使用@requestMapping一樣。 * @Description TODO 配置類 * @Author GX * @Date 2019/6/12 11:21 * @Version V1.0 **/ @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { //注冊一個Stomp的節點(endpoint),並指定使用SockJS協議。 //TODO 注意: 涉及到跨域問題 需加.setAllowedOrigins("*") stompEndpointRegistry.addEndpoint(SocketConstant.WEBSOCKETPATH).setAllowedOrigins("*").withSockJS(); //TODO 注意: 不涉及到跨域問題 .setAllowedOrigins("*")可去掉,如下 //stompEndpointRegistry.addEndpoint(SocketConstant.WEBSOCKETPATH).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //服務端發送消息給客戶端的域,多個用逗號隔開 registry.enableSimpleBroker(SocketConstant.WEBSOCKETBROADCASTPATH, SocketConstant.P2PPUSHBASEPATH); //定義一對一推送的時候前綴 registry.setUserDestinationPrefix(SocketConstant.P2PPUSHBASEPATH); //定義websoket前綴 registry.setApplicationDestinationPrefixes(SocketConstant.WEBSOCKETPATHPERFIX); } }
消息邏輯處理,發送消息
package com.chitic.supplywater.common.config.webSocket; import lombok.AllArgsConstructor; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import java.util.List; @Service @AllArgsConstructor public class WebSocketService { //@Autowired private final SimpMessagingTemplate template; /** * 廣播 * 發給所有在線用戶 * * @param msg */ public void sendMsg(WiselyResponse msg) { template.convertAndSend(SocketConstant.PRODUCERPATH, msg); } /** * 發送給指定用戶 * @param users * @param msg */ public void send2Users(List<String> users, WiselyResponse msg) { users.forEach(userName -> template.convertAndSendToUser(userName, SocketConstant.P2PPUSHPATH, msg)); } }
控制器controller
import lombok.AllArgsConstructor; import org.apache.commons.compress.utils.Lists; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Controller; import java.util.List; @Controller @AllArgsConstructor public class WsController { //@Resource private final WebSocketService webSocketService; @MessageMapping(SocketConstant.FORETOSERVERPATH)//@MessageMapping和@RequestMapping功能類似,用於設置URL映射地址,瀏覽器向服務器發起請求,需要通過該地址。 @SendTo(SocketConstant.PRODUCERPATH)//如果服務器接受到了消息,就會對訂閱了@SendTo括號中的地址傳送消息。 public WiselyResponse say(WiselyMessage message) throws Exception { List<String> users = Lists.newArrayList(); //此處寫死只是為了方便測試,此值需要對應頁面中訂閱個人消息的userId, 實際開發中,可以指定將此消息發送給誰 users.add("aa"); users.add("bb"); webSocketService.send2Users(users, new WiselyResponse("admin hello")); return new WiselyResponse("Welcome, " + message.getName() + "!"); } }
前端頁面
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8" />
<title>Spring Boot+WebSocket+廣播式</title>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: #ff0000">貌似你的瀏覽器不支持websocket</h2></noscript>
<div>
<div>
<button id="connect" onclick="connect();">連接</button>
<button id="disconnect" disabled="disabled" onclick="disconnect();">斷開連接</button>
</div>
<div id="conversationDiv">
<label>輸入你的名字</label><input type="text" id="name" />
<button id="sendName" onclick="sendName();">發送</button>
<p id="response"></p>
<p id="response1"></p>
</div>
</div>
<!--<script th:src="@{sockjs.min.js}"></script>
<script th:src="@{stomp.min.js}"></script>
<script th:src="@{jquery.js}"></script>-->
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>
<script th:inline="javascript">
// STOMP client shared by the functions below; null until connect() assigns it.
var stompClient = null;
// This value is passed from the server to the front end; how it is delivered is not prescribed.
var userId = "aa";
/**
 * Updates the page to reflect the connection state.
 *
 * @param {boolean} connected true when the client is connected
 */
function setConnected(connected) {
    document.getElementById('connect').disabled = connected;
    document.getElementById('disconnect').disabled = !connected;
    document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
    // Clear the previous response. Calling .html() without an argument only reads the
    // content, so the old text used to stay on screen.
    $('#response').html('');
}
/**
 * Opens the SockJS/STOMP connection and subscribes to the broadcast and
 * per-user destinations.
 */
function connect() {
    // 1. The SockJS endpoint must match the endpoint registered on the server ("endpointWisely").
    var sock = new SockJS('http://localhost:9000/endpointWisely');
    // 2. Create the STOMP client on top of the socket.
    stompClient = Stomp.over(sock);

    // Broadcast messages: destination matches the @SendTo address on the server.
    function onBroadcast(msg) {
        showResponse(JSON.parse(msg.body).responseMessage);
    }

    // One-to-one pushes: userId comes from the server and identifies a single user.
    function onUserMessage(msg) {
        console.log(msg);
        showResponse1(JSON.parse(msg.body).responseMessage);
    }

    // 3. Connect to the server, then 4. subscribe once connected.
    stompClient.connect({}, function (frame) {
        setConnected(true);
        console.log('開始進行連接Connected: ' + frame);
        stompClient.subscribe('/topic/getResponse', onBroadcast);
        stompClient.subscribe('/user/' + userId + '/msg', onUserMessage);
    });
}
/**
 * Disconnects the STOMP client, if one exists, and resets the page state.
 */
function disconnect() {
    var client = stompClient;
    if (client != null) {
        client.disconnect();
    }

    setConnected(false);
    console.log("Disconnected");
}
/**
 * Sends the entered name to the server.
 *
 * The destination is the @MessageMapping address ("/welcome") prefixed with the
 * application destination prefix configured on the server ("/ws-push").
 */
function sendName() {
    var payload = JSON.stringify({ 'name': $('#name').val() });
    stompClient.send("/ws-push/welcome", {}, payload);
}
/**
 * Shows a broadcast message in the #response element.
 *
 * @param {string} message text received from the server
 */
function showResponse(message) {
    // The text includes a name typed by a user, so insert it as plain text, not as HTML.
    $("#response").text(message);
}
/**
 * Shows a one-to-one message in the #response1 element.
 *
 * @param {string} message text received from the server
 */
function showResponse1(message) {
    // Insert as plain text so that markup in a pushed message is never interpreted as HTML.
    $("#response1").text(message);
}
</script>
</body>
</html>
第二種
設置WebSocket的配置類
package com.chitic.supplywater.common.config.webSocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Sets up the WebSocket endpoint service.
 *
 * @author gx
 * @since 2019/6/10 16:39
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * @return a newly created exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
WebSocket消息處理類
package com.chitic.supplywater.common.config.webSocket;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/webSocket/{sid}") @Component public class WebSocketServer { static Logger logger= LoggerFactory.getLogger(WebSocketServer.class); /** * 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 */ private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); /** * 與某個客戶端的連接會話,需要通過它來給客戶端發送數據 */ private Session session; //接收sid private String sid=""; /** * 連接建立成功調用的方法*/ @OnOpen public void onOpen(Session session,@PathParam("sid") String sid) { this.session = session; //加入set中 webSocketSet.add(this); //在線數加1 addOnlineCount(); logger.info("有新窗口開始監聽:識別碼為"+sid+",當前在線人數為" + getOnlineCount()); this.sid=sid; try { sendMessage("連接成功"); } catch (IOException e) { logger.error("websocket IO異常"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 logger.info("有一連接關閉!當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) { logger.info("收到來自窗口"+sid+"的信息:"+message); //群發消息 for (WebSocketServer item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { logger.error("發生錯誤"); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException { 
logger.info("推送消息到窗口"+sid+",推送內容:"+message); for (WebSocketServer item : webSocketSet) { try { //這里可以設定只推送給這個sid的,為null則全部推送 if(sid==null) { item.sendMessage(message); }else if(item.sid.equals(sid)){ item.sendMessage(message); } } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
前端代碼
<!DOCTYPE HTML>
<html>
<head>
<title>My WebSocket</title>
</head>
<body>
Welcome<br/>
<input id="text" type="text"/>
<button onclick="send()">Send</button>
<button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body>
<script type="text/javascript"> var websocket = null; //判斷當前瀏覽器是否支持WebSocket if ('WebSocket' in window) { websocket = new WebSocket("ws://localhost:9000/webSocket/1"); } else { alert('Not support websocket') } //連接發生錯誤的回調方法 websocket.onerror = function () { setMessageInnerHTML("error"); }; //連接成功建立的回調方法 websocket.onopen = function (event) { setMessageInnerHTML("open"); } //接收到消息的回調方法 websocket.onmessage = function (event) { setMessageInnerHTML(event.data); } //連接關閉的回調方法 websocket.onclose = function () { setMessageInnerHTML("close"); } //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。 window.onbeforeunload = function () { websocket.close(); } //將消息顯示在網頁上 function setMessageInnerHTML(innerHTML) { document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //關閉連接 function closeWebSocket() { websocket.close(); } //發送消息 function send() { var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
其中:sid 可以控制將消息推送給哪個用戶
