RabbitMQ的消息確認機制


一、RabbitMQ消息確認機制

RabbitMQ的消息確認有兩種:
1、對生產端發送消息的確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
2、對消費端消費消息的確認。這種是確認消費者是否成功消費了隊列中的消息。

二、RabbitMQ對生產端發送消息的確認

 rabbitmq對生產端發送消息的確認分為事務和實現confirm機制。不過一般不使用事務,性能消耗太大。對生產端的confirm機制參見:https://www.cnblogs.com/alan6/p/11483419.html

三、消費端消費消息后對RabbitMQ的確認

為了保證消息能可靠到達消費端,RabbitMQ也提供了消費端的消息確認機制。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。
采用消息確認機制后,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調用basicAck為止。

消費端消息的確認分為:自動確認(默認)、手動確認、不確認

AcknowledgeMode.NONE:不確認
AcknowledgeMode.AUTO:自動確認
AcknowledgeMode.MANUAL:手動確認

手動確認在spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual

1、消費成功手動確認方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量確認。true:將一次性ack所有小於deliveryTag的消息。
消費者成功處理消息后,手動調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行消費確認。

try {
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動確認消息
      System.out.println("投遞消息確認成功,tag:"+message.getMessageProperties().getDeliveryTag());
} catch (IOException e) {
      e.printStackTrace();
}

 

2、消費失敗手動確認方法:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:該消息的index。
multiple:是否批量. true:將一次性拒絕所有小於deliveryTag的消息。
requeue:被拒絕的是否重新入隊列。

void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊列。

channel.basicNack 方法與 channel.basicReject 方法區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。

四、消費者手動確認可能出現的問題

1、 消息無法ack

消費端在消費消息過程中出現異常,不能回復ack應答,消息將變成unacked狀態,並且一直處於隊列中。如果積壓的過多將會導致程序無法繼續消費數據。
消費端服務重啟,斷開rabbitmq的連接后,unacked的消息狀態會重新變為ready等待消費。但是如果不重啟消費端服務,消息將一直駐留在MQ中。

所以,可以捕獲異常,然后調用Nack確認,然后消息進入隊列重新消費。

2、無效消息循環重入隊列

在上一個問題中,如果消費端捕獲異常,並進行basicNack應答,並將消息重新放入隊列中,可能會出現另一個問題:

如果消息或者代碼本身有bug,每次處理這個消息都會報異常,那消息將一直處於消費——>報異常——>重入隊列——>繼續消費——>報異常。。。的死循環過程。 

以上兩個問題其實屬於同一類問題,都需要我們確保代碼在消費消息后,一定要通知MQ,不然消息將一直駐留在MQ中。如果消息成功消費,則調用channel.basicAck正常通知mq;如果消費失敗,則調用channel.basicNack或者channel.basicReject確認消費失敗。

但防止死循環有兩種處理辦法:

1、根據處理過程中報的不同異常類型,選擇消息要不要重入隊列。

enum Action {
  ACCEPT,  // 處理成功
  RETRY,   // 可以重試的錯誤
  REJECT,  // 無需重試的錯誤
}

Action action = Action.RETRY; 
try {
    // 如果成功完成則action=Action.ACCEPT
}
catch (Exception e) {
   // 根據異常種類決定是ACCEPT、RETRY還是 REJECT
}
finally {
  // 通過finally塊來保證Ack/Nack會且只會執行一次
  if (action == Action.ACCEPT) {
    channel.basicAck(tag);
  } else if (action == Action.RETRY) {
     channel.basicNack(tag, false, true);
  } else {
     channel.basicNack(tag, false, false);
  }  
} 

2、將處理失敗的消息放入另一個隊列中,手動取出處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM