1.引言
RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構如下:
RabbitMQ作為一個消息代理,主要和消息打交道,負責接收並轉發消息。RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。
下面我們就來學習下RabbitMQ。
2. 環境搭建
本文主要基於Windows下使用Vs Code 基於.net core進行demo演示。開始之前我們需要准備好以下環境。
- 安裝Erlang運行環境
下載安裝Erlang。 - 安裝RabbitMQ
下載安裝Windows版本的RabbitMQ。 - 啟動RabbitMQ Server
點擊Windows開始按鈕,輸入RabbitMQ找到RabbitMQ Comman Prompt
,以管理員身份運行。 - 依次執行以下命令啟動RabbitMQ服務
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
- 執行
rabbitmqlctl status
檢查RabbitMQ狀態 - 安裝管理平台插件
執行rabbitmq-plugins enable rabbitmq_management
即可成功安裝,使用默認賬號密碼(guest/guest)登錄http://localhost:15672/即可。
3. Hello RabbitMQ
在開始之前我們先來了解下消息模型:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,隊列再將消息發送到監聽的消費者。
下面我們我們通過demo來了解RabbitMQ的基本用法。
3.1.消息的發送和接收
創建RabbitMQ文件夾,打開命令提示符,分別創建兩個控制台項目Send、Receive。
dotnet new console --name Send //創建發送端控制台應用
cd Send //進入Send目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包
dotnet new console --name Receive //創建接收端控制台應用
cd Receive //進入Receive目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包
我們先來添加消息發送端邏輯:
//Send.cs
public static void Main(string[] args)
{
//1.1.實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建信道
using (var channel = connection.CreateModel())
{
//4. 申明隊列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//5. 構建byte消息數據包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
再來完善消息接收端邏輯:
//Receive.cs 省略部分代碼
public static void Main()
{
//1.實例化連接工廠
var factory = new ConnectionFactory() { HostName = "localhost" };
//2. 建立連接
using (var connection = factory.CreateConnection())
{
//3. 創建信道
using (var channel = connection.CreateModel())
{
//4. 申明隊列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//5. 構造消費者實例
var consumer = new EventingBasicConsumer(channel);
//6. 綁定消息接收后的事件委托
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(6000);//模擬耗時
Console.WriteLine (" [x] Done");
};
//7. 啟動消費者
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
先運行消息接收端,再運行消息發送端,結果如下圖。
從上面的代碼中可以看出,發送端和消費端的代碼前4步都是一樣的。主要的區別在於發送端調用channel.BasicPublish
方法發送消息;而接收端需要實例化一個EventingBasicConsumer
實例來進行消息處理邏輯。另外一點需要注意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這里指定的隊列名稱為hello。
3.2. 循環調度
使用工作隊列的好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了。我們先啟動兩個接收端,等待消息接收,再啟動一個發送端進行消息發送。
我們增加運行一個消費端后的運行結果:
從圖中可知,我們循環發送4條信息,兩個消息接收端按順序被循環分配。
默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每個消費者將獲得相同數量的消息。這種分發消息的方式叫做循環(round-robin)。
3.3. 消息確認
按照我們上面的demo,一旦RabbitMQ將消息發送到消費端,消息就會立即從內存中移出,無論消費端是否處理完成。在這種情況下,消息就會丟失。
為了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息並且處理完成后,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號后,就可以刪除掉這條已經處理的消息任務。但如果消費端掛掉了(比如,通道關閉、連接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會重新將消息入隊,如果有另外一個消費端在線,就會快速的重新發送到另外一個消費端。
RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ才會重新分發消息。
微調下Receive中的代碼邏輯:
//5. 構造消費者實例
var consumer = new EventingBasicConsumer(channel);
//6. 綁定消息接收后的事件委托
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(6000);//模擬耗時
Console.WriteLine(" [x] Done");
// 7. 發送消息確認信號(手動消息確認)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
//8. 啟動消費者
//autoAck:true;自動進行消息確認,當消費端接收到消息后,就自動發送ack信號,不管消息是否正確處理完畢
//autoAck:false;關閉自動消息確認,通過調用BasicAck方法手動進行消息確認
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
主要改動的是將 autoAck:true
修改為autoAck:fasle
,以及在消息處理完畢后手動調用BasicAck
方法進行手動消息確認。
從圖中可知,消息發送端連續發送4條消息,其中消費端1先被分配處理第一條消息,消費端2被循環分配第二條消息,第三條消息由於沒有空閑消費者仍然在隊列中。
在消費端2未處理完第一條消息之前,手動中斷(ctrl+c)。我們可以發現RabbitMQ在下一次分發時,會優先將被中斷的消息分發給消費端1處理。
3.4. 消息持久化
消息確認確保了即使消費端異常,消息也不會丟失能夠被重新分發處理。但是如果RabbitMQ服務端異常,消息依然會丟失。除非我們指定durable:true
,否則當RabbitMQ退出或奔潰時,消息將依然會丟失。通過指定durable:true
,並指定Persistent=true
,來告知RabbitMQ將消息持久化。
//send.cs
//4. 申明隊列(指定durable:true,告知rabbitmq對消息進行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//將消息標記為持久性 - 將IBasicProperties.SetPersistent設置為true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 構建byte消息數據包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
將消息標記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息並且還沒有保存時,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,並沒有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對於一個簡單任務隊列來說已經足夠。如果需要確保消息隊列的持久化,可以使用publisher confirms.
3.5. 公平分發
RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閑忙情況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理一般任務處於空置狀態,而只是它們分配的任務數量一樣。
但我們可以通過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1
來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時,不再分配任務。
//Receive.cs
//4. 申明隊列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
這時你需要注意的是如果所有的消費端都處於忙碌狀態,你的隊列可能會被塞滿。你需要注意這一點,要么添加更多的消費端,要么采取其他策略。
4. Exchange
細心的你也許發現上面的demo,生產者和消費者直接是通過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者創建消息發布到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個局限性,即消費者一次只能發送消息到某一個隊列。
那消費者如何才能發送消息到多個消息隊列呢?
RabbitMQ提供了Exchange,它類似於路由器的功能,它用於對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另一方面將消息推送到隊列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊列還是附加到多個隊列,還是直接忽略。而這些規則由exchange type定義,exchange的原理如下圖所示。
常見的exchange type 有以下幾種:
- direct(明確的路由規則:消費端綁定的隊列名稱必須和消息發布時指定的路由名稱一致)
- topic (模式匹配的路由規則:支持通配符)
- fanout (消息廣播,將消息分發到exchange上綁定的所有隊列上)
下面我們就來一一這介紹它們的用法。
4.1 fanout
本着先易后難的思想,我們先來了解下fanout的廣播路由機制。fanout的路由機制如下圖,即發送到 fanout 類型exchange的消息都會分發到所有綁定該exchange的隊列上去。
生產者示例代碼:
// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到指定exchange,fanout類型無需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);
消費者示例代碼:
//申明fanout類型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到指定fanout類型exchange,無需指定路由鍵
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");
4.2. direct
direct相對於fanout就屬於完全匹配、單播的模式,路由機制如下圖,即隊列名稱和消息發送時指定的路由完全匹配時,消息才會發送到指定隊列上。
生產者示例代碼:
// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到direct類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);
消費者示例代碼:
//申明direct類型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//綁定隊列到direct類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");
4.3. topic
topic是direct的升級版,是一種模式匹配的路由機制。它支持使用兩種通配符來進行模式匹配:符號#
和符號*
。其中*
匹配一個單詞, #
則表示匹配0個或多個單詞,單詞之間用.
分割。如下圖所示。
生產者示例代碼:
// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到topic類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);
消費者示例代碼:
//申明topic類型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到topic類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");
5. RPC
RPC——Remote Procedure Call,遠程過程調用。
那RabbitMQ如何進行遠程調用呢?示意圖如下:
第一步,主要是進行遠程調用的客戶端需要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。
遠程調用客戶端:
//申明唯一guid用來標識此次發送的遠程調用請求
var correlationId = Guid.NewGuid().ToString();
//申明需要監聽的回調隊列
var replyQueue = channel.QueueDeclare().QueueName;
var properties = channel.CreateBasicProperties();
properties.ReplyTo = replyQueue;//指定回調隊列
properties.CorrelationId = correlationId;//指定消息唯一標識
string number = args.Length > 0 ? args[0] : "30";
var body = Encoding.UTF8.GetBytes(number);
//發布消息
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
Console.WriteLine($"[*] Request fib({number})");
// //創建消費者用於處理消息回調(遠程調用返回結果)
var callbackConsumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
callbackConsumer.Received += (model, ea) =>
{
//僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。
if (ea.BasicProperties.CorrelationId == correlationId)
{
var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
Console.WriteLine($"[x]: {responseMsg}");
}
};
遠程調用服務端:
//申明隊列接收遠程調用請求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//請求處理邏輯
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
int n = int.Parse(message);
Console.WriteLine($"Receive request of Fib({n})");
int result = Fib(n);
//從請求的參數中獲取請求的唯一標識,在消息回傳時同樣綁定
var properties = ea.BasicProperties;
var replyProerties = channel.CreateBasicProperties();
replyProerties.CorrelationId = properties.CorrelationId;
//將遠程調用結果發送到客戶端監聽的隊列上
channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
//手動發回消息確認
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
6. 總結
基於上面的demo和對幾種不同exchange路由機制的學習,我們發現RabbitMQ主要是涉及到以下幾個核心概念:
- Publisher:生產者,消息的發送方。
- Connection:網絡連接。
- Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。
- Exchange:交換器(路由器),負責消息的路由到相應隊列。
- Binding:隊列與交換器間的關聯綁定。消費者將關注的隊列綁定到指定交換器上,以便Exchange能准確分發消息到指定隊列。
- Queue:隊列,消息的緩沖存儲區。
- Virtual Host:虛擬主機,虛擬主機提供資源的邏輯分組和分離。包含連接,交換,隊列,綁定,用戶權限,策略等。
- Broker:消息隊列的服務器實體。
- Consumer:消費者,消息的接收方。
這次作為入門就講到這里,下次我們來講解下EventBus + RabbitMQ如何實現事件的分發。