RabbitMQ之消費接收確認機制


 RabbitMQ的消息確認機制有兩種:

1)消息發送確認

這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。

ConfirmCallback 

通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調。

使用該功能需要開啟確認,spring-boot中配置如下: 

spring.rabbitmq.publisher-confirms = true

ReturnCallback

通過實現ReturnCallback接口,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發)

使用該功能需要開啟確認,spring-boot中配置如下:

spring.rabbitmq.publisher-returns = true

2)是消費接收確認

這種是確認消費者是否成功消費了隊列中的消息。

確認模式

AcknowledgeMode.NONE:不確認

AcknowledgeMode.AUTO:自動確認

AcknowledgeMode.MANUAL:手動確認

spring-boot中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

手動確認

 

未確認的消息數

上圖為channel中未被消費者確認的消息數。

通過RabbitMQ的host地址加上默認端口號15672訪問管理界面。

(2.1)成功確認

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:該消息的index

multiple:是否批量. true:將一次性ack所有小於deliveryTag的消息。

消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。

(2.2)失敗確認

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

throws IOException;

deliveryTag:該消息的index。

multiple:是否批量. true:將一次性拒絕所有小於deliveryTag的消息。

requeue:被拒絕的是否重新入隊列。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:該消息的index。

requeue:被拒絕的是否重新入隊列。

channel.basicNack 與 channel.basicReject 的區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。

 思考:

(1)手動確認模式,消息手動拒絕中如果requeue為true會重新放入隊列,但是如果消費者在處理過程中一直拋出異常,會導致入隊-》拒絕-》入隊的循環,該怎么處理呢?

第一種方法是根據異常類型來選擇是否重新放入隊列。

第二種方法是先成功確認,然后通過channel.basicPublish()重新發布這個消息。重新發布的消息網上說會放到隊列后面,進而不會影響已經進入隊列的消息處理。

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)

throws IOException;

(2)消息確認的作用是什么?

為了防止消息丟失。消息丟失分為發送丟失和消費者處理丟失,相應的也有兩種確認機制。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM