目錄
前言
最近公司里遇到一個問題,在集群中一些websocket的消息丟失了。
產生問題的原理很簡單,發送消息的服務和接收者連接的服務不是同一個服務。
解決方案
用中間件(mq, redis etc.)來在服務之間進行通信。
不直接發送websocket消息,而是將消息放在mq或者redis的list中。
並在redis中維護連接信息,服務根據連接信息來判斷自己是否需要處理消息,或者將消息發給接收者連接的服務。
代碼示例
我們的項目中使用的是Spring WebSocket,並且使用了STOMP協議,可以去官網查看文檔。
代碼示例只做維護連接信息的代碼示例,其他部分就不放上來了。
維護連接信息的代碼示例
想要在維護STOMP協議的連接信息,可以查看文檔的這一部分Listening To ApplicationContext Events and Intercepting Messages
這里的連接信息只要是能夠標識出不同的服務就OK。
一下是監聽了訂閱事件的Listener的部分代碼:
package cn.fjhdtp.websocket.interceptor;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//握手前,往attributes中增加所需信息
Object loginBean = ...;//獲取登錄的用戶信息(或其他信息)
attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);
return super.beforeHandshake(request, response, wsHandler, attributes);
}
}
package cn.fjhdtp.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import java.util.Map;
@Component
public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
@Autowired
@Qualifier("serversideMessageTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private IMessageHandler messageHandler;
@Override
public void onApplicationEvent(SessionSubscribeEvent event) {
//獲取訂閱的destination
String destination = (String) event.getMessage().getHeaders().get("simpDestination");
//獲取登錄信息
Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
//TODO 向redis中增加連接信息
}
}
package cn.fjhdtp.message.listener;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import java.util.Map;
@Component
public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
// stomp連接斷開,清除連接信息
//從attributes中獲取登錄信息(或其他信息)
Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
//從redis中移除連接信息
}
}
當然,有些情況下可能不會正常的觸發斷開連接的事件(在was下就不會有這個事件),因此還會需要HeartBeat。