RabbitMQ-消費者"未處理完的消息"丟失


一個關於客戶端(消費者)開啟自動應答,重啟后"未處理消息丟失"的小坑。(主要是對RabbitMQ理解不夠)

首先,申明一下: 本文所謂的 "丟失消息" 不是指服務器宕機、重啟等原因導致內存中消息丟失,也就是說不是關於消息持久化的問題

 

  使用C# 編寫測試。

  問題表象:  消費者開啟自動應答,某時,消費者掉線(關閉/崩潰等),屆時重啟消費者,發現消費者未處理完的消息丟失

  條件: 服務器不宕機、不重啟,只有一個消費者、一個生產者。

  消息流向:  消息--->生產者--->交換器--->隊列--->消費者

  問題的處理: 消費者開啟手動應答,若再出現之前情況,消息不丟失。

  先給個代碼。

  生產者代碼如下:

static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {           //申明廣播類型交換器 channel.ExchangeDeclare(exchange: "ex1", type: "fanout");           //申明隊列 channel.QueueDeclare(queue: "test1", durable: false, exclusive: false, autoDelete: false, arguments: null); int count = 0; while (true) { count++; var body = Encoding.UTF8.GetBytes(count.ToString());
            //向key為 p 的交換器 ex1 上推數據 channel.BasicPublish(exchange: "ex1", routingKey: "p", basicProperties: null, body: body); Console.WriteLine($"send msg {count}"); System.Threading.Thread.Sleep(1000); } } }

 

 消費者代碼如下(開啟自動應答):

static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) {           //隊列與交換器綁定 channel.QueueBind(queue: "test1", exchange: "ex1", routingKey: "p"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息 -- {message}"); System.Threading.Thread.Sleep(2000); }; channel.BasicConsume(queue: "test1", autoAck: true, consumer: consumer); Console.ReadLine(); } }

*交換器、隊列必須先申明,兩樣都存在后才能進行綁定。 

 

 

生產者向隊列推送消息,隊列中展現狀態為ready的數據就是未被消費的消息。

推送90個消息:

 

隊列中存了90個未應答的消息

 

打開消費者:

發現現在消息已經全部被自動應答,隊列已清空。

 

再啟動消費者:

如預料,空空一片。

 

        小結 :開啟消費者(開啟自動應答),發現隊列中狀態為Ready的消息全部被應答,隊列中狀態為Ready的消息清空,不等消費者處理完這些消息,關閉消費者,然后再開啟消費者,消費者不會再收到消息,出現消費者"未處理"完的消息丟失的問題。

 

同之前先屯90個消息。

然后關閉自動應答。

 

開啟消費者:

 

消息狀態一次性全部變成unacked。 因為沒有寫手動處理消息的邏輯,所以unacked狀態的消息不會變少。

 

然后關閉消費者:

RabbitMQ 未刪除無應答的消息,消息重新轉為Ready狀態,繼續等待連接消費者處理。

 

再開啟消費者:

沒有出現丟失未處理完消息的情況。

 

     小結:開啟消費者(關閉自動應答),發現隊列中狀態為Ready的消息狀態全部轉變為unacked,隊列中狀態為ready的消息清空,隨消費者應答,隊列中狀態為unacked的消息逐漸減少,關閉消費者,發現隊列中狀態為unacked的消息重新改變回ready狀態,

       

  結論:

  關閉自動應答可避免這種消息"丟失的情況"。

  另外在開啟自動應答 ack=true 的情況下,需要保證一定有消費者在線,才能保證消息都被接收處理。開啟手動應答必然消耗更多資源,因為 RabbitMQ 需要根據應答標號去刪除隊列中對應的消息。

 

以上僅個人理解,若有錯誤,歡迎指正~

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM