Solving WebSocket Session Sharing in a Distributed Deployment


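There are two common solutions: one based on Redis and one based on a message queue (MQ). I did not look into Redis closely and went straight to MQ. In this project an F5 load balancer sits in front of two application servers. Each server keeps its WebSocket sessions only in its own memory, so when the sender and the receiver are connected to different servers, the message cannot be delivered.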
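Before the real code, the idea can be sketched in plain Java without Spring or a broker. This is only a simulation under stated assumptions: `FakeTopic`, `FakeServer` and `RelayDemo` are hypothetical names, the "topic" is an in-memory list of subscribers, and a "session" is just a list of delivered messages. It shows the routing rule used in this post: deliver locally if the recipient is on this server, otherwise publish to a topic that every server listens to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** In-memory stand-in for a JMS topic: every subscriber sees every message. */
final class FakeTopic {
    private final List<FakeServer> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(FakeServer server) {
        subscribers.add(server);
    }

    void publish(int toUid, String text) {
        for (FakeServer server : subscribers) {
            server.onTopicMessage(toUid, text);
        }
    }
}

/** Stand-in for one application server with its own local session map. */
final class FakeServer {
    // uid -> messages delivered to that user's simulated WebSocket session
    final Map<Integer, List<String>> localSessions = new ConcurrentHashMap<>();
    private final FakeTopic topic;

    FakeServer(FakeTopic topic) {
        this.topic = topic;
        topic.subscribe(this);
    }

    void connect(int uid) {
        localSessions.put(uid, new ArrayList<>());
    }

    /** Called when a locally connected client sends a message. */
    void send(int toUid, String text) {
        List<String> inbox = localSessions.get(toUid);
        if (inbox != null) {
            inbox.add(text);             // recipient is on this server
        } else {
            topic.publish(toUid, text);  // let the other servers try
        }
    }

    /** Called for every message published to the topic. */
    void onTopicMessage(int toUid, String text) {
        List<String> inbox = localSessions.get(toUid);
        if (inbox != null) {
            inbox.add(text);
        }
    }
}

final class RelayDemo {
    /** User 1 connects to server A, user 2 to server B, then 1 sends to 2. */
    static List<String> run() {
        FakeTopic topic = new FakeTopic();
        FakeServer a = new FakeServer(topic);
        FakeServer b = new FakeServer(topic);
        a.connect(1);
        b.connect(2);
        a.send(2, "hello from 1");
        return b.localSessions.get(2);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the relay only works because the topic fans out to all servers. With a point-to-point queue, a single consumer would take each message, and it might not be the server that holds the recipient's session.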

Here is the code.

BO class

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class Message {

    private Integer id;
    private String msg;
    /**
     * Message status: 1 = unread, 2 = read
     */
    private Integer status;
    private Date sendDate;
    private Date readDate;
    private String from;
    private String to;

}

Configuration class

@Configuration
@EnableWebSocket // needed so that Spring invokes this WebSocketConfigurer
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private MyHandler myHandler;

    @Autowired
    private MessageHandshakeInterceptor myHandshakeInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(this.myHandler, "/ws/{uid}")
                .setAllowedOrigins("*")
                .addInterceptors(this.myHandshakeInterceptor);
    }
}

Core classes

/**
 * @author WGR
 * @create 2021/1/20 -- 21:55
 */
@Component
public class MessageHandshakeInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
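        // Expects a path of the form /ws/{uid}; splitting gives ["", "ws", "{uid}"].
        String path = serverHttpRequest.getURI().getPath();
        String[] ss = path.split("/");
        if (ss.length < 3) {
            // No uid segment: reject the handshake instead of throwing.
            return false;
        }
        map.put("uid", ss[2]);
        return true;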
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

    }
}
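The interceptor above reads the uid from a fixed position in the path, which assumes the application is served without a context path. A slightly more defensive variant could take the last path segment and check that it is numeric, since the handler later calls `Integer.valueOf` on it. The sketch below is only an illustration; `UidPaths` and `extractUid` are hypothetical names, not part of the original project.

```java
import java.util.Optional;

final class UidPaths {
    private UidPaths() {}

    /**
     * Returns the last path segment if it is made of 1 to 9 digits,
     * so that Integer.valueOf on the result cannot overflow.
     */
    static Optional<String> extractUid(String path) {
        if (path == null) {
            return Optional.empty();
        }
        // String.split drops trailing empty strings: "/ws/" yields ["", "ws"].
        String[] segments = path.split("/");
        if (segments.length == 0) {
            return Optional.empty();
        }
        String last = segments[segments.length - 1];
        return last.matches("\\d{1,9}") ? Optional.of(last) : Optional.empty();
    }
}
```

In `beforeHandshake`, an empty result would translate to `return false`, which rejects the handshake before any session is created.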


@Component
public class MyHandler extends TextWebSocketHandler {

    private static final Map<Integer,WebSocketSession> SESSIONS = new ConcurrentHashMap<>();

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Receives messages relayed by the other servers.
     * destination: the name of the queue or topic. It has to behave as a topic
     * (pub/sub; in Spring Boot this is usually enabled with
     * spring.jms.pub-sub-domain=true) so that every server receives each message.
     */
    @JmsListener(destination = "topic01")
    public void receiveMessage(javax.jms.Message message) {
        if (!(message instanceof javax.jms.TextMessage)) {
            return;
        }
        javax.jms.TextMessage textMessage = (javax.jms.TextMessage) message;
        try {
            System.out.println("Received message: " + textMessage.getText());
            Message msg = JSONObject.parseObject(textMessage.getText(), Message.class);
            // Deliver only if the recipient is connected to this server.
            WebSocketSession session = SESSIONS.get(Integer.valueOf(msg.getTo()));
            if (session != null && session.isOpen()) {
                // Note: this path sends only msg.getMsg(), while the local path
                // in handleTextMessage forwards the whole JSON payload.
                session.sendMessage(new TextMessage(msg.getMsg()));
            }
        } catch (JMSException | IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message)
            throws IOException {
        System.out.println("Received message >> " + message.getPayload());
        Message msg = JSONObject.parseObject(message.getPayload(), Message.class);
        System.out.println(msg);
        Object uid = session.getAttributes().get("uid");
        System.out.println(uid);
        String toId = msg.getTo();
        WebSocketSession toSession = SESSIONS.get(Integer.valueOf(toId));
        if (toSession != null && toSession.isOpen()) {
            // The recipient is connected to this server: deliver directly.
            // TODO: the exact message format still has to be agreed with the front end.
            toSession.sendMessage(new TextMessage(message.getPayload()));
        } else {
            // Otherwise publish to the topic so the server holding the session can deliver it.
            jmsMessagingTemplate.convertAndSend("topic01", message.getPayload());
        }
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws
            Exception {
        Integer uid = Integer.valueOf((String) session.getAttributes().get("uid"));
        session.sendMessage(new TextMessage(uid + ", hello! Welcome to the ws service"));
        SESSIONS.put(uid, session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
            throws Exception {
        Integer uid = Integer.valueOf((String) session.getAttributes().get("uid"));
        SESSIONS.remove(uid);
        System.out.println("Connection closed!");
    }
}
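One detail in the handler may be worth tightening. `afterConnectionClosed` removes the entry by uid alone, so if a user reconnects and the close callback of the old session fires afterwards, the new session could be removed from the map. A possible fix is the two-argument `remove(key, value)` of `ConcurrentHashMap`, which removes the entry only if it still maps to that exact session. The sketch below is an assumption-laden illustration: `SessionRegistry` is a hypothetical helper, and the type parameter `S` stands in for `WebSocketSession`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical registry; S stands in for WebSocketSession. */
final class SessionRegistry<S> {
    private final ConcurrentMap<Integer, S> sessions = new ConcurrentHashMap<>();

    /** Registers a session and returns the one it replaced, or null. */
    S register(Integer uid, S session) {
        return sessions.put(uid, session);
    }

    /** Removes the entry only if it still points at the given session. */
    boolean unregister(Integer uid, S session) {
        return sessions.remove(uid, session);
    }

    /** Returns the session currently registered for the uid, or null. */
    S find(Integer uid) {
        return sessions.get(uid);
    }
}
```

In the handler this would correspond to calling `SESSIONS.remove(uid, session)` in `afterConnectionClosed`, so a late close event for a stale session leaves the newer one in place.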

Testing

[Two screenshots of the test run appeared here.]

