解決分布式下Websocket共享問題
解決方案有2種,一個是redis,一個是mq。其中redis沒仔細研究,就直接用了mq。項目中用F5代理了2台應用服務器,如果發生方和接受方不在同一個服務器,就會出現有問題。
下面就直接上代碼
bo類
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class Message {
private Integer id;
private String msg;
/**
* 消息狀態,1-未讀,2-已讀
*/
private Integer status;
private Date sendDate;
private Date readDate;
private String from;
private String to;
}
配置類
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private MyHandler myHandler;
@Autowired
private MessageHandshakeInterceptor myHandshakeInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(this.myHandler, "/ws/{uid}")
.setAllowedOrigins("*")
.addInterceptors(this.myHandshakeInterceptor);
}
}
核心類
/**
* @author WGR
* @create 2021/1/20 -- 21:55
*/
@Component
public class MessageHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
String path = serverHttpRequest.getURI().getPath();
String[] ss = path.split("/");
map.put("uid",ss[2]);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
@Component
public class MyHandler extends TextWebSocketHandler {
private static final Map<Integer,WebSocketSession> SESSIONS = new ConcurrentHashMap<>();
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 用於接收消息的方法
* destination: 隊列的名稱或主題的名稱
*/
@JmsListener(destination = "topic01")
public void receiveMessage(javax.jms.Message message){
if(message instanceof javax.jms.TextMessage){
javax.jms.TextMessage textMessage = (javax.jms.TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
com.dalianpai.websocket.bo.Message msg = JSONObject.parseObject(textMessage.getText(), com.dalianpai.websocket.bo.Message.class);
WebSocketSession session = SESSIONS.get(Integer.valueOf(msg.getTo()));
if ((session != null && session.isOpen())) {
try {
session.sendMessage(new TextMessage(msg.getMsg()));
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message)
throws IOException {
System.out.println("獲取到消息 >> " + message.getPayload());
Message msg = JSONObject.parseObject(message.getPayload(),Message.class);
System.out.println(msg);
Object uid = session.getAttributes().get("uid");
System.out.println(uid);
//說明在這台服務器上
String toId = msg.getTo();
WebSocketSession toSession = SESSIONS.get(Integer.valueOf(toId));
if ((toSession != null && toSession.isOpen())) {
//TODO 具體格式需要和前端對接
toSession.sendMessage(new
TextMessage(message.getPayload()));
}else{
jmsMessagingTemplate.convertAndSend("topic01",message.getPayload());
}
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws
Exception {
Integer uid = Integer.valueOf((String)session.getAttributes().get("uid"));
session.sendMessage(new TextMessage(uid+", 你好!歡迎連接到ws服務"));
SESSIONS.put(uid, session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
Integer uid = Integer.valueOf((String)session.getAttributes().get("uid"));
SESSIONS.remove(uid);
System.out.println("斷開連接!");
}
}
測試