基於websocket實現消息中心


最近部門有個需求,需要實現一個消息中心,簡而言之,就是給各個系統提供與客戶交互的橋梁,自然而然需要選擇websocket協議,由於我們是使用的spring cloud這一套,因此以springboot為例來進行說明。

一、方案

A、整體方案

先說一下簡單的場景,各系統通過Rabbitmq將要發送給客戶端的消息推送到消息中心,消息中心再基於ws連接,將消息推送給客戶端,實現交互。但是問題來了,生產上有多個節點(至少兩台服務器吧),但是客戶端只跟其中一台服務器建立ws連接,所以這個session如何維護呢?比如客戶端A與服務器1建立連接,此時要推送的消息到了服務器2上,他沒有與客戶端A的連接,這個消息就無法推送,因此設計方案如下:

 

 

 

 a、客戶端與消息中心建立ws連接,各節點維護各自的連接,例如使用ConcurrentHashMap

b、各應用將要推送給客戶端的消息發送到rabbitmq,rabbitmq通過廣播的方式將消息發送到消息中心的各節點

c、消息中心通過userId判斷連接是否在本機維護,如果不在,直接忽略,如果session在本機維護,則推送消息到客戶端

 

 

下面有個重要的問題就是rabbitmq的廣播機制如何實現,這時候一百度,都是說不同服務訂閱不同的隊列就實現廣播了。我想問一句,我是一個應用,生產上部署多個節點,代碼配置都是同一套,你家上生產就部署一個節點啊???

B、Rabbimq廣播機制實現(同應用不同節點)

Rabbitmq的topic模式跟其他mq都不大一樣,他是指定exchange到queue的模式,如下圖:

 

 

 所以我選擇在系統啟動時,基於雪花算法生成隨機id,作為隊列名,並且隊列非持久化,項目一重啟,之前的隊列就消失。不過這里有個問題,就是應用關閉時,若此時mq中有消息未消費,就全丟失了,不過我們的場景可接受這種情況,因此選用這種方式。

package com.yunzhangfang.platform.message.gateway.service.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageContentDTO;
import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageDTO;
import com.yunzhangfang.platform.message.gateway.client.dto.user.UserDTO;
import com.yunzhangfang.platform.message.gateway.service.dto.MessageSaveDTO;
import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.MqConstant;
import com.yunzhangfang.platform.message.gateway.service.infrastructure.dataobject.MessageUser;
import com.yunzhangfang.platform.message.gateway.service.service.MessageService;
import com.yunzhangfang.platform.message.gateway.service.session.SessionManager;
import com.yzf.accounting.common.exception.BizRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * 功能:接收來自於各應用的消息,並且將消息推送到目標用戶
 * @author lenovo
 */
@Slf4j
@Component
public class MessageCreatedConsumer {

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 接收各應用發送的消息,並將消息保存進入mongodb
     * @throws IOException
     */
    @PostConstruct
    public void handleMessage() throws IOException {
        // id是使用雪花算法隨機生成的
        String queue = "FLOW_QUEUE_" + id;
        String exchange = "FANOUT_FLOW_EXCHANGE";
        Connection conn = connectionFactory.createConnection();
        Channel channel = conn.createChannel(false);
        // 1、創建一個隊列,id是使用雪花算法隨機生成的,並且非持久化的,自動刪除的
        channel.queueDeclare(queue, false, false, true, null);
        // 2、創建交換器
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
        // 3、將隊列和交換器通過路由鍵進行綁定。fanout模式路由鍵直接設置為""。
        channel.queueBind(queue, exchange, "");
        channel.basicConsume(queue, new DefaultConsumer(channel) {
            // 4、當消息到達時執行回調方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg;
                try {
                    msg = new String(body, "UTF-8");
                    log.info("消息中心接收到消息,內容為:{}", msg);
                } catch (Exception e) {
                    log.error("消息中心接收消息出現異常", e);
                }
            }
        });
    }
}

這地方有個坑,因為隊列是使用雪花算法生成的id拼裝的,所以沒辦法使用@RabbitListener這個注解,只能通過channel的方式去實現消息消費,只是寫起來麻煩一點,本質是一樣的。

 

二、springboot集成websocket

springboot集成websokcet主要有兩種方式,分別如下:

A、直接基於websocket協議實現

1、配置類

package com.chitic.supplywater.common.config.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 設置webSocket終端服務
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

}

2、處理類

package com.chitic.supplywater.common.config.webSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/webSocket/{sid}")
@Component
public class WebSocketServer {

    /**
     * 連接建立成功調用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) {
       // 維護會話等
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        // 銷毀會話等
    }

    /**
     * 收到客戶端消息后調用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        // 收到客戶端發送的消息
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // 發生錯誤時
    }
    /**
     * 實現服務器主動推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 群發自定義消息
     * */
    public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
        // 從會話管理器中獲取會話,進行群發
    }
}

B、基於stomp協議

1、配置類

package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.WebSocketConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

/**
 * 開啟使用STOMP協議來傳輸基於代理(MessageBroker)的消息,這時候控制器(controller)開始支持@MessageMapping,就像是使用@requestMapping一樣。
 * @author lenovo
 */
@EnableWebSocketMessageBroker
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private WebSocketDecoratorFactory webSocketDecoratorFactory;

    @Autowired
    private WebSockHandshakeHandler webSockHandshakeHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        // 創建一個serverPoint與前端交互
        stompEndpointRegistry.addEndpoint(WebSocketConstant.WEBSOCKET_SERVER_PATH)
                             // 防止跨域問題
                             .setAllowedOrigins("*")
                             // 握手時handler
                             .setHandshakeHandler(webSockHandshakeHandler)
                             // 指定使用SockJS協議
                             .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //全局使用的消息前綴(客戶端訂閱路徑上會體現出來)
        registry.setApplicationDestinationPrefixes("/app");
        //用戶訂閱主題的前綴,/topic 代表發布廣播,即群發 ,/queue 代表點對點,即發指定用戶
        registry.enableSimpleBroker("/topic", "/queue");
        //點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(webSocketDecoratorFactory);
    }
}
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;

@Configuration @Slf4j
public class WebSockHandshakeHandler extends DefaultHandshakeHandler { /** * 此類在客戶端與服務端握手的時候觸發。
* tips:將請求中的參數userId塞到Principal中,可以理解成塞到websocket的session中,后續可通過Principal principal = session.getPrincipal()獲取到
*/ @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest(); // 握手取userid final String userId = httpRequest.getParameter("userId"); log.info("客戶端入參userId:{}", userId); if (StringUtils.isEmpty(userId)) { return null; } return () -> userId; } return null; } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import com.yunzhangfang.platform.message.gateway.service.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;

import java.security.Principal;

/**
 * 服務端和客戶端在進行握手揮手時會被執行。進行session的維護
 */
@Component
@Slf4j
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {

    @Autowired
    private SessionManager sessionManager;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                log.info("有客戶端連接,sessionId:{}", session.getId());
                // principal是自定義塞入到session中的數據(例如我們塞的是userId,后續通過userId可以找到其session)
                Principal principal = session.getPrincipal();
                if (principal != null && StringUtils.isNotBlank(principal.getName())) {
                    // principal.getName獲取的就是WebSockHandshakeHandler中塞入的userId
                    if(!sessionManager.isConnected(principal.getName())) {
                        // 身份校驗成功,緩存socket連接
                        sessionManager.add(principal.getName(), session);
                        log.info("客戶端userId:{}存入redis", principal.getName());
                    }
                }

                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                log.info("客戶端退出連接,sessionId:{}", session.getId());
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // 身份校驗成功,移除socket連接
                    sessionManager.remove(principal.getName());
                    log.info("客戶端userId:{}從redis中刪除", principal.getName());
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

通過使用WebSocketDecoratorFactory可在連接建立完成或連接關閉時觸發,將userId和session存入本地Map。

2、提供客戶端請求controller

@Controller
public class MessageCenterController {

    @Autowired
    private MessageService messageService;

    /**
     * 客戶端進入頁面或刷新頁面調用。獲取來源列表、每個來源未讀消息數以及每個來源消息列表信息
     */
    @MessageMapping("/init/query")
    public void initQuery(String userId) {
        messageService.initQuery(userId);
    }
}

這里也可以使用http去實現,不過既然已經建立了websocket協議,就直接使用ws協議操作更為合適,無需建立另外的連接。

注:客戶端請求"/init/query"地址,服務端是沒有辦法直接返回響應的,必須客戶端訂閱地址才能拿到服務端返回的信息。

/**
     * 根據用戶id查詢消息組合信息(應用列表、消息未讀數以及消息列表)
     * @param userId
     * @return
     */
    @Override
    public void initQuery(String userId) {
        // 拼裝result
        simpMessagingTemplate.convertAndSendToUser(userId, "/init/query", result);
    }

因為我在握手的時候將userId存放到websocket連接的session信息中了,因此通過Springboot提供的SimpMessagingTemplate.convertAndSendToUser(userId...)就能將消息發送到對應客戶端。

 

三、客戶端代碼(js)

var stompClient = null;

//加載完瀏覽器后  調用connect(),打開雙通道
$(function(){
    //打開雙通道
    connect()
})

//打開雙通道
function connect(){
    var socket = new SockJS('http://localhost:8281/message/center?userId=654366374251302912'); //連接SockJS的endpoint名稱為"endpointAric"
    stompClient = Stomp.over(socket);//使用STMOP子協議的WebSocket客戶端
    stompClient.connect({},function(frame){//連接WebSocket服務端
        stompQueue();
    });
}

//列隊(一對一)
function stompQueue(){
    //通過stompClient.subscribe訂閱/user/queue/init/query
    stompClient.subscribe('/user/queue/init/query',function(response){
        var message=JSON.stringify(response.body);
        //alert(message);
    });

    stompClient.send("/app/init/query",{},'654366374251302912');
}

//強制關閉瀏覽器  調用websocket.close(),進行正常關閉
window.onunload = function() {
    disconnect()
}
//關閉雙通道
function disconnect(){
    if(stompClient != null) {
        stompClient.disconnect();
    }
    console.log("Disconnected");
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM