說明該篇文章內容包括有rabbitMq相關的一些簡單理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout的使用。
在安裝完rabbitMq后,輸入http://ip:15672/ ,是可以看到一個簡單后台管理界面的。
在這個界面里面我們可以做些什么?
可以手動創建虛擬host,創建用戶,分配權限,創建交換機,創建隊列等等,還有查看隊列消息,消費效率,推送效率等等。
以上這些管理界面的操作在這篇暫時不做擴展描述,我想着重介紹后面實例里會使用到的。
首先先介紹一個簡單的一個消息推送到接收的流程,提供一個簡單的圖:
黃色的圈圈就是我們的消息推送服務,將消息推送到 中間方框里面也就是 rabbitMq的服務器,然后經過服務器里面的交換機、隊列等各種關系(后面會詳細講)將數據處理入列后,最終右邊的藍色圈圈消費者獲取對應監聽的消息。
常用的交換機有以下三種,因為消費者是從隊列獲取信息的,隊列是綁定交換機的(一般),所以對應的消息推送/接收模式也會有以下幾種:
Direct Exchange
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶着路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
Fanout Exchange
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
Topic Exchange
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:
* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 *.TT.* 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。
另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。
本次實例教程需要創建2個springboot項目,一個生產者,一個消費者。
首先創建 生產者,
pom.xml里用到的jar依賴:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml:
#本機端口 server: port: 8082 spring: #給項目來個名字 application: name: rabbitmq-provider #配置rabbitMq 服務器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
然后先使用下direct exchange(直連型交換機),創建DirectRabbitConfig.java
package com.wx.test.rtmdemo.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Configuration public class DirectRabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { //隊列的三個參數講解 // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設置一下隊列的持久化就好,其余兩個就是默認false return new Queue("TestDirectQueue",true); } //Direct交換機 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } //綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
然后寫個簡單的接口進行消息推送:
@GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok"; }
將項目運行起來,調用這個接口:
此時,在rabbitMq管理頁面可以看到,有一條推送的消息。
然后有一個消息等待被消費。
此時,說明我們的生產者的消息已經生產成功,只需要等待消費者消費即可!
接下來創建消費者服務:
首先是pom.xml文件的依賴:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
然后是application.yml:
server: port: 8083 spring: #給項目來個名字 application: name: rabbitmq-consumer #配置rabbitMq 服務器 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
然后是創建消息接收監聽類,DirectReceiver.java:
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString()); } }
然后啟動項目,可以看到生產者的消息,在這邊被成功消費了。
接下來是Topic Exchange交換機,配置文件以及依賴包不變,新建一個TopicRabbitConfig.java:
package com.wx.test.rtmdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Configuration public class TopicRabbitConfig { //綁定鍵 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.# // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
然后在創建兩個接口,用於測試:
@GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; } @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman123", womanMap); return "ok"; }
然后在創建一個消費者TopicManReceiver.java::
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Component @RabbitListener(queues = "topic.man") public class TopicManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicManReceiver消費者收到消息 : " + testMessage.toString()); } }
然后再創建一個消費者TopicTotalReceiver.java::
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Component @RabbitListener(queues = "topic.woman") public class TopicTotalReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicTotalReceiver消費者收到消息 : " + testMessage.toString()); } }
啟動項目,調用sendTopicMessage1這個接口:
可以看到,兩個消費者都消費了這條消息。
注意:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.man
再調用sendTopicMessage2這個接口:
這時,只有TopicTotalReceiver成功消費了。
然后是Fanout Exchang交換機:
首先,創建生產者:
package com.wx.test.rtmdemo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Configuration public class FanoutRabbitConfig { /** * 創建三個隊列 :fanout.A fanout.B fanout.C * 將三個隊列都綁定在交換機 fanoutExchange 上 * 因為是扇型交換機, 路由鍵無需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
同樣寫一個接口,用於生產一條消息:
@GetMapping("/sendFanoutMessage") public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok"; }
接下來在消費者里面創建一個類,用於接收消息:
FanoutReceiverA.java:
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverA消費者收到消息 : " +testMessage.toString()); } }
FanoutReceiverB.java:
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverB消費者收到消息 : " +testMessage.toString()); } }
FanoutReceiverC.java:
package com.wx.test.consumer.config; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author : laz * @CreateTime : 2021/11/14 * @Description : **/ @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverC消費者收到消息 : " +testMessage.toString()); } }
最后啟動項目,調用我們生產者的接口,查看消息消費情況:
可以看到,這三個類都消費到消息。
好了,這篇Springboot整合rabbitMq教程就暫且到此。本文若有錯誤,還請各路大佬指正指正!