RabbitMQ 的七種消息傳遞形式


今天這篇文章比較簡單,來和小伙伴們分享一下 RabbitMQ 的七種消息傳遞形式。一起來看看。

大部分情況下,我們可能都是在 Spring Boot 或者 Spring Cloud 環境下使用 RabbitMQ,因此本文我也主要從這兩個方面來和大家分享 RabbitMQ 的用法。

1. RabbitMQ 架構簡介

一圖勝千言,如下:

 
1587705504342

這張圖中涉及到如下一些概念:

  1. 生產者(Publisher):發布消息到 RabbitMQ 中的交換機(Exchange)上。
  2. 交換機(Exchange):和生產者建立連接並接收生產者的消息。
  3. 消費者(Consumer):監聽 RabbitMQ 中的 Queue 中的消息。
  4. 隊列(Queue):Exchange 將消息分發到指定的 Queue,Queue 和消費者進行交互。
  5. 路由(Routes):交換機轉發消息到隊列的規則。

2. 准備工作

大家知道,RabbitMQ 是 AMQP 陣營里的產品,Spring Boot 為 AMQP 提供了自動化配置依賴 spring-boot-starter-amqp,因此首先創建 Spring Boot 項目並添加該依賴,如下:

 
image

項目創建成功后,在 application.properties 中配置 RabbitMQ 的基本連接信息,如下:

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

接下來進行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生產者提交的消息都會交由 Exchange 進行再分配,Exchange 會根據不同的策略將消息分發到不同的 Queue 中。

RabbitMQ 官網介紹了如下幾種消息分發的形式:

 
image

 
image

 
image

這里給出了七種,其中第七種是消息確認,消息確認這塊松哥之前發過相關的文章,傳送門:

所以這里我主要和大家介紹前六種消息收發方式。

3. 消息收發

3.1 Hello World

咦?這個咋沒有交換機?這個其實是默認的交換機,我們需要提供一個生產者一個隊列以及一個消費者。消息傳播圖如下:

 
image

來看看代碼實現:

先來看看隊列的定義:

@Configuration public class HelloWorldConfig { public static final String HELLO_WORLD_QUEUE_NAME = "hello_world_queue"; @Bean Queue queue1() { return new Queue(HELLO_WORLD_QUEUE_NAME); } } 

再來看看消息消費者的定義:

@Component public class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) { System.out.println("msg = " + msg); } } 

消息發送:

@SpringBootTest class RabbitmqdemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); } } 

這個時候使用的其實是默認的直連交換機(DirectExchange),DirectExchange 的路由策略是將消息隊列綁定到一個 DirectExchange 上,當一條消息到達 DirectExchange 時會被轉發到與該條消息 routing key 相同的 Queue 上,例如消息隊列名為 “hello-queue”,則 routingkey 為 “hello-queue” 的消息會被該消息隊列接收。

3.2 Work queues

這種情況是這樣的:

一個生產者,一個默認的交換機(DirectExchange),一個隊列,兩個消費者,如下圖:

 
image

一個隊列對應了多個消費者,默認情況下,由隊列對消息進行平均分配,消息會被分到不同的消費者手中。消費者可以配置各自的並發能力,進而提高消息的消費能力,也可以配置手動 ack,來決定是否要消費某一條消息。

先來看並發能力的配置,如下:

@Component public class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(String msg) { System.out.println("receive = " + msg); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10") public void receive2(String msg) { System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName()); } } 

可以看到,第二個消費者我配置了 concurrency 為 10,此時,對於第二個消費者,將會同時存在 10 個子線程去消費消息。

啟動項目,在 RabbitMQ 后台也可以看到一共有 11 個消費者。

 
image

此時,如果生產者發送 10 條消息,就會一下都被消費掉。

消息發送方式如下:

@SpringBootTest class RabbitmqdemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, "hello"); } } } 

消息消費日志如下:

 
image

可以看到,消息都被第一個消費者消費了。但是小伙伴們需要注意,事情並不總是這樣(多試幾次就可以看到差異),消息也有可能被第一個消費者消費(只是由於第二個消費者有十個線程一起開動,所以第二個消費者消費的消息占比更大)。

當然消息消費者也可以開啟手動 ack,這樣可以自行決定是否消費 RabbitMQ 發來的消息,配置手動 ack 的方式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費代碼如下:

@Component public class HelloWorldConsumer { @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME) public void receive(Message message,Channel channel) throws IOException { System.out.println("receive="+message.getPayload()); channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true); } @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency = "10") public void receive2(Message message, Channel channel) throws IOException { System.out.println("receive2 = " + message.getPayload() + "------->" + Thread.currentThread().getName()); channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true); } } 

此時第二個消費者拒絕了所有消息,第一個消費者消費了所有消息。

這就是 Work queues 這種情況。

3.3 Publish/Subscrite

再來看發布訂閱模式,這種情況是這樣:

一個生產者,多個消費者,每一個消費者都有自己的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每個隊列綁定交換機,生產者發送的消息經過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。需要注意的是,如果將消息發送到一個沒有隊列綁定的 Exchange上面,那么該消息將會丟失,這是因為在 RabbitMQ 中 Exchange 不具備存儲消息的能力,只有隊列具備存儲消息的能力,如下圖:

 
image

這種情況下,我們有四種交換機可供選擇,分別是:

  • Direct
  • Fanout
  • Topic
  • Header

我分別來給大家舉一個簡單例子看下。

3.3.1 Direct

DirectExchange 的路由策略是將消息隊列綁定到一個 DirectExchange 上,當一條消息到達 DirectExchange 時會被轉發到與該條消息 routing key 相同的 Queue 上,例如消息隊列名為 “hello-queue”,則 routingkey 為 “hello-queue” 的消息會被該消息隊列接收。DirectExchange 的配置如下:

@Configuration public class RabbitDirectConfig { public final static String DIRECTNAME = "javaboy-direct"; @Bean Queue queue() { return new Queue("hello-queue"); } @Bean DirectExchange directExchange() { return new DirectExchange(DIRECTNAME, true, false); } @Bean Binding binding() { return BindingBuilder.bind(queue()) .to(directExchange()).with("direct"); } } 
  • 首先提供一個消息隊列Queue,然后創建一個DirectExchange對象,三個參數分別是名字,重啟后是否依然有效以及長期未用時是否刪除。
  • 創建一個Binding對象將Exchange和Queue綁定在一起。
  • DirectExchange和Binding兩個Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一個Queue的實例即可。

再來看看消費者:

@Component public class DirectReceiver { @RabbitListener(queues = "hello-queue") public void handler1(String msg) { System.out.println("DirectReceiver:" + msg); } } 

通過 @RabbitListener 注解指定一個方法是一個消息消費方法,方法參數就是所接收到的消息。然后在單元測試類中注入一個 RabbitTemplate 對象來進行消息發送,如下:

@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void directTest() { rabbitTemplate.convertAndSend("hello-queue", "hello direct!"); } } 

最終執行結果如下:

 
image

3.3.2 Fanout

FanoutExchange 的數據交換策略是把所有到達 FanoutExchange 的消息轉發給所有與它綁定的 Queue 上,在這種策略中,routingkey 將不起任何作用,FanoutExchange 配置方式如下:

@Configuration public class RabbitFanoutConfig { public final static String FANOUTNAME = "sang-fanout"; @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUTNAME, true, false); } @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } } 

在這里首先創建 FanoutExchange,參數含義與創建 DirectExchange 參數含義一致,然后創建兩個 Queue,再將這兩個 Queue 都綁定到 FanoutExchange 上。接下來創建兩個消費者,如下:

@Component public class FanoutReceiver { @RabbitListener(queues = "queue-one") public void handler1(String message) { System.out.println("FanoutReceiver:handler1:" + message); } @RabbitListener(queues = "queue-two") public void handler2(String message) { System.out.println("FanoutReceiver:handler2:" + message); } } 

兩個消費者分別消費兩個消息隊列中的消息,然后在單元測試中發送消息,如下:

@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void fanoutTest() { rabbitTemplate .convertAndSend(RabbitFanoutConfig.FANOUTNAME, null, "hello fanout!"); } } 

注意這里發送消息時不需要 routingkey,指定 exchange 即可,routingkey 可以直接傳一個 null

最終執行日志如下:

 
image

3.3.3 Topic

TopicExchange 是比較復雜但是也比較靈活的一種路由策略,在 TopicExchange 中,Queue 通過 routingkey 綁定到 TopicExchange 上,當消息到達 TopicExchange 后,TopicExchange 根據消息的 routingkey 將消息路由到一個或者多個 Queue 上。TopicExchange 配置如下:

@Configuration public class RabbitTopicConfig { public final static String TOPICNAME = "sang-topic"; @Bean TopicExchange topicExchange() { return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } @Bean Binding xiaomiBinding() { return BindingBuilder.bind(xiaomi()).to(topicExchange()) .with("xiaomi.#"); } @Bean Binding huaweiBinding() { return BindingBuilder.bind(huawei()).to(topicExchange()) .with("huawei.#"); } @Bean Binding phoneBinding() { return BindingBuilder.bind(phone()).to(topicExchange()) .with("#.phone.#"); } } 
  • 首先創建 TopicExchange,參數和前面的一致。然后創建三個 Queue,第一個 Queue 用來存儲和 “xiaomi” 有關的消息,第二個 Queue 用來存儲和 “huawei” 有關的消息,第三個 Queue 用來存儲和 “phone” 有關的消息。
  • 將三個 Queue 分別綁定到 TopicExchange 上,第一個 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 開頭的,都將被路由到名稱為 “xiaomi” 的 Queue 上,第二個 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 開頭的,都將被路由到名稱為 “huawei” 的 Queue 上,第三個 Binding 中的 “#.phone.#” 則表示消息的 routingkey 中凡是包含 “phone” 的,都將被路由到名稱為 “phone” 的 Queue 上。

接下來針對三個 Queue 創建三個消費者,如下:

@Component public class TopicReceiver { @RabbitListener(queues = "phone") public void handler1(String message) { System.out.println("PhoneReceiver:" + message); } @RabbitListener(queues = "xiaomi") public void handler2(String message) { System.out.println("XiaoMiReceiver:"+message); } @RabbitListener(queues = "huawei") public void handler3(String message) { System.out.println("HuaWeiReceiver:"+message); } } 

然后在單元測試中進行消息的發送,如下:

@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void topicTest() { rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news","小米新聞.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.news","華為新聞.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.phone","小米手機.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.phone","華為手機.."); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "phone.news","手機新聞.."); } } 

根據 RabbitTopicConfig 中的配置,第一條消息將被路由到名稱為 “xiaomi” 的 Queue 上,第二條消息將被路由到名為 “huawei” 的 Queue 上,第三條消息將被路由到名為 “xiaomi” 以及名為 “phone” 的 Queue 上,第四條消息將被路由到名為 “huawei” 以及名為 “phone” 的 Queue 上,最后一條消息則將被路由到名為 “phone” 的 Queue 上。

3.3.4 Header

HeadersExchange 是一種使用較少的路由策略,HeadersExchange 會根據消息的 Header 將消息路由到不同的 Queue 上,這種策略也和 routingkey無關,配置如下:

@Configuration public class RabbitHeaderConfig { public final static String HEADERNAME = "javaboy-header"; @Bean HeadersExchange headersExchange() { return new HeadersExchange(HEADERNAME, true, false); } @Bean Queue queueName() { return new Queue("name-queue"); } @Bean Queue queueAge() { return new Queue("age-queue"); } @Bean Binding bindingName() { Map<String, Object> map = new HashMap<>(); map.put("name", "sang"); return BindingBuilder.bind(queueName()) .to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() { return BindingBuilder.bind(queueAge()) .to(headersExchange()).where("age").exists(); } } 

這里的配置大部分和前面介紹的一樣,差別主要體現的 Binding 的配置上,第一個 bindingName 方法中,whereAny 表示消息的 Header 中只要有一個 Header 匹配上 map 中的 key/value,就把該消息路由到名為 “name-queue” 的 Queue 上,這里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 實際上對應了一個名為 x-match 的屬性。bindingAge 中的配置則表示只要消息的 Header 中包含 age,不管 age 的值是多少,都將消息路由到名為 “age-queue” 的 Queue 上。

接下來創建兩個消息消費者:

@Component public class HeaderReceiver { @RabbitListener(queues = "name-queue") public void handler1(byte[] msg) { System.out.println("HeaderReceiver:name:" + new String(msg, 0, msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg) { System.out.println("HeaderReceiver:age:" + new String(msg, 0, msg.length)); } } 

注意這里的參數用 byte 數組接收。然后在單元測試中創建消息的發送方法,這里消息的發送也和 routingkey 無關,如下:

@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void headerTest() { Message nameMsg = MessageBuilder .withBody("hello header! name-queue".getBytes()) .setHeader("name", "sang").build(); Message ageMsg = MessageBuilder .withBody("hello header! age-queue".getBytes()) .setHeader("age", "99").build(); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg); rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg); } } 

這里創建兩條消息,兩條消息具有不同的 header,不同 header 的消息將被發到不同的 Queue 中去。

最終執行效果如下:

 
image

3.4 Routing

這種情況是這樣:

一個生產者,一個交換機,兩個隊列,兩個消費者,生產者在創建 Exchange 后,根據 RoutingKey 去綁定相應的隊列,並且在發送消息時,指定消息的具體 RoutingKey 即可。

如下圖:

 
image

這個就是按照 routing key 去路由消息,我這里就不再舉例子了,大家可以參考 3.3.1 小結。

3.5 Topics

這種情況是這樣:

一個生產者,一個交換機,兩個隊列,兩個消費者,生產者創建 Topic 的 Exchange 並且綁定到隊列中,這次綁定可以通過 *# 關鍵字,對指定 RoutingKey 內容,編寫時注意格式 xxx.xxx.xxx 去編寫。

如下圖:

 
image

這個我也就不舉例啦,前面 3.3.3 小節已經舉過例子了,不再贅述。

3.6 RPC

RPC 這種消息收發形式,松哥前兩天剛剛寫了文章和大家介紹,這里就不多說了,傳送門:

3.7 Publisher Confirms

這種發送確認松哥之前有寫過相關文章,傳送門:

4. 小結

好啦,今天這篇文章主要是和小伙伴們整理了 RabbitMQ 中消息收發的七種形式,感興趣的小伙伴可以試試哦~



https://www.jianshu.com/p/708e4be95a74


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM