spring websocket集群問題的簡單記錄


前言

最近公司里遇到一個問題,在集群中一些websocket的消息丟失了。
產生問題的原理很簡單,發送消息的服務和接收者連接的服務不是同一個服務。

解決方案

用中間件(mq, redis etc.)來在服務之間進行通信。

不直接發送websocket消息,而是將消息放在mq或者redis的list中。
並在redis中維護連接信息,服務根據連接信息來判斷自己是否需要處理消息,或者將消息發給接收者連接的服務。

代碼示例

我們的項目中使用的是Spring WebSocket,並且使用了STOMP協議,可以去官網查看文檔。

代碼示例只做維護連接信息的代碼示例,其他部分就不放上來了。

維護連接信息的代碼示例

想要在維護STOMP協議的連接信息,可以查看文檔的這一部分Listening To ApplicationContext Events and Intercepting Messages

這里的連接信息只要是能夠標識出不同的服務就OK。

一下是監聽了訂閱事件的Listener的部分代碼:

package cn.fjhdtp.websocket.interceptor;

import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
			//握手前,往attributes中增加所需信息


        Object loginBean = ...;//獲取登錄的用戶信息(或其他信息)
        attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);

        return super.beforeHandshake(request, response, wsHandler, attributes);
    }
}
package cn.fjhdtp.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;

import java.util.Map;

@Component
public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {

    @Autowired
    @Qualifier("serversideMessageTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Autowired
    private IMessageHandler messageHandler;

    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) {
			//獲取訂閱的destination
          String destination = (String) event.getMessage().getHeaders().get("simpDestination");
            //獲取登錄信息
            Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
			//TODO 向redis中增加連接信息
    }
}
package cn.fjhdtp.message.listener;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;


import java.util.Map;

@Component
public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {

    @Override
    public void onApplicationEvent(SessionDisconnectEvent event) {
        // stomp連接斷開,清除連接信息
		//從attributes中獲取登錄信息(或其他信息)
        Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);

        //從redis中移除連接信息
    }
}

當然,有些情況下可能不會正常的觸發斷開連接的事件(在was下就不會有這個事件),因此還會需要HeartBeat。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM