最近部門有個需求,需要實現一個消息中心,簡而言之,就是給各個系統提供與客戶交互的橋梁,自然而然需要選擇websocket協議,由於我們是使用的spring cloud這一套,因此以springboot為例來進行說明。
一、方案
A、整體方案
先說一下簡單的場景,各系統通過Rabbitmq將要發送給客戶端的消息推送到消息中心,消息中心再基於ws連接,將消息推送給客戶端,實現交互。但是問題來了,生產上有多個節點(至少兩台服務器吧),但是客戶端只跟其中一台服務器建立ws連接,所以這個session如何維護呢?比如客戶端A與服務器1建立連接,此時要推送的消息到了服務器2上,他沒有與客戶端A的連接,這個消息就無法推送,因此設計方案如下:
a、客戶端與消息中心建立ws連接,各節點維護各自的連接,例如使用ConcurrentHashMap
b、各應用將要推送給客戶端的消息發送到rabbitmq,rabbitmq通過廣播的方式將消息發送到消息中心的各節點
c、消息中心通過userId判斷連接是否在本機維護,如果不在,直接忽略,如果session在本機維護,則推送消息到客戶端
下面有個重要的問題就是rabbitmq的廣播機制如何實現,這時候一百度,都是說不同服務訂閱不同的隊列就實現廣播了。我想問一句,我是一個應用,生產上部署多個節點,代碼配置都是同一套,你家上生產就部署一個節點啊???
B、Rabbimq廣播機制實現(同應用不同節點)
Rabbitmq的topic模式跟其他mq都不大一樣,他是指定exchange到queue的模式,如下圖:
所以我選擇在系統啟動時,基於雪花算法生成隨機id,作為隊列名,並且隊列非持久化,項目一重啟,之前的隊列就消失。不過這里有個問題,就是應用關閉時,若此時mq中有消息未消費,就全丟失了,不過我們的場景可接受這種情況,因此選用這種方式。
package com.yunzhangfang.platform.message.gateway.service.mq.consumer; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.*; import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageContentDTO; import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageDTO; import com.yunzhangfang.platform.message.gateway.client.dto.user.UserDTO; import com.yunzhangfang.platform.message.gateway.service.dto.MessageSaveDTO; import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.MqConstant; import com.yunzhangfang.platform.message.gateway.service.infrastructure.dataobject.MessageUser; import com.yunzhangfang.platform.message.gateway.service.service.MessageService; import com.yunzhangfang.platform.message.gateway.service.session.SessionManager; import com.yzf.accounting.common.exception.BizRuntimeException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; /** * 功能:接收來自於各應用的消息,並且將消息推送到目標用戶 * @author lenovo */ @Slf4j @Component public class MessageCreatedConsumer { @Autowired private ConnectionFactory connectionFactory; /** * 接收各應用發送的消息,並將消息保存進入mongodb * @throws IOException */ @PostConstruct public void handleMessage() throws IOException { // id是使用雪花算法隨機生成的 String queue = "FLOW_QUEUE_" + id; String exchange = "FANOUT_FLOW_EXCHANGE"; Connection conn = connectionFactory.createConnection(); Channel channel = conn.createChannel(false); // 1、創建一個隊列,id是使用雪花算法隨機生成的,並且非持久化的,自動刪除的 channel.queueDeclare(queue, false, false, true, null); // 2、創建交換器 channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT); // 3、將隊列和交換器通過路由鍵進行綁定。fanout模式路由鍵直接設置為""。 channel.queueBind(queue, exchange, ""); channel.basicConsume(queue, new DefaultConsumer(channel) { // 4、當消息到達時執行回調方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg; try { msg = new String(body, "UTF-8"); log.info("消息中心接收到消息,內容為:{}", msg); } catch (Exception e) { log.error("消息中心接收消息出現異常", e); } } }); } }
這地方有個坑,因為隊列是使用雪花算法生成的id拼裝的,所以沒辦法使用@RabbitListener這個注解,只能通過channel的方式去實現消息消費,只是寫起來麻煩一點,本質是一樣的。
二、springboot集成websocket
springboot集成websokcet主要有兩種方式,分別如下:
A、直接基於websocket協議實現
1、配置類
package com.chitic.supplywater.common.config.webSocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * 設置webSocket終端服務 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
2、處理類
package com.chitic.supplywater.common.config.webSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint("/webSocket/{sid}") @Component public class WebSocketServer { /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session,@PathParam("sid") String sid) { // 維護會話等 } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { // 銷毀會話等 } /** * 收到客戶端消息后調用的方法 */ @OnMessage public void onMessage(String message, Session session) { // 收到客戶端發送的消息 } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { // 發生錯誤時 } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException { // 從會話管理器中獲取會話,進行群發 } }
B、基於stomp協議
1、配置類
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.WebSocketConstant; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.*; /** * 開啟使用STOMP協議來傳輸基於代理(MessageBroker)的消息,這時候控制器(controller)開始支持@MessageMapping,就像是使用@requestMapping一樣。 * @author lenovo */ @EnableWebSocketMessageBroker @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketDecoratorFactory webSocketDecoratorFactory; @Autowired private WebSockHandshakeHandler webSockHandshakeHandler; @Override public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) { // 創建一個serverPoint與前端交互 stompEndpointRegistry.addEndpoint(WebSocketConstant.WEBSOCKET_SERVER_PATH) // 防止跨域問題 .setAllowedOrigins("*") // 握手時handler .setHandshakeHandler(webSockHandshakeHandler) // 指定使用SockJS協議 .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //全局使用的消息前綴(客戶端訂閱路徑上會體現出來) registry.setApplicationDestinationPrefixes("/app"); //用戶訂閱主題的前綴,/topic 代表發布廣播,即群發 ,/queue 代表點對點,即發指定用戶 registry.enableSimpleBroker("/topic", "/queue"); //點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/ registry.setUserDestinationPrefix("/user"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(webSocketDecoratorFactory); } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Configuration; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; import javax.servlet.http.HttpServletRequest; import java.security.Principal; import java.util.Map;
@Configuration @Slf4j public class WebSockHandshakeHandler extends DefaultHandshakeHandler { /** * 此類在客戶端與服務端握手的時候觸發。
* tips:將請求中的參數userId塞到Principal中,可以理解成塞到websocket的session中,后續可通過Principal principal = session.getPrincipal()獲取到 */ @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest(); // 握手取userid final String userId = httpRequest.getParameter("userId"); log.info("客戶端入參userId:{}", userId); if (StringUtils.isEmpty(userId)) { return null; } return () -> userId; } return null; } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config; import com.yunzhangfang.platform.message.gateway.service.session.SessionManager; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.WebSocketHandlerDecorator; import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory; import java.security.Principal; /** * 服務端和客戶端在進行握手揮手時會被執行。進行session的維護 */ @Component @Slf4j public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory { @Autowired private SessionManager sessionManager; @Override public WebSocketHandler decorate(WebSocketHandler handler) { return new WebSocketHandlerDecorator(handler) { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("有客戶端連接,sessionId:{}", session.getId()); // principal是自定義塞入到session中的數據(例如我們塞的是userId,后續通過userId可以找到其session) Principal principal = session.getPrincipal(); if (principal != null && StringUtils.isNotBlank(principal.getName())) { // principal.getName獲取的就是WebSockHandshakeHandler中塞入的userId if(!sessionManager.isConnected(principal.getName())) { // 身份校驗成功,緩存socket連接 sessionManager.add(principal.getName(), session); log.info("客戶端userId:{}存入redis", principal.getName()); } } super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { log.info("客戶端退出連接,sessionId:{}", session.getId()); Principal principal = session.getPrincipal(); if (principal != null) { // 身份校驗成功,移除socket連接 sessionManager.remove(principal.getName()); log.info("客戶端userId:{}從redis中刪除", principal.getName()); } super.afterConnectionClosed(session, closeStatus); } }; } }
通過使用WebSocketDecoratorFactory可在連接建立完成或連接關閉時觸發,將userId和session存入本地Map。
2、提供客戶端請求controller
@Controller public class MessageCenterController { @Autowired private MessageService messageService; /** * 客戶端進入頁面或刷新頁面調用。獲取來源列表、每個來源未讀消息數以及每個來源消息列表信息 */ @MessageMapping("/init/query") public void initQuery(String userId) { messageService.initQuery(userId); } }
這里也可以使用http去實現,不過既然已經建立了websocket協議,就直接使用ws協議操作更為合適,無需建立另外的連接。
注:客戶端請求"/init/query"地址,服務端是沒有辦法直接返回響應的,必須客戶端訂閱地址才能拿到服務端返回的信息。
/** * 根據用戶id查詢消息組合信息(應用列表、消息未讀數以及消息列表) * @param userId * @return */ @Override public void initQuery(String userId) { // 拼裝result simpMessagingTemplate.convertAndSendToUser(userId, "/init/query", result); }
因為我在握手的時候將userId存放到websocket連接的session信息中了,因此通過Springboot提供的SimpMessagingTemplate.convertAndSendToUser(userId...)就能將消息發送到對應客戶端。
三、客戶端代碼(js)
var stompClient = null; //加載完瀏覽器后 調用connect(),打開雙通道 $(function(){ //打開雙通道 connect() }) //打開雙通道 function connect(){ var socket = new SockJS('http://localhost:8281/message/center?userId=654366374251302912'); //連接SockJS的endpoint名稱為"endpointAric" stompClient = Stomp.over(socket);//使用STMOP子協議的WebSocket客戶端 stompClient.connect({},function(frame){//連接WebSocket服務端 stompQueue(); }); } //列隊(一對一) function stompQueue(){ //通過stompClient.subscribe訂閱/user/queue/init/query stompClient.subscribe('/user/queue/init/query',function(response){ var message=JSON.stringify(response.body); //alert(message); }); stompClient.send("/app/init/query",{},'654366374251302912'); } //強制關閉瀏覽器 調用websocket.close(),進行正常關閉 window.onunload = function() { disconnect() } //關閉雙通道 function disconnect(){ if(stompClient != null) { stompClient.disconnect(); } console.log("Disconnected"); }