Ø 簡介
在之前的 C# 消息隊列之 RabbitMQ 基礎入門 中介紹了 RabbitMQ 的基本用法,其實要更全面的掌握 RabbitMQ 這個消息隊列服務,我們還需要掌握以下內容:
1. 輪詢分發
2. 消息響應
3. 公平分發
4. 消息持久化
1) 輪詢分發
默認情況下,RabbitMQ 會按照消息的順序依次分發給每個消費者,也就是每個消費者接收到的消息基本是平均的,這種分發方式稱之為輪詢分發。話不多說看示例:
1) 生產者代碼(其他代碼省略)
//隨機一個“生產者”名稱
string pname = $"[P{(new Random()).Next(1, 1000)}]";
Console.WriteLine($"生產者{pname}已啟動:");
for (int i = 0; i < 6; i++)
{
string message;
if (i == 1) //第二條消息,需要耗時10秒
message = $"{pname}, task{i + 1}, time of 10 seconds";
else
message = $"{pname}, task{i + 1}, time of 1 seconds";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body);
Console.WriteLine($"生產者{message}\t{DateTime.Now.ToString("HH:mm:ss fff")}");
}
2) 消費者代碼(其他代碼省略)
//隨機一個“消費者”名稱
string cname = $"[C{(new Random()).Next(1, 1000)}]";
Console.WriteLine($"消費者{cname}已開啟");
consumer.Received += (sender, e) =>
{
byte[] body = e.Body; //消息字節數組
string message = Encoding.UTF8.GetString(body); //消息內容
Console.WriteLine($"消費者{cname}接收到消息:{message}\t{DateTime.Now.ToString("HH:mm:ss fff")},開始處理...");
//模擬處理耗時操作
string second = Regex.Replace(message, ".+time of ", "");
second = Regex.Replace(second, " seconds", "");
System.Threading.Thread.Sleep(1000 * int.Parse(second));
};
3) 運行代碼
首先,開啟兩個消費者,再打開一個生產者發送6條消息,運行結果如下:
從以上結果中可以得出以下結論:
1. 一共6條消息,2個消費者接收的消息數量是一致的(各3條);
2. 盡管 Task2 消息處理時間較長,也會等待該消息處理完成之后,再處理被依次分發的消息,所以導致了 Task4 的處理時間在 Task5 之后;
3. 同一時間段一個消費者只會處理一條消息,只有當該消息處理完成之后,才會處理下一條消息(或者說接收下一條消息),並不會同時處理多條消息。
2) 消息響應
問題:如果一個消費者在接收到消息后,處理到一半出現了異常,沒有正常完成消息的處理,比如:給用戶發送短信。這時 RabbitMQ 認為消息已經被接收,就將該消息刪除了。這樣一來是不是導致數據丟失了呢?因為沒有正常的完成業務流程吶!
好,這時消息響應就可以大展拳腳了,它就可以解決以上這種丟失數據的問題。就是當一條消息發送給消費者后,該消息必須得到消費者的“確認”后,RabbitMQ 才會將該消息刪除。
實現該功能比較簡單,首先將“消費者”中的代碼:
channel.BasicConsume("myQueue1", true, consumer);
改為
channel.BasicConsume("myQueue1", false, consumer); //表示開啟消息響應的功能
然后,在消息處理完成后回傳該消息標記,添加以下代碼:
channel.BasicAck(e.DeliveryTag, false); //只有當響應此消息標記后,該消息才會在消息隊列中刪除
3) 公平分發
在之前的“輪詢分發”模式下,似乎發現不是很合理吧?因為如果一個消費者當前正在處理比較耗時的“消息”,再次將消息發送給它,該消息就進入了等待被處理的狀態。此時,另一個消費者正處於閑置狀態。這樣就照成了分發不合理(好比工作中:小張開發一個功能需要一周,而小王現在沒事兒干,領導還會把任務分配給小張嗎?肯定不會吧!)。
理論就是這樣的,那在 RabbitMQ 中如何去實現這樣的分發機制呢,其實要借助於之前講的“消息響應”機制。只有當消費者回傳消息標記后,才會將下一個消息發送給它,否則將消息分發給其它空閑的消費者。講了這么多,其實只需一行代碼就可以完成,設置消息通道的基礎 Qos 參數:
channel.BasicQos(0, prefetchCount: 1, false); //prefetchCount:1 表示告訴 RabbitMQ, 在未接收到消費者確認消息之前,不在分發消息
從圖中可以看到,Task3、4、5、6沒有輪詢分發了,而是一直發給了比較空閑的消費者(P549),這樣就達到了合理分發的目的。
4) 消息持久化
在之前的“消息響應”機制中其實隱藏了另一個功能,就是當消息發送給消費者后,未回傳該消息標記情況下,該消息就不會被刪除。那么這些消息就一直會保存在 RabbitMQ Server 中嗎,當然不是。當 RabbitMQ Server 奔潰或者重啟后,這些消息任然會丟失。要將這些消息持久化保存在磁盤中,只需修改兩個地方:
1) 設置 QueueDeclare() 方法的 durable 參數為 true
channel.QueueDeclare("myQueue1", durable: true, false, false, null);
注意:一個消息隊列不允許有不同的參數進行設置,所以可以創建另一個消息隊列,或者先刪除當前消息隊列在進行設置:
channel.QueueDelete("myQueue1"); //注意:當前消息隊列被刪除后,正在接收該隊列的消費者將再也不會接收到消息,就算再次創建同名的隊列
2) 設置 BasicProperties 類的 Persistent 屬性為 true
var properties = channel.CreateBasicProperties() as BasicProperties;
properties.Persistent = true; //默認為false
l 需要注意的是,消息持久化並不代表一定不會丟失任何消息,在消息持久化的過程中也會存在一小段的時間間隔,在此之間發生 RabbitMQ 服務奔潰、服務器斷電等情況,任然可能丟失少量的消息。但是在消息存儲實時性沒那么高的情況下,這已經足夠了。如果對消息持久化有更高要求,可以使用:publisher confirms
Ø 更多參考