WebSocket集群分布式改造:實現多人在線聊天室


前言

書接上文,我們開始對我們的小小聊天室進行集群化改造。

上文地址:

[WebSocket入門]手把手搭建WebSocket多人在線聊天室(SpringBoot+WebSocket)

本文內容摘要:

  • 為何要改造為分布式集群
  • 如何改造為分布式集群
    • 用戶在聊天室集群如何發消息
    • 用戶在聊天室集群如何接收消息
  • 補充知識點:STOMP 簡介
  • 功能一:向聊天室集群中的全體用戶發消息——Redis的訂閱/發布
  • 功能二:集群集群用戶上下線通知——Redis訂閱發布
  • 功能三:集群用戶信息維護——Redis集合
  • WebSocket集群還有哪些可能性

本文源碼:(媽媽再也不用擔心我無法復現文章代碼啦)

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版

如果您覺得這個教程對您有用,請關注我的技術公眾號:Rude3Knife,不定時更新技術點滴。

正文

WebSocket集群/分布式改造:實現多人在線聊天室

為何要改造為分布式集群

分布式就是為了解決單點故障問題,想象一下,如果一個服務器承載了1000個大佬同時聊天,服務器突然掛了,1000個大佬瞬間全部掉線,大概明天你就被大佬們吊起來打了。

當聊天室改為集群后,就算服務器A掛了,服務器B上聊天的大佬們還可以愉快的聊天,並且在前端還能通過代碼,讓連接A的大佬們快速重連至存活的服務器B,繼續和大家愉快的聊天,豈不美哉!

總結一下:實現了分布式WebSocket后,我們可以將流量負載均衡到不同的服務器上並提供一種通信機制讓各個服務器能進行消息同步(不然用戶A連上服務器A,用戶B臉上服務器B,它們發消息的時候對方都沒法收到)。

如何改造為分布式集群

當我們要實現分布式的時候,我們則需要在各個機器上共享這些信息,所以我們需要一個Publish/Subscribe的中間件。我們現在使用Redis作為我們的解決方案。

1. 用戶在聊天室集群如何發消息

假設我們的聊天室集群有服務器A和B,用戶Alice連接在A上,Bob連接在B上、

Alice向聊天室的服務器A發送消息,A服務器必須要將收到的消息轉發到Redis,才能保證聊天室集群的所有服務器(也就是A和B)能夠拿到消息。否則,只有Alice在的服務器A能夠讀到消息,用戶Bob在的服務器B並不能收到消息,A和B也就無法聊天了。

2. 用戶在聊天室集群如何接收消息

說完了發送消息,那么如何保證Alice發的消息,其他所有人都能收到呢,前面我們知道了Alice發送的消息已經被傳到了Redis的頻道,那么所有服務器都必須訂閱這個Redis頻道,然后把這個頻道的消息轉發到自己的用戶那里,這樣自己服務器所管轄的用戶就能收到消息。

補充知識點:STOMP 簡介

上期我們搭建了個websocket聊天室demo,並且使用了STOMP協議,但是我並沒有介紹到底什么是STOMP協議,同學們會有疑惑,這里對於STOMP有很好地總結:

當直接使用WebSocket時(或SockJS)就很類似於使用TCP套接字來編寫Web應用。因為沒有高層級的線路協議(wire protocol),因此就需要我們定義應用之間所發送消息的語義,還需要確保連接的兩端都能遵循這些語義。

就像HTTP在TCP套接字之上添加了請求-響應模型層一樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。

與HTTP請求和響應類似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,如下就是發送數據的一個STOMP幀:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}

好了,介紹完了概念,讓我們開始動手改造!

功能一:向聊天室集群中的全體用戶發消息——Redis的訂閱/發布

如果你不熟悉Redis的sub/pub(訂閱/發布)功能,請看這里進行簡單了解它的用法,很簡單:

https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

在我們上篇文章的Demo基礎上,我們進行集群改造。上一篇文章的源碼見下方:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/單機版

1. 添加Redis依賴pom

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2. application.properties新增redis配置

當然首先要確保你安裝了Redis,windows下安裝redis比較麻煩,你可以搜索redis-for-windows下載安裝。

# redis 連接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
# 空閑連接最大數
spring.redis.jedis.pool.max-idle=10
# 獲取連接最大等待時間(s)
spring.redis.jedis.pool.max-wait=60000

3. 在application.properties添加頻道名定義

# Redis定義
redis.channel.msgToAll = websocket.msgToAll

4. 新建redis/RedisListenerBean

package cn.monitor4all.springbootwebsocketdemo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import java.net.Inet4Address;
import java.net.InetAddress;

/**
 * Redis訂閱頻道屬性類
 * @author yangzhendong01
 */
@Component
public class RedisListenerBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);

    @Value("${server.port}")
    private String serverPort;

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    /**
     * redis消息監聽器容器
     * 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
     * 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 監聽msgToAll
        container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
        LOGGER.info("Subscribed Redis channel: " + msgToAll);
        return container;
    }
}

可以看到,我們在代碼里監聽了redis頻道msgToAll,這個是在application.properties定義的,當然如果你懶得定義,這里可以寫死。

5. 聊天室集群:發消息改造

我們單機聊天室的發送消息Controller是這樣的:

@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;

我們前端發給我們消息后,直接給/topic/public轉發這個消息,讓其他用戶收到。

在集群中,我們需要把消息轉發給Redis,並且不轉發給前端,而是讓服務端監聽Redis消息,在進行消息發送。

將Controller改為:

@Value("${redis.channel.msgToAll}")
private String msgToAll;

@Autowired
private RedisTemplate<String, String> redisTemplate;
    
@MessageMapping("/chat.sendMessage")
    public void sendMessage(@Payload ChatMessage chatMessage) {
        try {
            redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

你會發現我們在代碼中使用了JsonUtil將實體類ChatMessage轉為了Json發送給了Redis,這個Json工具類需要使用到FaskJson依賴:

  1. pom添加FastJson依賴
<!-- json -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>
  1. 添加Json解析工具類JsonUtil,提供對象轉Json,Json轉對象的能力
package cn.monitor4all.springbootwebsocketdemo.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JSON 轉換
 */
public final class JsonUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);

    /**
     * 把Java對象轉換成json字符串
     *
     * @param object 待轉化為JSON字符串的Java對象
     * @return json 串 or null
     */
    public static String parseObjToJson(Object object) {
        String string = null;
        try {
            string = JSONObject.toJSONString(object);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return string;
    }

    /**
     * 將Json字符串信息轉換成對應的Java對象
     *
     * @param json json字符串對象
     * @param c    對應的類型
     */
    public static <T> T parseJsonToObj(String json, Class<T> c) {
        try {
            JSONObject jsonObject = JSON.parseObject(json);
            return JSON.toJavaObject(jsonObject, c);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return null;
    }
}

這樣,我們接收到用戶發送消息的請求時,就將消息轉發給了redis的頻道websocket.msgToAll

6. 聊天室集群:接收消息改造

單機的聊天室,我們接收消息是通過Controller直接把消息轉發到所有人的頻道上,這樣就能在所有人的聊天框顯示。

在集群中,我們需要服務器把消息從Redis中拿出來,並且推送到自己管的用戶那邊,我們在Service層實現消息的推送。

  • 在處理消息之后發送消息:
    正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 注解可以處理客戶端發送過來的消息,並選擇方法是否有返回值。
    如果 @MessageMapping注解的控制器方法有返回值的話,返回值會被發送到消息代理,只不過會添加上"/topic"前綴。可以使用@SendTo 重寫消息目的地;
    如果 @SubscribeMapping注解的控制器方法有返回值的話,返回值會直接發送到客戶端,不經過代理。如果加上@SendTo 注解的話,則要經過消息代理。
  • 在應用的任意地方發送消息:
    spring-websocket 定義了一個 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以實現自由的向任意目的地發送消息,並且訂閱此目的地的所有用戶都能收到消息。

我們在service實現發送,需要使用上述第二種方法。

新建類service/ChatService:

package cn.monitor4all.springbootwebsocketdemo.service;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;

@Service
public class ChatService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);

    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;

    public void sendMsg(@Payload ChatMessage chatMessage) {
        LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

}

我們在哪里調用這個service呢,我們需要在監聽到消息后調用,所以我們就要有下面的Redis監聽消息處理專用類

新建類redis/RedisListenerHandle:

package cn.monitor4all.springbootwebsocketdemo.redis;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * Redis訂閱頻道處理類
 * @author yangzhendong01
 */
@Component
public class RedisListenerHandle extends MessageListenerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private ChatService chatService;

    /**
     * 收到監聽消息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String rawMsg;
        String topic;
        try {
            rawMsg = redisTemplate.getStringSerializer().deserialize(body);
            topic = redisTemplate.getStringSerializer().deserialize(channel);
            LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return;
        }


        if (msgToAll.equals(topic)) {
            LOGGER.info("Send message to all users:" + rawMsg);
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            // 發送消息給所有在線Cid
            chatService.sendMsg(chatMessage);
        } else {
            LOGGER.warn("No further operation with this topic!");
        }
    }
}

7. 看看效果

這樣,我們的改造就基本完成了!我們看一下效果

我們將服務器運行在8080上,然后打開localhost:8080,起名Alice進入聊天室

隨后,我們在application.properties中將端口server.port=8081

再次運行程序(別忘了開啟IDEA的“允許啟動多個並行服務”設置,不然會覆蓋掉你的8080服務,如下圖),在8081啟動一個聊天室,起名Bob進入聊天室。

如下兩圖,我們已經可以在不同端口的兩個聊天室,互相聊天了!(注意看url)

在互相發送消息是,我們還可以使用命令行監聽下Redis的頻道websocket.msgToAll,可以看到雙方傳送的消息。如下圖:

我們還可以打開Chrome的F12控制台,查看前端的控制台發送消息的log,如下圖:

大功告成了嗎?

功能實現了,但是並不完美!你會發現,Bob的加入並沒有提醒Bob進入了聊天室(在單機版是有的),這是因為我們在“加入聊天室”的代碼還沒有修改,在加入時,只有Bob的服務器B里的其他用戶知道Bob加入了聊天室。我們還能再進一步!

功能二/功能三:集群用戶上下線通知,集群用戶信息存儲

我們需要彌補上面的不足,將用戶上線下線的廣播發送到所有服務器上。

此外,我還希望以后能夠查詢集群中所有的在線用戶,我們在redis中添加一個set,來保存用戶名,這樣就可以隨時得到在線用戶的數量和名稱。

1. 在application.properties添加頻道名定義

# Redis定義
redis.channel.userStatus = websocket.userStatus
redis.set.onlineUsers = websocket.onlineUsers

我們增加兩個定義

  • 第一個是新增redis頻道websocket.userStatus用來廣播用戶上下線消息

  • 第二個是redis的set,用來保存在線用戶信息

2. 在RedisListenerBean添加新頻道監聽

container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));

3. 在ChatService中添加

public void alertUserStatus(@Payload ChatMessage chatMessage) {
        LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

在service中我們向本服務器的用戶廣播消息,用戶上線或者下線的消息都通過這里傳達。

4. 修改ChatController中的addUser方法

@MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {

        LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
        try {
            headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
            redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
            redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

我們修改了addUser方法,在這里往redis中廣播用戶上線的消息,並把用戶名username寫入redis的set中(websocket.onlineUsers)

5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

@EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {

        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String username = (String) headerAccessor.getSessionAttributes().get("username");

        if(username != null) {
            LOGGER.info("User Disconnected : " + username);
            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);
            try {
                redisTemplate.opsForSet().remove(onlineUsers, username);
                redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }

        }
    }

在用戶關閉網頁時,websocket會調用該方法,我們在這里需要把用戶從redis的在線用戶set里刪除,並且向集群發送廣播,說明該用戶退出聊天室。

6. 修改Redis監聽類RedisListenerHandle

 else if (userStatus.equals(topic)) {
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            if (chatMessage != null) {
                chatService.alertUserStatus(chatMessage);
            }

在監聽類中我們接受了來自userStatus頻道的消息,並調用service

7. 看看效果

此外,我們還可以在Reids中查詢到用戶信息:

WebSocket集群還有哪些可能性

有了這兩篇文章的基礎, 我們當然還能實現以下的功能:

  • 某用戶A單獨私信給某用戶B,或者私信給某用戶群(用戶B和C)
  • 系統提供外部調用接口,給指定用戶/用戶群發送消息,實現消息推送
  • 系統提供外部接口,實時獲取用戶數據(人數/用戶信息)

感興趣的同學可以自己試試看。

參考文獻

深入淺出Websocket(二)分布式Websocket集群

https://juejin.im/post/6844903584929153032

Spring消息之STOMP:

https://www.cnblogs.com/jmcui/p/8999998.html

總結

我們在本文中把單機版的聊天室改為了分布式聊天室,大大提高了聊天室可用性。

本文工程源代碼:

單機版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/單機版

集群版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版

如果您覺得這個教程對您有用,請關注我的技術公眾號:Rude3Knife,不定時更新技術點滴。

關注我

我目前是一名后端開發工程師。主要關注后端開發,數據安全,爬蟲,邊緣計算等方向。

微信:yangzd1102(請注明來意)

Github:@qqxx6661

個人博客:

原創博客主要內容

  • Java知識點復習全手冊
  • Leetcode算法題解析
  • 劍指offer算法題解析
  • SpringCloud菜鳥入門實戰系列
  • SpringBoot菜鳥入門實戰系列
  • 爬蟲相關技術文章
  • 后端開發相關技術文章

個人公眾號:后端技術漫談

個人公眾號:后端技術漫談

如果文章對你有幫助,不妨收藏起來並轉發給您的朋友們~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM