Rabbit--ack機制


  消息應答時執行一個任務可能需要花費幾秒鍾,你可能會擔心如果一個消費者在執行任務過程中掛掉了。
  一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。

  但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那么我們應該將分發給它的任務交付給另一個消費者去處理。

  為了確保消息不會丟失,RabbitMQ支持消息應答

  消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。

  如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。

  沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。

  消息應答是默認打開的。我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。

代碼示例:

生產者端代碼不變,消費者端代碼這部分就是用於開啟手動應答模式的。

// 監聽隊列,手動返回完成  
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); 

第二個參數為false則表示關閉自動應答機制,改為手動應答

// 返回確認狀態  
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

在處理完消息時,返回應答狀態。

消費者端完整代碼:

 1 static void Main(string[] args)
 2         {
 3             var factory = new ConnectionFactory();
 4             factory.HostName = " ";
 5             factory.Port = 5672;
 6             factory.UserName = " ";
 7             factory.Password = " ";
 8             factory.AutomaticRecoveryEnabled = true;
 9             using (var connection = factory.CreateConnection())
10             {
11                 using (var channel = connection.CreateModel())
12                 {
13                     channel.QueueDeclare(queue: "Test",
14                         durable: true,
15                         exclusive: false,
16                         autoDelete: false,
17                         arguments: null);
18                  //count設置等待數量,size:消息大小,global設置channel是否與connetion同級
19                     channel.BasicQos(prefetchCount: 3, prefetchSize: 0, global: false);
20                     var property= channel.CreateBasicProperties();
21                     property.Persistent = true;
22                     Console.WriteLine(" [*] waiting for msg ");
23                     var consumer = new EventingBasicConsumer(channel);
24                     consumer.Received += (model, ea) =>
25                     {
26                         var body = ea.Body;
27                         var msg = Encoding.UTF8.GetString(body);
28                         Console.WriteLine("[x] reciverd {0} ", msg);
29                         int dots = msg.Split('.').Length - 1;
30                         Thread.Sleep(dots * 3000);
31 
32                         Console.WriteLine(" [x] Done ");
33                       //手動的消息回執
34                         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
35                     };
36                    //改為手動消息應答
37                     channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
38 
39                     Console.WriteLine(" Press [enter] to exit ");
40                     Console.ReadLine();
41                 }
42             }
43         }    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM