由於生產者和消費者不直接通信,生產者只負責把消息發送到隊列,消費者只負責從隊列獲取消息(不管是push還是pull).
消息被"消費"后,是需要從隊列中刪除的.那怎么確認消息被"成功消費"了呢?
是消費者從隊列獲取到消息后,broker 就從隊列中刪除該消息?
那如果消費者收到消息后,還沒來得及"消費"它,或者說還沒來得及進行業務邏輯處理時,消費者所在的信道或者連接因某種原因斷開了,
那這條消息豈不是就被無情的拋棄了...
我們更期望的是,消費者從隊列獲取到消息后,broker 暫時不刪除該條消息,
等到消費者"成功消費"掉該消息后,再刪除它.
所以需要一個機制來確認生產者發送的消息被消費者"成功消費".
RabbitMQ 提供了一種叫做"消費者確認"的機制.
消費者確認
消費者確認分兩種:自動確認和手動確認.
在自動確認模式中,消息在發送到消費者后即被認為"成功消費".這種模式可以降低吞吐量(只要消費者可以跟上),以降低交付和消費者處理的安全性.這種模式通常被稱為“即發即忘”.與手動確認模型不同,如果消費者的TCP連接或通道在真正的"成功消費"之前關閉,則服務器發送的消息將丟失.因此,自動消息確認應被視為不安全,並不適用於所有工作負載.
使用自動確認模式時需要考慮的另一件事是消費者過載.手動確認模式通常與有界信道預取(BasicQos方法)一起使用,該預取限制了信道上未完成(“進行中”)的消息的數量.但是,自動確認沒有這種限制.因此,消費者可能會被消息的發送速度所淹沒,可能會導致消息積壓並耗盡堆或使操作系統終止其進程.某些客戶端庫將應用TCP反壓(停止從套接字讀取,直到未處理的交付積壓超過某個限制).因此,僅建議能夠以穩定的速度有效處理消息的消費者使用自動確認模式.
1.自動確認 autoAck : true
下面是消費者的部分代碼,我們故意每次只推送一條消息,並且讓每條消息的處理都超過10秒.
channel.BasicQos(0, 1, false);//將Qos預取值設置為1,這表示設置broker每次只推送隊列里面的一條消息到消費者,只有在確認這條消息"成功消費"后,才會繼續推送 consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Thread.Sleep(10000); Console.WriteLine("consumer1 receive : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer);
下面是生產者的部分代碼
for (byte i = 0; i < 5; i++) { string msg = "hello world " + i; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); }
運行結果:
從管理后台可以看到,消費者還沒打印"receive"那句話,該隊列中就已經沒有任何消息了.
2.手動確認 autoAck : false
手動確認又分兩種:肯定確認和否定確認.
1)肯定確認 BasicAck
消費者部分代碼:
channel.BasicQos(0, 1, false);//設置broker每次只推送隊列里面的一條消息到消費者,只有在確認這條消息"成功消費"后,才會繼續推送 consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Console.WriteLine("consumer1 receive : " + str); Thread.Sleep(30000); //deliveryTag 傳遞標簽,ulong 類型.它的范圍隸屬於每個信道.因此必須在收到消息的相同信道上確認.不同的信道將導致“未知的傳遞標簽”協議異常並關閉通道. //multiple 確認一條消息還是多條.false 表示只確認 e.DelivertTag 這條消息,true表示確認 小於等於 e.DelivertTag 的所有消息 channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); Console.WriteLine("consumer1 Ack : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
生產者代碼不變.
當消費者收到一條消息,但是還沒有肯定確認時,從管理后台可以清晰的看到,隊列中一共有5條消息,其中4條尚未推送,1條已經推送但尚未確認.
當消費者確認后(立馬又接收了一條),這時候,隊列中一共只有4條了,"成功消費"的那條已經被broker從隊列中刪掉了.剩余3條尚未推送,1條已推送但尚未確認.
2)否定確認 BasicNack , BasicReject
否定確認的場景不多,但有時候某個消費者因為某種原因無法立即處理某條消息時,就需要否定確認了.
否定確認時,需要指定是丟棄掉這條消息,還是讓這條消息重新排隊,過一會再來,又或者是讓這條消息重新排隊,並盡快讓另一個消費者接收並處理它.
i.丟棄 requeue: false
消費者部分代碼:
channel.BasicQos(0, 1, false); consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Thread.Sleep(10000); channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false); Console.WriteLine("consumer1 Nack : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
ii.重新排隊 requeue: true
消費者部分代碼:
channel.BasicQos(0, 1, false);//設置broker每次只從推送隊列里面的一條消息到消費者,只有在確認這條消息"成功消費"后,才會繼續推送 consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Thread.Sleep(5000); channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true); Console.WriteLine("consumer1 Nack : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
運行結果:
可以看到,消費者收到的一直是"hello world 0"這條消息,而管理后台一直顯示 4,1,5.這是為什么呢?
首先,我們設置的是每次只推送一條消息給消費者,否定確認中我們選擇的是重新排隊,所以"hello world 0"這條消息被否定確認后,被broker安排去重新排隊了.當消息被重新排隊時,如果可能的話,它將被放置在其隊列中的原始位置.也就是說"hello world 0"這條消息又被放到了隊列頭,因為它的原始位置就是隊列頭.所以結果就變成了消費之一直在消費"hello world 0",並且一直在否定確認.
感覺這種方式的代價是不是有點大...消息重新排隊,還要回到之前的位置,還要重新發送一次....感覺代價有點小貴啊...而且其他消息貌似永遠只有ready...
但,如果多個消費者共享隊列時,該消息將被重新排隊到更靠近隊列頭的位置,並且會被聰明的broker從隊列中推送到其他隊列.
測試:
我們重新創建兩個消費者:consumer1 否定確認,3秒一次;consumer2 肯定確認,1秒一次.兩個消費共享一個隊列(公平分發)
channel.BasicQos(0, 1, false); consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Thread.Sleep(3000); channel.BasicNack(e.DeliveryTag, false, true); Console.WriteLine($"{DateTime.Now} consumer1 Nack : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
channel.BasicQos(0, 1, false); consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Thread.Sleep(1000); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); Console.WriteLine($"{DateTime.Now} consumer2 Ack : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
運行結果:
一切盡在圖中.
BasicReject 方法和 BasicNack 方法基本一樣,唯一的區別是沒有 multiple 這個入參.
消費者確認模式,預取和吞吐量
確認模式和QoS預取值對消費者吞吐量具有顯着影響。通常,增加預取將提高向消費者傳遞消息的速率。自動確認模式可以產生最佳的交付率。但是,在這兩種情況下,已傳送但尚未處理的消息的數量也將增加,從而增加了消費者的RAM消耗。
應謹慎使用具有無限預取功能的自動確認模式或手動確認模式。在沒有確認的情況下消耗大量消息的消費者將導致他們所連接的節點上的內存消耗增長。找到合適的預取值需要不斷試驗,並且會因工作負載而異。100到300范圍內的值通常可提供最佳吞吐量,並且不會面臨壓倒性消費者的重大風險。較高的價值往往會影響收益遞減規律。
預取值1是最保守的。它將顯着降低吞吐量,特別是在消費者連接延遲較高的環境中。對於許多應用來說,更高的值是合適的和最佳的。
當消費者失敗或失去連接時:自動重新排隊
使用手動確認時,除了我們主動讓消息重新排隊外,任何未確認的消息都將在關閉發生傳遞的信道(或連接)時自動重新排隊。這包括客戶端的TCP連接丟失,消費者應用程序(進程)故障和通道級協議異常.請注意,檢測不可用的客戶端需要一段時間。
由於這種行為,消費者必須准備好處理重新發送,否則就要考慮到冪等性。BasicDeliverEventArgs 有一個特殊的布爾屬性 : Redelivered,如果該消息是第一次交付,它將被設置為false.否則為 true.
測試:
還是借用上一個測試的代碼,只是分別加了一句話:
Console.WriteLine($"{str} 是否是重復發送 : " + e.Redelivered);
運行結果:
這里要特別注意,consumer2 收到 "hello world 0"的時候, Redelivered 的值依然是 true . 因為 Redelivered 屬性的維度是消息,不是消費者.