一、RabbitMQ消息確認機制
RabbitMQ的消息確認有兩種:
1、對生產端發送消息的確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
2、對消費端消費消息的確認。這種是確認消費者是否成功消費了隊列中的消息。
二、RabbitMQ對生產端發送消息的確認
rabbitmq對生產端發送消息的確認分為事務和實現confirm機制。不過一般不使用事務,性能消耗太大。對生產端的confirm機制參見:https://www.cnblogs.com/alan6/p/11483419.html
三、消費端消費消息后對RabbitMQ的確認
為了保證消息能可靠到達消費端,RabbitMQ也提供了消費端的消息確認機制。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。
采用消息確認機制后,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調用basicAck為止。
消費端消息的確認分為:自動確認(默認)、手動確認、不確認
AcknowledgeMode.NONE:不確認
AcknowledgeMode.AUTO:自動確認
AcknowledgeMode.MANUAL:手動確認
手動確認在spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
1、消費成功手動確認方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量確認。true:將一次性ack所有小於deliveryTag的消息。
消費者成功處理消息后,手動調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行消費確認。
try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動確認消息 System.out.println("投遞消息確認成功,tag:"+message.getMessageProperties().getDeliveryTag()); } catch (IOException e) { e.printStackTrace(); }
2、消費失敗手動確認方法:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:該消息的index。
multiple:是否批量. true:將一次性拒絕所有小於deliveryTag的消息。
requeue:被拒絕的是否重新入隊列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊列。
channel.basicNack 方法與 channel.basicReject 方法區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
四、消費者手動確認可能出現的問題
1、 消息無法ack
消費端在消費消息過程中出現異常,不能回復ack應答,消息將變成unacked狀態,並且一直處於隊列中。如果積壓的過多將會導致程序無法繼續消費數據。
消費端服務重啟,斷開rabbitmq的連接后,unacked的消息狀態會重新變為ready等待消費。但是如果不重啟消費端服務,消息將一直駐留在MQ中。
所以,可以捕獲異常,然后調用Nack確認,然后消息進入隊列重新消費。
2、無效消息循環重入隊列
在上一個問題中,如果消費端捕獲異常,並進行basicNack應答,並將消息重新放入隊列中,可能會出現另一個問題:
如果消息或者代碼本身有bug,每次處理這個消息都會報異常,那消息將一直處於消費——>報異常——>重入隊列——>繼續消費——>報異常。。。的死循環過程。
以上兩個問題其實屬於同一類問題,都需要我們確保代碼在消費消息后,一定要通知MQ,不然消息將一直駐留在MQ中。如果消息成功消費,則調用channel.basicAck正常通知mq;如果消費失敗,則調用channel.basicNack或者channel.basicReject確認消費失敗。
但防止死循環有兩種處理辦法:
1、根據處理過程中報的不同異常類型,選擇消息要不要重入隊列。
enum Action { ACCEPT, // 處理成功 RETRY, // 可以重試的錯誤 REJECT, // 無需重試的錯誤 } Action action = Action.RETRY; try { // 如果成功完成則action=Action.ACCEPT } catch (Exception e) { // 根據異常種類決定是ACCEPT、RETRY還是 REJECT } finally { // 通過finally塊來保證Ack/Nack會且只會執行一次 if (action == Action.ACCEPT) { channel.basicAck(tag); } else if (action == Action.RETRY) { channel.basicNack(tag, false, true); } else { channel.basicNack(tag, false, false); } }
2、將處理失敗的消息放入另一個隊列中,手動取出處理。