消息應答時執行一個任務可能需要花費幾秒鍾,你可能會擔心如果一個消費者在執行任務過程中掛掉了。
一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。
消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。
如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。
沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
消息應答是默認打開的。我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。
代碼示例:
生產者端代碼不變,消費者端代碼這部分就是用於開啟手動應答模式的。
// 監聽隊列,手動返回完成 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
第二個參數為false則表示關閉自動應答機制,改為手動應答
// 返回確認狀態 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
在處理完消息時,返回應答狀態。
消費者端完整代碼:
1 static void Main(string[] args) 2 { 3 var factory = new ConnectionFactory(); 4 factory.HostName = " "; 5 factory.Port = 5672; 6 factory.UserName = " "; 7 factory.Password = " "; 8 factory.AutomaticRecoveryEnabled = true; 9 using (var connection = factory.CreateConnection()) 10 { 11 using (var channel = connection.CreateModel()) 12 { 13 channel.QueueDeclare(queue: "Test", 14 durable: true, 15 exclusive: false, 16 autoDelete: false, 17 arguments: null); 18 //count設置等待數量,size:消息大小,global設置channel是否與connetion同級 19 channel.BasicQos(prefetchCount: 3, prefetchSize: 0, global: false); 20 var property= channel.CreateBasicProperties(); 21 property.Persistent = true; 22 Console.WriteLine(" [*] waiting for msg "); 23 var consumer = new EventingBasicConsumer(channel); 24 consumer.Received += (model, ea) => 25 { 26 var body = ea.Body; 27 var msg = Encoding.UTF8.GetString(body); 28 Console.WriteLine("[x] reciverd {0} ", msg); 29 int dots = msg.Split('.').Length - 1; 30 Thread.Sleep(dots * 3000); 31 32 Console.WriteLine(" [x] Done "); 33 //手動的消息回執 34 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 35 }; 36 //改為手動消息應答 37 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); 38 39 Console.WriteLine(" Press [enter] to exit "); 40 Console.ReadLine(); 41 } 42 } 43 }