Springboot+Websocket+JWT實現的即時通訊模塊


場景

目前做了一個接口:邀請用戶成為某課程的管理員,於是我感覺有能在用戶被邀請之后能有個立馬通知他本人的機(類似微博、朋友圈被點贊后就有立馬能收到通知一樣),於是就琢磨琢磨搞了一套。

涉及技術棧

  • Springboot
  • Websocket 協議
  • JWT
  • (非必要)RabbitMQ 消息中間件

Websocket 協議

⭐推薦閱讀:Websocket 協議簡介

WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。
image.png

為什么使用Websocket?
因為普通的http協議一個最大的問題就是:通信只能由客戶端發起,服務器響應(半雙工),而我們希望可以全雙工通信。

因此一句話總結就是:建立websocket(以下簡稱為ws)連接是為了讓服務器主動向前端發消息,而無需等待前端的發起請求調用接口。

業務邏輯

我們現在有:

  • 用戶A
  • 用戶B
  • Springboot服務器
  • 場景:用戶A調用接口邀請用戶B成為課程成員
  • 涉及數據庫MySQL的數據表:
    • course_member_invitation,記錄課程邀請記錄,其形式如下(忽略時間等列):
id course_id account_id admin_id is_accepted bind_message_id
邀請id 課程id 受邀用戶id 邀請人id(因其本身為課程管理員) 受邀用戶是否接受了邀請 綁定的消息id
  • course_message,記錄消息記錄,其形式如下(忽略時間等列):
id type account_id source_id is_read is_ignored
消息id 消息類型 收信人用戶id 發信人用戶id 是否已讀 收信人是否忽略
  • (圖中沒有體現)course_message_type,記錄消息類型,其形式如下
id name description
消息類型id 消息類型名稱 描述
  • 涉及RabbitMQ(因不是重點,所以此處暫不討論,最后一章敘述)

rabbitMQ.png
業務步驟主要涉及兩個方法addCourseMemberInvitationsendMessage和一個組件CourseMemberInvitationListener,分別做:
addCourseMemberInvitation:

  1. 用戶A調用接口,邀請用戶B成為某門課程的管理員
  2. Springboot服務器收到請求,將這一請求生成邀請記錄、消息記錄,寫入下表:
    • course_member_invitation
    • course_message
  3. 寫入DB后,調用sendMessage處理發送消息的業務。
  4. 將執行的結果返回給用戶A

sendMessage

  1. 將消息記錄放入RabbitMQ中對應的消息隊列。

CourseMemberInvitationListener:

  1. 持續監聽其綁定的消息隊列
  2. 一旦消息隊列中有新消息,就嘗試通過ws連接發送消息。
    1. 用戶B在線,則可發送。
    2. 否則,則消費掉該消息,待用戶上線后從DB中讀入。

在Springboot中配置Websocket

  • pom.xml文件
<!-- WebSocket相關 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
  • Websocket Server組件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/**
 * 進行前后端即時通信
 * https://blog.csdn.net/qq_33833327/article/details/105415393
 * session: https://www.codeleading.com/article/6950456772/
 * @author jojo
 */
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{uid} 的連接請求
@Component
public class WebSocketServer {
    /**
     * 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的
     */
    private static int onlineCount = 0;

    /**
     * concurrent 包的線程安全Set,用來存放每個客戶端對應的 myWebSocket對象
     * 根據 用戶id 來獲取對應的 WebSocketServer 示例
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /**
     * 與某個客戶端的連接會話,需要通過它來給客戶端發送數據
     */
    private Session session;

    /**
     * 用戶id
     */
    private String accountId ="";

    /**
     * logger
     */
    private static Logger LOGGER = LoggerUtil.getLogger();


    /**
     * 連接建立成功調用的方法
     *
     * @param session
     * @param uid 用戶id
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("uid") String uid) {

        this.session = session;

        //設置超時,同httpSession
        session.setMaxIdleTimeout(3600000);

        this.accountId = uid;

        //存儲websocket連接,存在內存中,若有同一個用戶同時在線,也會存,不會覆蓋原有記錄
        webSocketMap.put(accountId, this);
        LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));

        addOnlineCount(); // 在線數 +1
        LOGGER.info("有新窗口開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());

        try {
            sendMessage(JSON.toJSONString("連接成功"));
        } catch (IOException e) {
            e.printStackTrace();
            throw new ApiException("websocket IO異常!!!!");
        }

    }

    /**
     * 關閉連接
     */

    @OnClose
    public void onClose() {
        if (webSocketMap.get(this.accountId) != null) {
            webSocketMap.remove(this.accountId);
            subOnlineCount(); // 人數 -1
            LOGGER.info("有一連接關閉,當前在線人數為:" + getOnlineCount());
        }
    }

    /**
     * 收到客戶端消息后調用的方法
     * 這段代碼尚未有在使用,可以先不看,在哪天有需求時再改寫啟用
    * @param message 客戶端發送過來的消息
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        LOGGER.info("收到來自用戶 [" + this.accountId + "] 的信息:" + message);

        if (!StringTools.isNullOrEmpty(message)) {
            try {
                // 解析發送的報文
                JSONObject jsonObject = JSON.parseObject(message);
                // 追加發送人(防竄改)
                jsonObject.put("fromUserId", this.accountId);
                String toUserId = jsonObject.getString("toUserId");
                // 傳送給對應 toUserId 用戶的 WebSocket
                if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) {
                    webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                } else {
                    // 否則不在這個服務器上,發送到 MySQL 或者 Redis
                    LOGGER.info("請求的userId:" + toUserId + "不在該服務器上");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error("用戶錯誤:" + this.accountId + ",原因:" + error);
    }

    /**
     * 實現服務器主動推送
     *
     * @param message 消息字符串
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        //需要使用同步機制,否則多並發時會因阻塞而報錯
        synchronized(this.session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                LOGGER.error("發送給用戶 ["+this.accountId +"] 的消息出現錯誤",e.getMessage());
                throw e;
            }
        }
    }


    /**
     * 點對點發送
     * 指定用戶id
     * @param message 消息字符串
     * @param userId 目標用戶id
     * @throws IOException
     */
    public static void sendInfo(String message, String userId) throws Exception {

        Iterator entrys = webSocketMap.entrySet().iterator();
        while (entrys.hasNext()) {
            Map.Entry entry = (Map.Entry) entrys.next();
            if (entry.getKey().toString().equals(userId)) {
                webSocketMap.get(entry.getKey()).sendMessage(message);
                LOGGER.info("發送消息到用戶id為 [" + userId + "] ,消息:" + message);
                return;
            }
        }
        //錯誤說明用戶沒有在線,不用記錄log
        throw new Exception("用戶沒有在線");
    }


    private static synchronized int getOnlineCount() {
        return onlineCount;
    }

    private static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    private static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

幾點說明:

  • onOpen方法:服務器與前端建立ws連接成功時自動調用。
  • sendInfo方法:是服務器通過用戶id向指定用戶發送消息的方法,其為靜態公有方法,因此可供各service調用。調用的例子:
// WebSocket 通知前端
try {
    //調用WebsocketServer向目標用戶推送消息
    WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString());
    LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString());
} 
  • @ServerEndpoint注解:
@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{uid} 的連接請求

這么注解之后,前端只用發起 ws://xxx.xxx:xxxx/ws/{uid} 即可開啟ws連接(或者wss協議,增加TLS), 比如前端js代碼這么寫:

<script>
    var socket;

	/* 啟動ws連接 */
    function openSocket() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
        }else{
            console.log("您的瀏覽器支持WebSocket");
            
            //實現化WebSocket對象,指定要連接的服務器地址與端口  建立連接
            var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val();
            socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
            console.log("正在連接:"+socketUrl);
            if(socket!=null){
                socket.close();
                socket=null;
            }
            socket = new WebSocket(socketUrl);
            
            /* websocket 基本方法 */
            
            //打開事件
            socket.onopen = function() {
                console.log(new Date()+"websocket已打開,正在連接...");
                //socket.send("這是來自客戶端的消息" + location.href + new Date());
            };
            
            //獲得消息事件
            socket.onmessage = function(msg) {
                console.log(msg.data);
                //發現消息進入    開始處理前端觸發邏輯
            };
            
            //關閉事件
            socket.onclose = function() {
                console.log(new Date()+"websocket已關閉,連接失敗...");
                //重新請求token
            };
            
            //發生了錯誤事件
            socket.onerror = function() {
                console.log("websocket連接發生發生了錯誤");
            }
        }
    
    }

	/* 發送消息 */
    function sendMessage() {
        if(typeof(WebSocket) == "undefined") {
            console.log("您的瀏覽器不支持WebSocket");
        }else {
            console.log("您的瀏覽器支持WebSocket");
            console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
        }
    }
</script>

存在的問題

一切看起來很順利,我只要放個用戶id進去,就可以想跟誰通訊就跟誰通訊咯!
但設想一個場景, 我是小明,uid為250,我想找uid為520的小花聊天,理論上我只要發起ws://xxx.xxx:xxxx/ws/250請求與服務器連接,小花也發起ws://xxx.xxx:xxxx/ws/520與服務器建立ws連接,我們就能互發消息了吧!
這時候出現了uid為1的小黃,他竟然想挖牆腳!?他竟然學過js,自己發了ws://xxx.xxx:xxxx/ws/520跟服務器建立ws連接,而小花根本不想和我發消息,所以實際上是小黃冒充了小花,把小花NTR了(實際上人家並不在乎😥),跟我愉快地聊天?!
那怎么辦啊?我怎么才能知道在跟我Websocket的究竟是美女小花還是黃毛小黃啊??!
這就引入了JWT!

JWT——JSON WEB TOKEN

可以看到后端會響應/ws/{token}的連接請求,前端可以發/ws/{token}的連接請求,一開始寫的時候看網上的都是用/ws/{userId}來建立該id的用戶與服務器的ws連接,但這樣的話可能就很不安全,無法保證使用某個id建立的ws確實就是真實用戶發起的連接。(小花被小黃NTR的悲慘故事)
所以在調研了很多公開的解決方案,看到有改用令牌(token)來建立ws連接的說法,同時驗證用戶身份(事實上一些其他接口也可以用令牌(token)來保證接口安全性),於是打算自己試試看,未必是最好的,甚至可能有點頭痛醫頭,腳痛醫腳,但總歸是個經驗,記錄一下。。

//Websocket Server
@ServerEndpoint(value = "/ws/{token}",configurator = WebSocketConfig.class) //響應路徑為 /ws/{token} 的連接請求
@Component
public class WebSocketServer {
    ...
}

js:

var socketUrl="http://xxx.xxx.xxx.xxx:xxxx/ws/"+$("#token").val();
socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //轉換成ws協議
....
socket = new WebSocket(socketUrl);

業務邏輯

image.png

為什么用JWT

最初考慮的是用/ws/{userId}來建立ws連接,然后在后台拿session中的user來對比用戶id,判斷合法性。
結果發現ws的session和http的session是不同的,還不好拿,可能得想辦法把http的session存到redis或者DB(也可以存在內存中,只是可能又要消耗內存資源),在建立ws連接之前去拿出來驗證合法性。后面查到了還有JWT這種好東西。

JWT好在哪里?

⭐推薦閱讀:什么是 JWT -- JSON WEB TOKEN

我的總結:

  • token可以過期
  • 驗證token可以不用存在redis或者DB或者內存,完全依賴算法即可
  • 只要從前端請求中拿到token,后端就可以根據封裝好的算法驗證這個token合不合法,有沒有被篡改過(這點很重要),過期了沒有
  • 可以將用戶id、用戶名等非敏感數據一同封裝到token中,后端拿到token后可以解碼拿到數據,只要這個token合法,這些發來的數據就是可信的(小黃就算自己發明了token也不作數),是沒有被篡改的(小黃就算把我小花的token偷走把用戶id改成自己的也沒用,后台可以算出來被改過),可以建立ws連接,調用websocket server進行通訊。

    教程
    JWT教程

    整合到本項目中

pom.xml

<!-- JWT 相關     -->
<dependency>
    <groupId>com.auth0</groupId>
    <artifactId>java-jwt</artifactId>
    <version>3.12.1</version>
</dependency>

<!--   base64     -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.12</version>
</dependency>

token的前兩個部分是由base64編碼的,所以需要codec進行解碼。

實現一個JWT工具類

目前基本當作工具使用
com.xxxx.course.util.JWTUtil

/**
 * @author jojo
 * JWT 令牌工具類
 */
public class JWTUtil {

    /**
     * 默認本地密鑰
     * @notice: 非常重要,請勿泄露
     */
    private static final String SECRET = "doyoulikevanyouxi?" //亂打的

    /**
     * 默認有效時間單位,為分鍾
     */
    private static final int TIME_TYPE = Calendar.MINUTE;

    /**
     * 默認有效時間長度,同http Session時長,為60分鍾
     */
    private static final int TIME_AMOUNT = 600;

    /**
     * 全自定生成令牌
     * @param payload payload部分
     * @param secret 本地密鑰
     * @param timeType 時間類型:按Calender類中的常量傳入:
     *         Calendar.YEAR;
     *         Calendar.MONTH;
     *         Calendar.HOUR;
     *         Calendar.MINUTE;
     *         Calendar.SECOND;等
     * @param expiredTime 過期時間,單位由 timeType 決定
     * @return 令牌
     */
    public static String generateToken(Map<String,String> payload,String secret,int timeType,int expiredTime){
        JWTCreator.Builder builder = JWT.create();

        //payload部分
        payload.forEach((k,v)->{
            builder.withClaim(k,v);
        });
        Calendar instance = Calendar.getInstance();
        instance.add(timeType,expiredTime);

        //設置超時時間
        builder.withExpiresAt(instance.getTime());

        //簽名
        return builder.sign(Algorithm.HMAC256(secret)).toString();
    }

    /**
     * 生成token
     * @param payload payload部分
     * @return 令牌
     */
    public static String generateToken(Map<String,String> payload){
        return generateToken(payload,SECRET,TIME_TYPE,TIME_AMOUNT);
    }


    省略了重載方法....
        

    /**
     * 驗證令牌合法性
     * @param token 令牌
     * @return
     */
    public static void verify(String token) {
        //如果有任何驗證異常,此處都會拋出異常
        JWT.require(Algorithm.HMAC256(SECRET)).build().verify(token);
    }

    /**
     * 自定義密鑰解析
     * @param token 令牌
     * @param secret 密鑰
     * @return 結果
     */
    public static DecodedJWT parseToken(String token,String secret) {
        DecodedJWT decodedJWT = JWT.require(Algorithm.HMAC256(secret)).build().verify(token);
        return decodedJWT;
    }

    /**
     * 解析令牌
     * 當令牌不合法將拋出錯誤
     * @param token
     * @return
     */
    public static DecodedJWT parseToken(String token) {
        return parseToken(token,SECRET);
    }

    /**
     * 解析令牌獲得payload,值為claims形式
     * @param token
     * @param secret
     * @return
     */
    public static Map<String,Claim> getPayloadClaims(String token,String secret){
        DecodedJWT decodedJWT = parseToken(token,secret);
        return decodedJWT.getClaims();
    }

    /**
     * 默認解析令牌獲得payload,值為claims形式
     * @param token 令牌
     * @return
     */
    public static Map<String,Claim> getPayloadClaims(String token){
        return getPayloadClaims(token,SECRET);
    }

    /**
     * 解析令牌獲得payload,值為String形式
     * @param token 令牌
     * @return
     */
    public static Map<String,String> getPayloadString(String token,String secret){
        Map<String, Claim> claims = getPayloadClaims(token,secret);
        Map<String,String> payload = new HashMap<>();
        claims.forEach((k,v)->{
            if("exp".equals(k)){
                payload.put(k,v.asDate().toString());
            }
            else {
                payload.put(k, v.asString());
            }
        });

        return payload;
    }
    /**
     * 默認解析令牌獲得payload,值為String形式
     * @param token 令牌
     * @return
     */
    public static Map<String,String> getPayloadString(String token){
        return getPayloadString(token,SECRET);
    }


    /**
     * 通過用戶實體生成令牌
     * @param user 用戶實體
     * @return
     */
    public static String generateUserToken(Account user){
        return generateUserToken(user.getId());
    }

    /**
     * 通過用戶id生成令牌
     * @param accountId 用戶id
     * @return
     */
    public static String generateUserToken(Integer accountId){
        return generateUserToken(accountId.toString());
    }


        /**
         *  通過用戶id生成令牌
         * @param accountId 用戶id
         * @return
         */
    public static String generateUserToken(String accountId){
        Map<String,String> payload = new HashMap<>();
        payload.put("accountId",accountId);
        return generateToken(payload);
    }

    /**
     * 從令牌中解析出用戶id
     * @param token 令牌
     * @return
     */
    public static String parseUserToken(String token){
        Map<String, String> payload = getPayloadString(token);
        return payload.get("accountId");
    }

}

調整登陸 service 中,登陸時返回一個token

com.xxxx.course.service.impl.AccountServiceImpl

public JSONObject login(){
    登陸成功...
    ...
    //生成並放入通信令牌token,令牌中帶有用戶id,用以鑒別身份
    String token = JWTUtil.generateUserToken(user);
    jsonObject.put("token",token);
    ...
    后續操作...
    return jsonObject; 
}

WebSocket 連接握手時進行身份驗證

之后前端只要攜帶token進行ws連接即可,寫了個ws的配置類,繼承了一個websocket連接的監聽器ServerEndpointConfig.Configurator,進行token的驗證。
com.XXXXX.course.config.webSocket.WebSocketConfig

/**
 * 開啟 WebSocket 支持,進行前后端即時通訊
 * https://blog.csdn.net/qq_33833327/article/details/105415393
 * session配置:https://www.codeleading.com/article/6950456772/
 * @author jojo
 */
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator implements WebSocketConfigurer {

    /**
     * logger
     */
    private static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * 監聽websocket連接,處理握手前行為
     * @param sec
     * @param request
     * @param response
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {

        String[] path = request.getRequestURI().getPath().split("/");
        String token = path[path.length-1];

        //todo 驗證用戶令牌是否有效
        try {
            JWTUtil.verify(token);
        } catch (Exception e) {
            LOGGER.info("攔截了非法連接",e.getMessage());
            return;
        }
        super.modifyHandshake(sec, request, response);
    }


    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

    ...
}

這樣,每次服務器建立ws連接前,都要驗證token的合法性,僅僅通過JWTUtil.verify(token);即可!當token不合法,就會拋出異常。

再配合重寫websocket serveronOpen方法,應該就能進行身份可信的通信了!

/**
     * 連接建立成功調用的方法
     *
     * @param session
     * @param token 用戶令牌
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token) {

        this.session = session;
        this.token = token;

        //設置超時,同httpSession
        session.setMaxIdleTimeout(3600000);

        //解析令牌,拿取用戶信息
        Map<String, String> payload = JWTUtil.getPayloadString(token);
        String accountId = payload.get("accountId");
        this.accountId = accountId;

        //存儲websocket連接,存在內存中,若有同一個用戶同時在線,也會存,不會覆蓋原有記錄
        webSocketMap.put(accountId, this);
        LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString()));

        addOnlineCount(); // 在線數 +1
        LOGGER.info("有新窗口開始監聽:" + accountId + ",當前在線人數為" + getOnlineCount());
        ...

(非必須)RabbitMQ消息中間件

教程

為什么我要用RabbitMQ

  • 正經理由:
    • 可以將寫DB與發送消息兩件事情異步處理,這樣響應會更快。
    • 未來可以拓展為集群
  • 真正理由:
    • 人傻
    • 閑的

整合到本項目中

pom.xml

<!-- rabbitMQ 相關-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ 配置類

  • com.xxxx.course.config.rabbitMQ.RabbitMQConfig
/**
 * @author jojo
 */
@Configuration
public class RabbitMQConfig {
	
    /**
    * 指定環境
    */
    @Value("${spring.profiles.active}")
    private String env; 

    /**
     * logger
     */
    public static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * 交換機名
     */
    public String MEMBER_INVITATION_EXCHANGE = RabbitMQConst.MEMBER_INVITATION_EXCHANGE;


    /**
     * 交換機隊列
     */
    public String MEMBER_INVITATION_QUEUE = RabbitMQConst.MEMBER_INVITATION_QUEUE;


    /**
     * 聲明 課程成員邀請消息 交換機
     * @return
     */
    @Bean("memberInvitationDirectExchange")
    public Exchange memberInvitationDirectExchange(){
        //根據項目環境起名,比如開發環境會帶dev字樣
        String exchangeName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_EXCHANGE);
        return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
    }
	
    

    /**
     * 聲明 課程成員邀請消息 隊列
     * @return
     */
    @Bean("memberInvitationQueue")
    public Queue memberInvitationQueue(){
        //同上
        String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
        return QueueBuilder.durable(queueName).build();
    }

    /**
     * 課程成員邀請消息的隊列與交換機綁定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding memberInvitationBinding(@Qualifier("memberInvitationQueue") Queue queue,@Qualifier("memberInvitationDirectExchange") Exchange exchange){
        String queueName = RabbitUtil.generateRabbitName(env,MEMBER_INVITATION_QUEUE);
        return BindingBuilder.bind(queue).to(exchange).with(queueName).noargs();
    }
    
    /**
    * Springboot啟動時, 驗證隊列名根據環境命名正確
    */
    @Bean
    public void verify(){
        Queue memberInvitationQueue = SpringUtil.getBean("memberInvitationQueue", Queue.class);
        Exchange memberInvitationDirectExchange = SpringUtil.getBean("memberInvitationDirectExchange", Exchange.class);

        LOGGER.info("消息隊列 ["+memberInvitationQueue.getName()+"] 創建成功");
        LOGGER.info("消息交換器 ["+memberInvitationDirectExchange.getName()+"] 創建成功");

        //放入映射中存儲
        RabbitMQConst.QUEUE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationQueue.getName());
        RabbitMQConst.EXCHANGE_MAP.put(MessageConst.MEMBER_INVITATION,memberInvitationDirectExchange.getName());
    }

    /**
     * 自定義messageConverter使得消息中攜帶的pojo序列化成json格式
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

}

項目運行后,在 RabbitMQ服務器 中就會出現剛剛注冊的隊列與交換器(圖是舊的,沒有體現根據環境命名隊列,但是其實做到了):
image.png

課程管理成員邀請接口

  • com.xxxx.course.service.impl.CourseMemberInvitationServiceImpl
    • 首先在接口中注入操作rabbitMQ的Bean
@Autowired
RabbitTemplate rabbitTemplate;
  • 在課程管理員業務代碼中加入向rabbitMQ發送消息的邏輯
    • CourseMemberInvitationServiceImpl
@Override
@Transactional(rollbackFor = Exception.class) //開啟事務,以防萬一
public JSONObject addCourseMemberInvitation(Integer courseId, String username, String requestIp) {

    //檢查課程是否存在
    courseService.hasCourse(courseId);

    //檢查用戶是否已加入課程平台
    accountService.hasAccount(username);

    /* 若存在則查看邀請記錄是否已經存在 */

    //獲取用戶id
    Account account = accountService.getAccountByUsernameOrEmail(username);

    //檢查用戶名是否存在
    if(account==null){
        JSONObject result = new JSONObject();
        result.put(RESULT,FAILED);
        result.put(MSG,"用戶不存在");
        return result;
    }

    Integer accountId = account.getId();

    //獲得發出邀請人的id
    Account user = (Account) SecurityUtils.getSubject().getSession().getAttribute("user");
    Integer adminId = user.getId();

    //檢查是否自己邀請自己,是則不再執行
    hasInvitedOneself(accountId,adminId);

    //檢查是否已經邀請過,是則不再執行
    hasInvited(courseId,accountId,adminId);

    /* 若不存在則新建邀請記錄 */

    CourseMemberInvitation courseMemberInvitation = new CourseMemberInvitation();
    courseMemberInvitation.setCourseId(courseId);
    courseMemberInvitation.setAccountId(accountId);
    courseMemberInvitation.setAdminId(adminId);
    courseMemberInvitation.setCreateTime(new Date());
    courseMemberInvitation.setCreateIp(requestIp);

    //新建消息
    CourseMessage courseMessage = courseMessageService.newMessage(MessageConst.MEMBER_INVITATION, accountId, adminId);

    //綁定邀請記錄與消息記錄
    courseMemberInvitation.setBindMessageId(courseMessage.getId());

    //插入數據庫(這里用的是MybatisPlus)
    int insertResult = courseMemberInvitationDao.insert(courseMemberInvitation);

    //根據數據庫插入返回值封裝json
    JSONObject result = insertCourseMemberInvitationResult(insertResult, courseMemberInvitation);

    if(result.get(RESULT).equals(FAILED)){
        //若數據庫操作沒有成功,則直接返回json
        return result;
    }


    /* 發送消息 */
    courseMessageService.sendMessage(courseMessage);

    //根據插入情況返回json
    return result;
}
  • courseMessageService中實現的sendMessage方法:
@Autowired
RabbitTemplate rabbitTemplate;

@Override
public void sendMessage(CourseMessage courseMessage) {
    //嘗試發送
    //將消息放入rabbitMQ
    storeInRabbitMQ(courseMessage);
}

private void storeInRabbitMQ(CourseMessage courseMessage){
    //將消息放入rabbitMQ
    String exchangeName = (String) RabbitMQConst.EXCHANGE_MAP.get(courseMessage.getType());
    String routeKey = (String) RabbitMQConst.QUEUE_MAP.get(courseMessage.getType());
    try {
        //送到rabbitMQ隊列中
        rabbitTemplate.convertAndSend(exchangeName,routeKey,courseMessage);
    }
    catch (Exception e){
        LOGGER.error("插入rabbitMQ失敗",e);
    }
}
  • com.xxxx.course.rabbitMQ.listener.CourseMemberInvitationListener

該類是用以監聽_課程成員邀請_消息的,即是在rabbitMQ服務器建立的member_invitation隊列。

/**
 * @author jojo
 */
@Component
public class CourseMemberInvitationListener {

    @Autowired
    MessageHandler messageHandler;

    /**
     * logger
     */
    public static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * spEL表達式
     * 一旦隊列中有新消息,這個方法就會被觸發
     */
    @RabbitListener(queues = "#{memberInvitationQueue.name}")
    public void listenCourseMemberInvitation(Message message){
        messageHandler.handleMessage(message);
    }


}
  • com.xxxx.course.rabbitMQ.MessageHandler, 該類是用來處理監聽事件的:
@Service
public class MessageHandler {

    @Autowired
    MessageConverter messageConverter;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * logger
     */
    public static final Logger LOGGER = LoggerUtil.getLogger();

    /**
     * 隊列消息處理業務
     * @param message
     */
    public void handleMessage(Message message){
        CourseMessage courseMessage = (CourseMessage) messageConverter.fromMessage(message);

        // WebSocket 通知前端
        try {
            //將消息發給指定用戶
            WebSocketServer.sendInfo(JSON.toJSONString(courseMessage),courseMessage.getAccountId().toString());
        } catch (Exception e) {
            //消息存在數據庫中了,待用戶上線后再獲取
            LOGGER.info("發送消息id為 ["+courseMessage.getId()+"] 的消息給->消息待收方id為 ["+courseMessage.getAccountId().toString()+"] 的用戶,但其不在線上。");
        }
    }
}

這樣做應該就可以用RabbitMQ了。

為什么不用HttpSession來驗證用戶真實性?

如果要驗證一個用戶的真實性,為什么不直接用HttpSession來驗證?比如在登錄時,將用戶的id存入HttpSession中,然后在Websocket握手連接之前,驗明前端通過/ws/{userId}中的userId與HttpSession中的用戶id進行比較就行了

這屬實是靈魂拷問了

經過自己實驗之后,發現按照很多方案,都沒辦法在ws連接的時候中,拿到HttpSession,比如在WebSocketConfig中按如下代碼:

	@Override
20     public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
21         ...
    	   //嘗試獲取HttpSession
    	   HttpSession httpSession = (HttpSession)request.getHttpSession();
22         sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
    	   ...
23     }

調試一番發現httpSession都是null,於是跟前端佬聊了聊,得到結論是wshttp連接是兩個連接,當然拿不到啦
哎,我太菜了,就先這么認為好了

那或者能否把sessionId對應的userid存數據庫(mysql或者redis),每次ws連接時再驗證?
那就不如JWT了,JWT不需要存儲。

更理想的情況是,我能在WebSocketServer這個模塊中拿到httpSession的實體,這樣子驗證用戶真實性就很好做了,但是暫時還沒找到方法,需要再研究研究。
暫時想到這些,如果有更好的做法會再更新

總結

本文的難點是ws的認證問題,雖然用超級好用的JWT解決了,但是隨之而來的還有很多問題,比如:

  1. 注銷登錄等場景下 token 還有效
    與之類似的具體相關場景有:
    1. 退出登錄;
    2. 修改密碼;
    3. 服務端修改了某個用戶具有的權限或者角色;
    4. 用戶的帳戶被刪除/暫停。
    5. 用戶由管理員注銷;
  2. token 的續簽問題
    token 有效期一般都建議設置的不太長,那么 token 過期后如何認證,如何實現動態刷新 token,避免用戶經常需要重新登錄?

這些問題還是有待解決是😂
本文就當記錄一下自己的胡作非為吧😂

總之,至少小花再也不怕被小黃NTR了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM