SpringBoot+WebSocket+RabbitMQ整合實現消息實時推送


SpringBoot+WebSocket+RabbitMQ整合實現消息實時推送

注意:以下配置方式可能影響項目運行,導致訪問不到靜態資源文件;整合WebSocket也可參考:
https://www.cnblogs.com/yu-si/articles/15075737.html

WebSocket+RabbitMQ基於Spring boot實現

1.配置

maven導包
SpringBoot 與webSocket的關聯

<!-- Spring Boot starter that brings in WebSocket support -->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

SpringBoot與Rabbit MQ的關聯

<!-- Spring Boot starter for AMQP messaging (used here to talk to RabbitMQ) -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置application.yml文件

# RabbitMQ connection settings (spring.rabbitmq.*)
spring:
  rabbitmq:
    # Broker address: a RabbitMQ instance on the local machine
    host: 127.0.0.1
    # Login credentials for the broker
    username: guest
    password: guest
    # Virtual host
    virtual-host: /
    # Broker port
    port: 5672

編寫RabbitMQ服務端

配置服務端的config

/**
 * Declares the RabbitMQ topology used by the sending side: one queue, one
 * topic exchange, and the binding that connects them.
 */
@Configuration
public class SendConfig {

    /**
     * Registers the queue named {@code "queue"} under the bean name {@code "queue"}.
     *
     * @return the queue declaration
     */
    @Bean("queue")
    public Queue queueMsg() {
        final Queue messageQueue = new Queue("queue");
        return messageQueue;
    }

    /**
     * Registers the topic exchange named {@code "exchange"} under the bean name
     * {@code "exchange"}.
     *
     * @return the exchange declaration
     */
    @Bean("exchange")
    public TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange("exchange");
        return topicExchange;
    }

    /**
     * Binds the queue to the exchange with routing key {@code "a"}.
     *
     * @param queue    the bean named {@code "queue"}
     * @param exchange the bean named {@code "exchange"}
     * @return the binding declaration
     */
    @Bean
    Binding bindingExchangeAndQueuemsg(@Qualifier("queue") Queue queue,
                                       @Qualifier("exchange") TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("a");
    }
}

編寫服務端的Controller

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point that publishes the received text to RabbitMQ.
 */
@RestController
public class Controller {

    /** Template used to publish messages to the broker. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Handles {@code /send}: forwards {@code msg} to the exchange
     * {@code "exchange"} using routing key {@code "a"}.
     *
     * @param msg the message text taken from the request
     */
    @RequestMapping("/send")
    public void SennerMsg(String msg) {
        final String exchangeName = "exchange";
        final String routingKey = "a";
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

到這裡,啟動MQ後運行程序並訪問 /send 接口,便可以在RabbitMQ管理界面看到隊列中有一條新的消息

箭頭所指的為處於 Ready(准備)狀態的消息數:1 個

或者用命令查看

這里寫圖片描述

編寫MQ的客戶端(與WebSocket結合)
配置Websocket—做請求攔截

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;

/**
 * Handshake interceptor that only lets a WebSocket handshake through when the
 * request carries a {@code jspCode} query parameter, and exposes that value to
 * the WebSocket session as the attribute {@code "jspCode"}.
 */
public class Hank implements HandshakeInterceptor {

    /**
     * Copies the {@code jspCode} request parameter into the session attributes.
     *
     * @param request             the handshake request
     * @param serverHttpResponse  the handshake response (unused)
     * @param webSocketHandler    the target handler (unused)
     * @param map                 attributes that will be attached to the WebSocket session
     * @return {@code true} to continue the handshake; {@code false} when the request
     *         is not servlet-based or has no {@code jspCode} parameter
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse,
                                   WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        // Reject instead of failing with a ClassCastException on non-servlet requests.
        if (!(request instanceof ServletServerHttpRequest)) {
            return false;
        }
        String jspCode = ((ServletServerHttpRequest) request).getServletRequest().getParameter("jspCode");
        if (jspCode == null) {
            return false;
        }
        // The key must be exactly "jspCode": MyWebSocketHandler reads the attribute under
        // that name (the previous key "jspCoe" was a typo and the value was never found).
        map.put("jspCode", jspCode);
        return true;
    }

    /** Nothing to do once the handshake has completed. */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                               WebSocketHandler webSocketHandler, Exception e) {
    }
}

配置Websocket的Handler

/**
 * WebSocket handler that keeps a registry of connected sessions keyed by the
 * {@code jspCode} handshake attribute and pushes text messages to the sessions
 * whose key contains a given type string.
 */
@Component
public class MyWebSocketHandler implements WebSocketHandler {

    /**
     * Connected sessions by {@code jspCode}. Sessions are added and removed on the
     * WebSocket container's threads while {@link #sendMsgToJsp} iterates from the
     * caller's thread, so a concurrent map is required.
     */
    private static final java.util.concurrent.ConcurrentMap<String, WebSocketSession> userMap =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Registers the new session under its {@code jspCode} attribute. A session
     * without that attribute cannot be addressed and is closed immediately.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        String jspCode = (String) webSocketSession.getAttributes().get("jspCode");
        if (jspCode == null) {
            // A null key would break the key matching in sendMsgToJsp.
            webSocketSession.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        System.out.println(jspCode);
        // A later connection with the same code replaces the earlier one.
        userMap.put(jspCode, webSocketSession);
    }

    /** Incoming client messages are ignored. */
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
    }

    /** Transport errors are ignored. */
    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
    }

    /**
     * Drops the closed session from the registry so it is not kept forever.
     */
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        Object jspCode = webSocketSession.getAttributes().get("jspCode");
        if (jspCode instanceof String) {
            // Two-argument remove: leave the entry alone if a newer session already
            // took over this code.
            userMap.remove((String) jspCode, webSocketSession);
        }
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Sends {@code message} to every open session whose {@code jspCode} contains
     * {@code type}. Each send runs on its own thread, so this method returns
     * without waiting for delivery.
     *
     * @param message the text message to push
     * @param type    substring that a session's {@code jspCode} must contain
     * @throws Exception declared for callers; send failures inside the worker
     *                   threads are printed, not propagated
     */
    public void sendMsgToJsp(final TextMessage message, String type) throws Exception {
        for (final Map.Entry<String, WebSocketSession> entry : userMap.entrySet()) {
            final WebSocketSession session = entry.getValue();
            System.out.println(session.isOpen());
            System.out.println(entry.getKey().contains(type));
            if (session.isOpen() && entry.getKey().contains(type)) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Serialize sends per session: two broadcasts arriving close
                            // together must not write to the same session concurrently.
                            synchronized (session) {
                                // Re-check: the session may have closed since the thread was started.
                                if (session.isOpen()) {
                                    session.sendMessage(message);
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        }
    }
}

配置WebSocket的config

/**
 * Registers {@link MyWebSocketHandler} on the endpoint {@code /wsMy} and guards
 * the handshake with an interceptor that requires a {@code jspCode} parameter.
 *
 * <p>Deliberately NOT annotated with {@code @EnableWebMvc}: in a Spring Boot
 * application that annotation switches off Boot's Spring MVC auto-configuration,
 * including the default static-resource handling, so static files stop being served.
 */
@Component
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    /** Handler that receives the WebSocket connections. */
    @Resource
    private MyWebSocketHandler handler;

    /**
     * Maps the handler to {@code /wsMy} and attaches the handshake interceptor.
     *
     * @param registry the registry to add the handler mapping to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, "/wsMy").addInterceptors(new HandshakeInterceptor() {
            /**
             * Copies the {@code jspCode} request parameter into the session
             * attributes; refuses the handshake when it is missing.
             */
            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse,
                                           WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
                // Reject instead of failing with a ClassCastException on non-servlet requests.
                if (!(request instanceof ServletServerHttpRequest)) {
                    return false;
                }
                String jspCode = ((ServletServerHttpRequest) request).getServletRequest().getParameter("jspCode");
                if (jspCode == null) {
                    return false;
                }
                map.put("jspCode", jspCode);
                return true;
            }

            /** Nothing to do once the handshake has completed. */
            @Override
            public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                                       WebSocketHandler webSocketHandler, Exception e) {
            }
        });
    }
}

配置客戶端的Controller(二者關聯的部分)

  /** WebSocket handler used to push received queue messages to connected pages. */
  @Autowired
  public MyWebSocketHandler handler;

  /**
   * Consumes messages from the queue {@code "queue"} and forwards each one as a
   * text message to the WebSocket sessions matching type {@code "A"}.
   *
   * @param msg the message body received from the queue
   * @throws Exception if forwarding to the handler fails
   */
  @RabbitListener(queues = "queue")
  public void Recive(String msg) throws Exception {
      final TextMessage outgoing = new TextMessage(msg);
      final String targetType = "A";
      handler.sendMsgToJsp(outgoing, targetType);
  }

到此便可以啟動測試了

這里寫圖片描述

另附上前端頁面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
</body>
<script type="text/javascript">
    // 1. Create the WebSocket client.
    // Endpoint registered on the server ("/wsMy"); jspCode identifies this page to the handler.
    var wsServerUrl = 'ws://localhost:80/wsMy?jspCode=AA';
    var limitConnect = 3;       // remaining reconnect attempts
    var timeConnect = 0;        // reconnect attempts made so far
    var lockReconnect = false;  // in-page lock so onerror + onclose trigger only one reconnect
    var heartbeatTimer = null;  // handle of the single heartbeat interval
    var websocket = null;       // current connection
    webSocketInit(wsServerUrl);

    // Open the socket connection and wire up its event handlers.
    // wsServerUrl: URL to connect to; falls back to the page-level default when omitted.
    function webSocketInit(wsServerUrl) {
        var targetUrl = wsServerUrl || window.wsServerUrl;

        // Check for WebSocket support first.
        if ('WebSocket' in window) {
            websocket = new WebSocket(targetUrl);
        } else if ('MozWebSocket' in window) {
            websocket = new MozWebSocket(targetUrl);
        } else {
            // No SockJS fallback: the SockJS library is not loaded by this page and it
            // would need an http(s) URL, so referencing it here could only throw.
            console.log("WebSocket is not supported by this browser");
            return;
        }
        // Connection opened.
        websocket.onopen = function (event) {
            console.log("已連接TCP服務器");
        };
        // Message received.
        websocket.onmessage = function (event) {
            console.log("收到一條消息" + event.data);
            alert(event.data);
        };
        websocket.onerror = function (event) {
            console.log("服務器報錯:");
            reconnect(targetUrl);
        };
        websocket.onclose = function (event) {
            console.log('服務器已經斷開');
            reconnect(targetUrl);
        };
        // Reconnect after a 2 second delay, at most limitConnect times.
        function reconnect(wsServerUrl) {
            if (limitConnect <= 0) {
                console.log("TCP連接已超時");
                return;
            }
            // The lock is a page variable rather than localStorage: localStorage is
            // shared between tabs and would stay locked forever if the page were
            // closed during the delay.
            if (lockReconnect) {
                return;
            }
            lockReconnect = true;
            limitConnect--;
            timeConnect++;
            console.log("第" + timeConnect + "次重連");
            setTimeout(function () {
                // Release first so a failure inside webSocketInit cannot leave the lock stuck.
                lockReconnect = false;
                webSocketInit(wsServerUrl);
            }, 2000);
        }
        // Heartbeat: keep exactly one timer, otherwise every reconnect stacks another interval.
        if (heartbeatTimer !== null) {
            clearInterval(heartbeatTimer);
        }
        heartbeatTimer = setInterval(function () {
            // readyState 1 === OPEN; send() throws while the socket is still connecting.
            if (websocket && websocket.readyState === 1) {
                websocket.send('');
            }
        }, 1000 * 100);
    }

</script>

</html>

原文鏈接:https://blog.csdn.net/lilin0800/article/details/80884950


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM