rabbitmq+websocket(SpringBoot版)實現分布式消息推送


本來想用websocket做一個消息推送 可是分布式環境下不支持session共享因為服務器不同

所以采用 rabbitMQ+webSocket實現分布式消息推送

生產者將消息 發送給 rabbitMQ 的 virtual-host:/(頂極路由) 再由它路由到交換機 最終由交換機通過路由鍵指定具體的管道

消費者監聽指定的管道獲取消息

最終將獲取的消息 交給 webSocket 被@OnMessage注解標識的方法

每次消費一條消息交給 被@OnMessage注解標識的方法 返回給前台 

實現分布式實時推送

1.配置rabbitMQ

消息生產者

1.1pom.xml

1 <!--引入rabbitmq依賴-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>
 1 server:
 2   port: 5002
 3 
 4 spring:
 5   rabbitmq:
 6     host: localhost
 7     #賬號密碼 默認有的
 8     username: guest
 9     password: guest
10     #rbbitmq虛擬主機路徑
11     virtual-host: /
12     #rabbitmq的端口號 也是默認的
13     port: 5672
 1 @SpringBootApplication
 2 @MapperScan(basePackages = "com.supplychain.dao")
 3 @EnableRabbit/**開啟rabbitmq*/
 4 public class ThumbsupServer5002_App {
 5 
 6     public static void main(String[]args){
 7 
 8         SpringApplication.run(ThumbsupServer5002_App.class,args);
 9 
10     }
11 
12     /**消息的轉換器
13      * 設置成json 並放入到Spring中
14      * */
15     @Bean
16     public MessageConverter messageConverter(){
17 
18         return new Jackson2JsonMessageConverter();
19 
20     }
21 }

測試發送消息

 1 @RunWith(SpringRunner.class)
 2 @SpringBootTest
 3 public class ThumbsupServer5002_AppTest {
 4 
 5 
 6     @Autowired
 7     private RabbitTemplate rabbitTemplate;
 8 
 9     @Test
10     public void contextLoads() {
11 
12         UserTest userTest = new UserTest("hao", "651238730@qq.com");
13 
14         /**1.指定發送的交換機
15          *      發送的消息會先發送給 virtual-host: /(頂級路由) 再由它到交換機
16          *      由交換機通過路由鍵指定給具體的管道
17          *
18          * 2.路由鍵
19          *      有的交換機需要路由鍵 有的不需要(發送給交換機的消息會被發送給所有管道)
20          *
21          * 3.發送的消息
22          *      如果是對象的話必須實現序列化接口因為網絡傳輸只能傳二進制
23          *
24          * */
25         rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest);
26     }
27 
28 }

2.消息消費者

同樣是pom.xml需要引入rabbitMQ依賴

1 <!--引入rabbitmq依賴-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>

同樣需要配置application.yml

 1 spring:
 2   rabbitmq:
 3     host: 127.0.0.1
 4     #賬號密碼 默認有的
 5     username: guest
 6     password: guest
 7     #rbbitmq虛擬主機路徑
 8     virtual-host: /
 9     #rabbitmq的端口號 也是默認的
10     port: 5672
11     listener:
12       simple:
13         acknowledge-mode: manual #手動接受數據
14         #max-concurrency: 10 #最大並發
15         #prefetch: 1 #限流

同樣主啟動類中需要開啟RabbitMQ

 1 @SpringBootApplication
 2 @EnableRabbit
 3 public class MessageServer5003_App {
 4 
 5     public static void main(String[]args){
 6 
 7         SpringApplication.run(MessageServer5003_App.class,args);
 8 
 9     }
10 
11     /**這里也需要設置消息轉換類型
12      * 和發送的消息類型一定要對應
13      * 不然對象接受json啟動主程序類時就會報錯
14      * */
15     @Bean
16     public MessageConverter messageConverter(){
17 
18         return new Jackson2JsonMessageConverter();
19 
20     }
21 
22 }

下面到了整合的環節了

  1 @ServerEndpoint(value = "/websocket")
  2 @Component
  3 public class WebSocketServer {
  4 
  5     //靜態變量 用於記錄當前在線連接數 應該把它設計成線程安全
  6     private static int onlineCount=0;
  7 
  8     /**Concurrent包下的 寫時復制Set 用它作於存儲客戶端對應的MyWebSocket對象*/
  9     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>();
 10 
 11 
 12     /**與某個客戶端的鏈接會話,需要通過它來給客戶端發送數據*/
 13 
 14     private Session session;
 15     /**
 16      * 參數1:Message 可以獲得消息的內容字節 還可以獲得消息的其他屬性
 17      * 參數2:可以寫確定接受的參數類型比如User
 18      * 參數3:Channel 通道
 19      *      com.rabbitmq.client.Channel必須是這個包下
 20      *      通過這個參數可以拒絕消息
 21      *      讓rabbitmq再發給別的消費者
 22      *
 23      * 使用@RabbitListener 可以綁定交換機  路由鍵 管道
 24      *
 25      */
 26     @RabbitListener(bindings = @QueueBinding(
 27          value = @Queue(value = "userTest-queue",durable = "true"),
 28          exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),
 29          key = "userTest-key"
 30         )
 31     )
 32     @RabbitHandler//注解意思:如果有消息過來 需要消費的時候才會調用該方法
 33     /**如果已知傳遞的參數是 UserTest對象可以通過該注解
 34      * 消息頭需要用map接受
 35      * 既然是手動接受消息 就需要設置channel
 36      * */
 37     public void receiveUserMessage(@Payload UserTest userTest, @Headers Map<String,Object> headers, Channel channel) throws IOException {
 38         //sendMessage(message.toString());
 39         System.out.println("UserTest對象"+userTest);
 40         onMessage(userTest.toString());//調用消息方法將數據船體給他
 41 
 42         Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
 43         //手動接受並告訴rabbitmq消息已經接受了  deliverTag記錄接受消息 false不批量接受
 44         channel.basicAck(deliveryTag,false);
 45 
 46         /**
 47          * basicReject()
 48          * 參數1: 消息標簽
 49          * 參數2: true 將消息從新放入隊列  false 接受到並將消息拋棄
 50          *
 51          *
 52         try {
 53             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 54             System.out.println(message);
 55         } catch (IOException e) {
 56             e.printStackTrace();
 57         }
 58      */
 59 
 60     }
 61 
 62     /**服務器端推送消息*/
 63     public void sendMessage(String message){
 64         try {
 65             System.out.println("session可否顯示出來"+session);
 66             this.session.getBasicRemote().sendText(message);
 67         } catch (IOException e) {
 68             e.printStackTrace();
 69         }
 70     }
 71 
 72     /**
 73      * 連接建立成功調用的方法
 74      * */
 75     @OnOpen
 76     public void onOpen(Session session){
 77         this.session=session;
 78         webSocketSet.add(this);
 79         System.out.println("有新的連接加入!當前在線人數為"+getOnlineCount());
 80         System.out.println(session);
 81     }
 82 
 83     /**
 84      * 連接關閉調用的方法
 85      * */
 86     @OnClose
 87     public void onClose(){
 88         /**從安全Set中 移除當前連接對象*/
 89         webSocketSet.remove(this);
 90         subOnlineCount();
 91         System.out.println("有一連接關閉!當前在線人數為"+getOnlineCount());
 92     }
 93 
 94 
 95 
 96     @OnMessage
 97     public void onMessage(String message){
 98 
 99         System.out.println("來自客戶端的消息:"+message);
100 
101         for (WebSocketServer webSocketServer:webSocketSet){
102             webSocketServer.sendMessage(message);
103         }
104 
105     }
106 
107 
108     public static int getOnlineCount() {
109         return onlineCount;
110     }
111 
112     public static synchronized void addOnlineCount() {
113         WebSocketServer.onlineCount++;
114     }
115 
116     public static synchronized void subOnlineCount() {
117         WebSocketServer.onlineCount--;
118     }
119 
120 
121 
122 }

websocket前端

websocket是html5提出的協議屬於雙工通信 前端發送一次請求告訴服務器需要將http協議升級成tcp長連接

后面服務端直接給前端推送消息就可以了 從以前的一次請求一次響應 服務端被動式 變成 一次請求服務端可以無限響應

 1 <script>
 2         var socket;
 3         console.log(typeof socket)
 4         if (typeof(WebSocket)=="undefined"){
 5             alert("您的瀏覽器不支持WebSocket");
 6         }else{
 7             alert("您的瀏覽器支持WebSocket");
 8 
 9            socket=new WebSocket("ws://localhost:5003/websocket");
10 
11            socket.onopen=function () {
12                console.log("Socket 已打開");
13            };
14 
15             //獲得消息事件
16             socket.onmessage = function(msg) {
17                 console.log(msg.data);
18                 //發現消息進入    調后台獲取
19                 //getCallingList();
20             };
21 
22             //關閉事件
23             socket.onclose = function() {
24                 console.log("Socket已關閉");
25             };
26             //發生了錯誤事件
27             socket.onerror = function() {
28                 alert("Socket發生了錯誤");
29             };
30             /**
31             $(window).unload(function(){
32                 socket.close();
33             });
34              */
35         }
36     </script>

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM