一個關於客戶端(消費者)開啟自動應答,重啟后"未處理消息丟失"的小坑。(主要是對RabbitMQ理解不夠)
首先,申明一下: 本文所謂的 "丟失消息" 不是指服務器宕機、重啟等原因導致內存中消息丟失,也就是說不是關於消息持久化的問題。
使用C# 編寫測試。
問題表象: 消費者開啟自動應答,某時,消費者掉線(關閉/崩潰等),屆時重啟消費者,發現消費者未處理完的消息丟失。
條件: 服務器不宕機、不重啟,只有一個消費者、一個生產者。
消息流向: 消息--->生產者--->交換器--->隊列--->消費者
問題的處理: 消費者開啟手動應答,若再出現之前情況,消息不丟失。
先給個代碼。
生產者代碼如下:
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //申明廣播類型交換器 channel.ExchangeDeclare(exchange: "ex1", type: "fanout"); //申明隊列 channel.QueueDeclare(queue: "test1", durable: false, exclusive: false, autoDelete: false, arguments: null); int count = 0; while (true) { count++; var body = Encoding.UTF8.GetBytes(count.ToString());
//向key為 p 的交換器 ex1 上推數據 channel.BasicPublish(exchange: "ex1", routingKey: "p", basicProperties: null, body: body); Console.WriteLine($"send msg {count}"); System.Threading.Thread.Sleep(1000); } } }
消費者代碼如下(開啟自動應答):
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //隊列與交換器綁定 channel.QueueBind(queue: "test1", exchange: "ex1", routingKey: "p"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息 -- {message}"); System.Threading.Thread.Sleep(2000); }; channel.BasicConsume(queue: "test1", autoAck: true, consumer: consumer); Console.ReadLine(); } }
*交換器、隊列必須先申明,兩樣都存在后才能進行綁定。
生產者向隊列推送消息,隊列中展現狀態為ready的數據就是未被消費的消息。
推送90個消息:
隊列中存了90個未應答的消息
打開消費者:
發現現在消息已經全部被自動應答,隊列已清空。
再啟動消費者:
如預料,空空一片。
小結 :開啟消費者(開啟自動應答),發現隊列中狀態為Ready的消息全部被應答,隊列中狀態為Ready的消息清空,不等消費者處理完這些消息,關閉消費者,然后再開啟消費者,消費者不會再收到消息,出現消費者"未處理"完的消息丟失的問題。
同之前先屯90個消息。
然后關閉自動應答。
開啟消費者:
消息狀態一次性全部變成unacked。 因為沒有寫手動處理消息的邏輯,所以unacked狀態的消息不會變少。
然后關閉消費者:
RabbitMQ 未刪除無應答的消息,消息重新轉為Ready狀態,繼續等待連接消費者處理。
再開啟消費者:
沒有出現丟失未處理完消息的情況。
小結:開啟消費者(關閉自動應答),發現隊列中狀態為Ready的消息狀態全部轉變為unacked,隊列中狀態為ready的消息清空,隨消費者應答,隊列中狀態為unacked的消息逐漸減少,關閉消費者,發現隊列中狀態為unacked的消息重新改變回ready狀態,
結論:
關閉自動應答可避免這種消息"丟失的情況"。
另外在開啟自動應答 ack=true 的情況下,需要保證一定有消費者在線,才能保證消息都被接收處理。開啟手動應答必然消耗更多資源,因為 RabbitMQ 需要根據應答標號去刪除隊列中對應的消息。
以上僅個人理解,若有錯誤,歡迎指正~