什么是RabbitMQ?
RabbitMQ是基於 AMQP 0-9-1 協議模型實現的一個消息隊列服務,消息流轉符合下圖基本原則

生產者(producer)將消息發送至RabbitMQ中的 交換機(exchange), 交換機會根據不同的路由規則將消息轉發至 隊列(queue),隊列再將消息投遞給隊列的 消費者(consumer)
交換機(Exchange)
交換機可以理解為是一個中轉站,它負責將消息根據一定的條件和規則分發到隊列(queue)上
交換機存在的價值
你可能會想,從最基本的生產消費角度出發,生產者完全可以直接將消息發送到隊列,隊列將消息投遞給消費者即可,為什么還需要通過交換機來進行消息轉發呢?
事實上,它是可以間接的實現這種需求的,通過將消息投遞到一個空的交換機,並設置路由鍵(routing_key)為隊列名稱時,RabbitMQ會將消息直接分發給路由鍵同名的隊列中。
我們考慮這樣一個場景,生產者需要將消息發送到多個隊列時,如果沒有交換機,生產者則需要書寫冗余的代碼來將消息發送至多個隊列,這顯然不夠優雅,而交換機的存在則可以解決這個問題,生產者只需將消息發送至交換機,由交換機來決定將消息分發給一個或多個隊列。
我們考慮另外一個場景,如果生產者直接將消息發送至隊列,當我們需要更換隊列時,我們需要更改發送端的代碼來進行隊列切換,而交換機的存在可以讓我們輕松的實現這個需求,我們只需要將原隊列從交換機上解綁,並將新的隊列綁定到這個交換機上即可。
從多個實際場景可以看出,交換機的存在是有價值的,並且交換機還有多個不同的類型。下面給大家介紹一下RabbitMQ中存在的交換機有哪些!
交換機類型
直連交換機 (Direct exchange)
這是最常用的交換機類型,我們可以將隊列綁定到直連交換機上,並指定一個路由鍵(routing_key),當交換機收到匹配路由鍵的消息時,直接將消息轉發至綁定的0個或多個隊列中,這取決於消息攜帶的路由鍵存在多少個綁定隊列,這意味着直連交換機可以實現廣播的能力,不過單純的廣播我們可以直接使用 扇形交換機,后面會進行介紹,那將更加優雅。
需要注意的是,RabbitMQ存在一個默認的交換機(名稱是一個空字符串),所有隊列被聲明時都會自動綁定到這個交換機,綁定的路由鍵就是隊列名,這意味着生產者可以將消息發送至這個默認交換機,並通過指定路由鍵的方式來自由地將消息直接發送到指定隊列,如下圖所示

扇形交換機 (Fanout exchange)
不同於直連交換機,扇形交換機完全不會理會路由鍵與綁定關系,而是一股腦兒的將消息轉發至綁定自己的所有隊列,即使生產者發送時指定了路由鍵也是一樣。
扇形交換機用於廣播消息,多個消費者分別聲明自己的專屬隊列,並將隊列綁定到該交換機,即可實現一個消息被多個消費者一起消息。

主題交換機 (Topic exchange)
相較於 直連交換機 的路由鍵匹配模式 與 扇形交換機 的無腦廣播模式 來說,主題交換機就相對智能的多,它既可以實現指定路由鍵的轉發,也可以實現優雅的廣播機制,更重要的是,它可以實現模糊匹配路由鍵的訂閱機制

發送到主題交換機的消息路由鍵必須是一個由
.
分隔開的詞語列表,如:"big.dog", "small.cat", "black.small.dog"。詞語的個數可以隨意,但是不要超過255字節。
隊列綁定到交換機時的綁定鍵也必須是這樣的格式。但是綁定鍵支持使用兩個通配符:
*
(星號) 用來表示一個單詞.
#
(井號) 用來表示任意數量(零個或多個)單詞。
我們回頭分析下上面的圖例,已知一個主題交換機(xxx),綁定了三個隊列queue1,queue2,queue3,綁定鍵分別為 big.*,small.*,# 。當我們發送一個消息給交換機,並且指定路由鍵為 big.dog 時,消息將被轉發至 queue1 與 queue3,當指定路由鍵為 small.cat 時,消息將被轉發至 queue2 與 queue3
頭交換機 (Headers exchange)
大部分場景下,以上三種交換機已經足以滿足需求,但是在某些復雜場景下還是不能滿足,比如當我們的消息路由很復雜時,難以使用一個路由鍵和綁定鍵來描述完整的路由規則。
RabbitMQ提供了基於消息頭(Headers)的路由模式,它可以在將隊列綁定至交換機時,指定僅當消息頭與規則任意匹配 或者 完全匹配時才將消息轉發至該隊列
通過對比 直連交換機 我們可以更好的理解 頭交換機,相比來說,直連交換機使用額外的路由鍵進行規則匹配,而 頭交換機 則使用 消息頭 進行規則匹配。
交換機的屬性
我們在聲明交換機時,可以為其指定相應的屬性來滿足相關需求
- Name(交換機名稱)
- Durability (是否持久化交換機,這個參數將決定MQ服務重啟后,交換機是否還存在)
- Auto-delete (當所有與之綁定的消息隊列都完成了對此交換機的使用后,刪掉它)
- Arguments(額外的參數,可被相關插件使用)
隊列(Queue)
隊列中存儲着從交換機轉發過來的消息,並將消息投遞給訂閱此隊列的消費者,當一個消息被投遞成功后,將從隊列中被刪除
隊列的屬性
- Name (隊列名稱)
- Durable(是否持久化隊列,這將決定MQ服務重啟后,隊列是否還存在)
- Exclusive(是否獨占,只被一個連接(connection)使用,而且當連接關閉后隊列即被刪除)
- Auto-delete(當最后一個消費者退訂后即被刪除)
- Arguments(一些消息代理用他來完成類似與TTL的某些額外功能)
需要注意的是,在聲明一個隊列時,如果隊列已經存在,並且屬性完全相同,那么此次聲明不會對原有隊列產生任何影響。如果聲明中的屬性與已存在隊列的屬性有差異,那么一個錯誤代碼為406的通道級異常就會被拋出。
綁定交換機(Binding)
在一個隊列能被投入使用之前,我們需要將它綁定到至少一個交換機上,並指定一個綁定鍵(Binding),交換機在收到消息時,匹配消息攜帶的路由鍵與綁定鍵是否匹配,如果匹配,則會將消息轉發至該隊列,如果不匹配任何綁定鍵,則不會轉發到任何隊列上,並將消息退回給生產者或者直接忽略該消息。
消費者
知道了RabbitMQ的基本工作原理后,我們繼續來看一個應用實例是如何完成隊列的訂閱與消息消費的。
單隊列消費
我們考慮一個最簡單的場景:消費單個隊列,整個流程模型可以參考下圖,解釋一下,應用與MQ服務建立一個連接(Connection),並在連接上建立一個管道(Channel),通過管道訂閱一個隊列(queue)並綁定消費函數(Consumer)

下面是這個場景的代碼實現示例(這里貼了相對簡潔的Python代碼):
#!/opt/homebrew/bin/python3 # -*- coding: UTF-8 -*- import pika import _thread import time MQ_CONFIG = { "host": "10.80.20.209", "port": 5672, "vhost": "/", "user": "dc_base", "passwd": "xxxxxx" } exchange = 'python-test-exchange' queue = 'python-test-queue' routing_key = 'tester' credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) # 與rabbitMQ建立Connection連接 connection = pika.BlockingConnection(parameters) # 在Connnction上創建一個Channel管道 channel = connection.channel() # 聲明交換機 channel.exchange_declare(exchange=exchange, exchange_type='direct') # 聲明隊列 channel.queue_declare(queue=queue) # 隊列綁定交換機 channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key) # 這是消息的消費邏輯 def callback(ch, method, properties, body): print(body.decode()) # 通過channel向rabbitMQ訂閱這個隊列 channel.basic_consume(queue, callback, True) # 開始監聽 channel.start_consuming()
多隊列消費
我們繼續考慮一個稍微復雜一點的場景,當一個應用需要同時消費多個隊列時,我們就需要在連接(Connection)上創建多個管道(Channel),一般情況下,每個管道都有一個專屬的線程進行管理維護,在管道中訂閱隊列(queue)並綁定消費函數(Consumer)

下面是這個場景的代碼實現示例,示例中我們訂閱了兩個不同的隊列,並且其中一個隊列采取了雙消費者來實現並發消費。以下代碼結構整體與圖中的結構雷同。
package com.idanchuang.component.mq.amqp.rabbit; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; /** * @author yjy * Created at 2022/3/23 1:56 下午 */ public class ComplexConsumer { private static final CountDownLatch LATCH = new CountDownLatch(1); public static void main(String[] args) throws Exception { // 通過ConnectionFactory創建Connection ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("dc_base"); connectionFactory.setPassword("xxxxx"); connectionFactory.setHost("10.80.20.209"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 聲明交換機 String exchange = "python-test-exchange"; Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "direct"); // 綁定鍵 String bindingKey = "tester"; // 通過channel聲明一個隊列,並綁定至交換機 channel = connection.createChannel(); String queue1 = "amqp-test-queue1-" + System.currentTimeMillis(); channel.queueDeclare(queue1, false, true, true, new HashMap<>()); channel.queueBind(queue1, exchange, bindingKey); // 通過這個channel中訂閱聲明的queue,並通過多線程添加兩個消費者,實現並發消費 startConsume(queue1, new MyConsumer(channel), channel); startConsume(queue1, new MyConsumer(channel), channel); // 通過一個新的channel聲明一個新的隊列,並綁定至交換機 channel = connection.createChannel(); String queue2 = "amqp-test-queue2-" + System.currentTimeMillis(); channel.queueDeclare(queue2, false, true, true, new HashMap<>()); channel.queueBind(queue2, exchange, bindingKey); // 通過這個channel中訂閱聲明的queue,並配置一個消費者 startConsume(queue2, new MyConsumer(channel), channel); System.out.println("Listening..."); LATCH.await(); } private static void startConsume(String queue, Consumer consumer, Channel channel) { Thread thread1 = new Thread(() -> { try { channel.basicConsume(queue, true, consumer); } catch (Exception e) { e.printStackTrace(); } }); thread1.start(); } private static class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 處理消息 System.out.println(getConsumerTag() + " > 收到消息: " + new String(body)); } } }
多vhost消費
讓我們考慮這樣一個場景,一個應用(application)想要消費兩個隊列(queue)的消息,但是這兩個隊列卻不在同一個vhost下,這時候一個連接(Connection)就無法解決問題了,我們必須創建多個連接來適配這個需求了,如下圖

多vhost的消費代碼整體上與單vhost是相同的,只是每個vhost需要創建單獨的鏈接(Connection),剩下的管道與隊列等邏輯完全相同,如下:
package com.idanchuang.component.mq.amqp.rabbit; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; /** * @author yjy * Created at 2022/3/23 1:56 下午 */ public class ComplexVhostConsumer { private static final CountDownLatch LATCH = new CountDownLatch(1); public static void main(String[] args) throws Exception { // 通過ConnectionFactory創建Connection ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("dc_base"); connectionFactory.setPassword("xxxx"); connectionFactory.setHost("10.80.20.209"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); runVhost(connection); // 通過ConnectionFactory創建Connection connectionFactory.setVirtualHost("test-amqp"); Connection connection2 = connectionFactory.newConnection(); runVhost(connection2); System.out.println("Listening..."); LATCH.await(); } private static void runVhost(Connection connection) throws Exception { // 聲明交換機 String exchange = "python-test-exchange"; Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "direct"); // 綁定鍵 String bindingKey = "tester"; // 通過channel聲明一個隊列,並綁定至交換機 channel = connection.createChannel(); String queue1 = "amqp-test-queue1-" + System.currentTimeMillis(); channel.queueDeclare(queue1, false, true, true, new HashMap<>()); channel.queueBind(queue1, exchange, bindingKey); // 通過這個channel中訂閱聲明的queue,並通過多線程添加兩個消費者,實現並發消費 startConsume(queue1, new MyConsumer(channel), channel); startConsume(queue1, new MyConsumer(channel), channel); // 通過一個新的channel聲明一個新的隊列,並綁定至交換機 channel = connection.createChannel(); String queue2 = "amqp-test-queue2-" + System.currentTimeMillis(); channel.queueDeclare(queue2, false, true, true, new HashMap<>()); channel.queueBind(queue2, exchange, bindingKey); // 通過這個channel中訂閱聲明的queue,並配置一個消費者 startConsume(queue2, new MyConsumer(channel), channel); } private static void startConsume(String queue, Consumer consumer, Channel channel) { Thread thread1 = new Thread(() -> { try { channel.basicConsume(queue, true, consumer); } catch (Exception e) { e.printStackTrace(); } }); thread1.start(); } private static class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 處理消息 System.out.println(getConsumerTag() + " > 收到消息: " + new String(body)); } } }
確認消費
RabbitMQ在消息投遞/消費時,存在一個確認機制(Acknowledgement,簡稱ack),這是為了讓MQ服務端明確的知道是否可以將這條消息進行刪除。
RabbitMQ有兩種消息確認機制,手動ACK / 自動ACK
手動ACK
服務端將消息投遞給消費者后,由消費者來決定是否消費完成,如果確認消息已被消費,需要主動向服務端發送一個ACK消息,屆時服務端會將這條消息標記為已消費的狀態。


自動ACK
服務端將消息投遞給消費者后,服務端直接將消息標記為已消費的狀態,至於消息有沒有被消費成功,服務端已不再關心,這意味着在極端情況下,消息不能保證一定被消費成功。

拒絕消費
在開啟手動ACK的情況下,當消費者收到一條消息時,大部分時候可以處理成功並愉快的發送ACK給服務端,但是也可能會有處理失敗的情況,假如消費者認為該消息自己處理不了,可以向服務端發送 NACK,來告知服務端重新投遞該消息
生產者
相對於消費者來說,生產者則簡單得多,但需要注意的是,應該避免在多線程環境下使用同一個管道進行消息的發送,因為這可能導致一些意想不到的問題出現(雖然RabbitMQ的Channel默認實現中在basicPublish內添加了同步鎖保證了消息發送的並發安全,但是我們不能保證Channel中的其他所有功能都是並發安全的),在實際業務中,我們可以維護一個消息發送線程池,為每個線程綁定一個特定的管道,來實現並發發送。
消息發送

我們可以將消息發送至任意一個管道(Channel),並指定目標交換機(Exchange)與 路由鍵(RoutingKey)即可,MQ服務端的交換機將負責消息的下一步去向!客戶端發送的案例代碼如下(未涉及多線程):
#!/opt/homebrew/bin/python3 # -*- coding: UTF-8 -*- import pika MQ_CONFIG = { "host": "10.80.20.209", "port": 5672, "vhost": "test-amqp", "user": "dc_base", "passwd": "xxx" } credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() exchange = 'python-test-exchange' channel.exchange_declare(exchange=exchange, exchange_type='direct') def send(body, exchange, routing_key): channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print("start send") send('hello world 1', exchange, 'tester') send('hello world 2', exchange, 'tester') send('hello world 3', exchange, 'tester') send('hello world 4', exchange, 'tester') send('hello world 5', exchange, 'tester') connection.close()
確認發送成功
對於生產者客戶端來說,當一個消息被成功寫入管道(Channel)后,一般情況下生產者就認為消息已經發送成功了,事實上,消息還未被真正到達交換機,對於可靠性較高的消息而言,生產者可能需要確認消息已經被成功發送到交換機中中,這時候就需要AMQP的Confirm機制出馬了。
我們在管道上綁定了一個回調函數(ConfirmListener),並在發送消息前,通過 confirmSelect 通知服務端回調下一個消息的發送結果,隨后發送真實的消息。
當消息成功到達交換機后,服務端會返回一個 ACK 來觸發客戶端的 handleAck 函數,反之則會觸發 handleNack 函數
package com.idanchuang.component.mq.amqp.rabbit; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; /** * @author yjy * Created at 2022/3/23 1:56 下午 */ public class SimpleSender { public static void main(String[] args) throws Exception { // 通過ConnectionFactory創建Connection ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("dc_base"); connectionFactory.setPassword("xxxx"); connectionFactory.setHost("10.80.20.209"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 聲明交換機 String exchange = "python-test-exchange"; Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "direct"); // 路由鍵 String routingKey = "tester"; final Semaphore semaphore = new Semaphore(0); final AtomicBoolean success = new AtomicBoolean(); // 給管道設置消息發送成功確認監聽器 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { success.set(true); semaphore.release(); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { success.set(false); semaphore.release(); } }); // 發送消息前告知服務端,回調下一個消息的發送結果 channel.confirmSelect(); // 發送消息 channel.basicPublish(exchange, routingKey, null, "hello world1!".getBytes(StandardCharsets.UTF_8)); // 等待結果 semaphore.acquire(); System.out.println("消息發送 > " + (success.get() ? "成功" : "失敗")); // 再來一次 channel.confirmSelect(); channel.basicPublish(exchange, routingKey, null, "hello world2!".getBytes(StandardCharsets.UTF_8)); semaphore.acquire(); System.out.println("消息發送 > " + (success.get() ? "成功" : "失敗")); // 結束 connection.close(); } }
確認路由成功
當生產者將消息成功發送到交換機后,一般情況下生產者就認為消息已經發送成功了,事實上,消息還需要經過路由到達對應的queue才算真正的成功,對於可靠性較高的消息而言,生產者可能需要確認消息已經被路由到隊列中,這時候就需要AMQP的返回機制出馬了。
我們在管道上綁定了一個回調函數(ReturnListener),並在發送消息時指定 mandatory 參數為 true,
當交換機找不到可以路由的隊列時(比如消息指定的路由鍵未綁定任何隊列),將會觸發 handleReturn 函數,此時業務可以對這個無法被路由轉發的消息進行后續處理,或者告警。
package com.idanchuang.component.mq.amqp.rabbit; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; /** * @author yjy * Created at 2022/3/23 1:56 下午 */ public class ReturnableSender { public static void main(String[] args) throws Exception { // 通過ConnectionFactory創建Connection ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("dc_base"); connectionFactory.setPassword("xxxxx"); connectionFactory.setHost("10.80.20.209"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); // 聲明交換機 String exchange = "python-test-exchange1"; Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, "direct"); // 路由鍵 String routingKey = "tester"; // 給管道設置消息路由失敗處理函數 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息路由失敗,被退回來啦! body: " + new String(body)); } }); // 發送消息 channel.basicPublish(exchange, routingKey, true, null, "hello world1!".getBytes(StandardCharsets.UTF_8)); Thread.sleep(3000L); // 結束 connection.close(); } }
參考
RabbitMQ中文文檔:http://rabbitmq.mr-ping.com/description.html