一:確認種類
RabbitMQ的消息確認有兩種。
一種是消息發送確認,用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
第二種是消費接收確認。確認消費者是否成功消費了隊列中的消息。
二:消息發送確認
(1)ConfirmCallback
通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調。
使用該功能需要開啟確認,spring-boot中配置如下: spring.rabbitmq.publisher-confirms = true
(2)ReturnCallback
通過實現ReturnCallback接口,如果消息從交換器發送到對應隊列失敗時觸發,比如根據發送消息時指定的routingKey找不到隊列時會觸發。
使用該功能需要開啟確認,spring-boot中配置如下: spring.rabbitmq.publisher-returns = true
三:消息接收確認
(1)確認模式
- AcknowledgeMode.NONE:不確認
- AcknowledgeMode.AUTO:自動確認
- AcknowledgeMode.MANUAL:手動確認
spring-boot中配置方法:spring.rabbitmq.listener.simple.acknowledge-mode = manual
(2)手動確認
圖 未確認的消息數
上圖為channel中未被消費者確認的消息數。通過RabbitMQ的host地址加上默認端口號15672訪問管理界面。
(2.1)成功確認
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量. true:將一次性ack所有小於deliveryTag的消息。
消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。
(2.2)失敗確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:該消息的index。
multiple:是否批量。true:將一次性拒絕所有小於deliveryTag的消息。
requeue:被拒絕的是否重新入隊列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊列。
channel.basicNack 與 channel.basicReject 的區別在於basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
消息隊列的基礎知識可以參考:消息隊列RabbitMQ基礎知識詳解
四:思考
(1)手動確認模式,消息手動拒絕中如果requeue為true會重新放入隊列,但是如果消費者在處理過程中一直拋出異常,會導致入隊-》拒絕-》入隊的循環,該怎么處理呢?
第一種方法是根據異常類型來選擇是否重新放入隊列。
第二種方法是先成功確認,然后通過channel.basicPublish()重新發布這個消息。重新發布的消息網上說會放到隊列后面,進而不會影響已經進入隊列的消息處理。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
(2)消息確認的作用是什么?
為了防止消息丟失。消息丟失分為發送丟失和消費者處理丟失,相應的也有兩種確認機制。
(3)Redis和 rabbit MQ 都可以做消息隊列,為什么選擇了Rabbit MQ ?
RabbitMQ和Redis都可以做隊列,但是他們還是有區別的。比如,Redis的消息隊列,如果在從隊列pop出去的時候,worker處理失敗的話,數據不會回到隊列中,需要從業務中手動把失敗的處理數據push到隊列中,而RabbitMQ可以自動處理失敗的worker使數據不丟失;RabbitMQ還可以保證數據在傳輸過程中持久化,在通道和隊列中的數據可以設置為持久化。另外,Redis嚴格來說並不是消息隊列,它是一個內存數據庫,不過因為其某些特性適合用來充當隊列,所以也多被用於做簡單的MQ, Redis之父倒是開發了個真正的消息隊列disque,有興趣可以看看。
相比起Redis,RabbitMQ有更加完善的MQ機制,這里我們僅討論消息的durable(持久性),后續一系列其他機制有時間再交流。
RabbitMQ有一個消息確認機制來保證消息的不丟失:客戶端從隊列中取出消息之后,可能需要一段時間才能處理完成,如果在這個過程中,客戶端出錯了,異常退出了,而數據還沒有處理完成,那么非常不幸,這段數據就丟失了,因為RabbitMQ默認會把此消息標記為已完成,然后從隊列中移除,消息確認是客戶端從RabbitMQ中取出消息,並處理完成之后,會發送一個ack告訴RabbitMQ,消息處理完成,當RabbitMQ收到客戶端的獲取消息請求之后,或標記為處理中,當再次收到ack之后,才會標記為已完成,然后從隊列中刪除。當RabbitMQ檢測到客戶端和自己斷開鏈接之后,還沒收到ack,則會重新將消息放回消息隊列,交給下一個客戶端處理,保證消息不丟失,也就是說,RabbitMQ給了客戶端足夠長的時間來做數據處理。
五:消息持久化
- RabbitMQ在服務端沒有聲明隊列queue持久化(durable=True)時,隊列是存在內存中的,服務端down機了,隊列也隨之消失。
- RabbitMQ在服務端只聲明queue持久化,但是在發送消息時,沒有聲明持久化(properties=pika.BasicProperties(delivery_mode=2,)),服務器down機,只保留隊列,但是不保留消息。
- RabbitMQ在服務端聲明queue持久化,並且在發消息時也聲明了持久化,服務down機重啟后,隊列和消息都能得到保存。
- 服務端聲明持久化,客戶端想接受消息的話,必須也要聲明queue時,也要聲明持久化,不然的話,客戶端執行會報錯。
Reference:
