RabbitMQ的消息確認機制有兩種:
1)消息發送確認
這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
ConfirmCallback
通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調。 使用該功能需要開啟確認,spring-boot中配置如下:
spring.rabbitmq.publisher-confirms = true
ReturnCallback
通過實現ReturnCallback接口,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發) 使用該功能需要開啟確認,spring-boot中配置如下: spring.rabbitmq.publisher-returns = true
2)是消費接收確認
這種是確認消費者是否成功消費了隊列中的消息。
確認模式
AcknowledgeMode.NONE:不確認 AcknowledgeMode.AUTO:自動確認 AcknowledgeMode.MANUAL:手動確認 spring-boot中配置方法: spring.rabbitmq.listener.simple.acknowledge-mode = manual
手動確認
未確認的消息數
上圖為channel中未被消費者確認的消息數。
通過RabbitMQ的host地址加上默認端口號15672訪問管理界面。
(2.1)成功確認
void basicAck(long deliveryTag, boolean multiple) throws IOException; deliveryTag:該消息的index multiple:是否批量. true:將一次性ack所有小於deliveryTag的消息。
消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。
(2.2)失敗確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; deliveryTag:該消息的index。 multiple:是否批量. true:將一次性拒絕所有小於deliveryTag的消息。 requeue:被拒絕的是否重新入隊列。 void basicReject(long deliveryTag, boolean requeue) throws IOException; deliveryTag:該消息的index。 requeue:被拒絕的是否重新入隊列。
channel.basicNack 與 channel.basicReject 的區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
思考:
(1)手動確認模式,消息手動拒絕中如果requeue為true會重新放入隊列,但是如果消費者在處理過程中一直拋出異常,會導致入隊-》拒絕-》入隊的循環,該怎么處理呢?
第一種方法是根據異常類型來選擇是否重新放入隊列。
第二種方法是先成功確認,然后通過channel.basicPublish()重新發布這個消息。重新發布的消息網上說會放到隊列后面,進而不會影響已經進入隊列的消息處理。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
(2)消息確認的作用是什么?
為了防止消息丟失。消息丟失分為發送丟失和消費者處理丟失,相應的也有兩種確認機制。