一、背景
生產端向rabbitmq發送消息時,由於網絡等原因可能導致消息發送失敗。所以,rabbitmq必須有機制確保消息能准確到達mq,如果不能到達,必須反饋給生產端進行重發。
RabbitMQ消息的可靠性投遞主要兩種實現:
1、通過實現消費的重試機制,通過@Retryable來實現重試,可以設置重試次數和重試頻率;
2、生產端實現消息可靠性投遞。
兩種方法消費端都可能收到重復消息,要求消費端必須實現冪等性消費。
二、消息投遞到exchange的確認模式
rabbitmq的消息投遞的過程為:
producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer
1、生產端發送消息到rabbitmq broker cluster后,異步接受從rabbitmq返回的ack確認信息。
2、生產端收到返回的ack確認消息后,根據ack是true還是false,調用confirmCallback接口進行處理。
在application.yml中開啟生產端confirm模式
spring: rabbitmq: publisher-confirms: true
實現ConfirmCallback接口中的confirm方法,ack為true表示消息發送成功,ack為false表示消息發送失敗
@Component @Slf4j public class RabbitTemplateConfig implements ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { //try to resend msg } else { //delete msg in db } } }
注意:在confirmCallback回調接口中是沒有消息數據的,所以即使消息發送失敗,生產端也無法在這個回調接口中直接重發,confirmCallback只能起到一個通知的作用。
三、消息投遞失敗的重發機制
如果rabbitmq返回ack失敗,生產端也無法確認消息是否真的發送成功,也會造成數據丟失。最好的辦法是使用rabbitmq的事務機制,但是rabbitmq的事務機制效率極低,每秒鍾處理的消息僅幾百條,不適合並發量大的場景。
另外一種實現思路:
1、生產端保存每次發送的消息,如果發送成功就刪除消息;
2、如果發送失敗就取出消息重新發送;
3、如果超時還沒有收到mq返回的ack,同樣取出消息重新發送。
這樣就可以避免消息丟失的風險。
以使用redis保存消息msg為例,具體實現方案為:
1、生產端在發送消息之前,生成ack唯一確認的id;
2、以ackId為鍵,消息為value,保存進redis緩存,設置超時時間;
3、redis實現超時觸發接口,當key過期時,重發消息並再次執行第2步;
4、生產端實現ConfirmCallback接口;
5、ConfirmCallback接口觸發時,若ack為true,則直接刪除此次ackId對應的msg;若ack為false,則將該ackId對應的msg取出重發;
網上另外的實現方案:
不通過設置redis超時時間觸發超時事件進行重發,而是取出消息放入一個ackFailList中,然后開啟定時任務,掃描ackFailList,重發失敗的msg。
網上的這套方案思路上和上一個方案差不多,但是是采用的額外的List來保存發送失敗的消息,由於List保存在內存中,不具備持久化的功能,所以這樣並不安全,如果生產端程序異常退出將導致消息丟失。可以考慮保存到數據庫中。
四、消息未投遞到queue的退回模式
生產端通過實現ReturnCallback接口,啟動消息失敗返回,消息路由不到隊列時會觸發該回調接口。
在application.yml中開啟return模式
spring: rabbitmq: publisher-returns: true
實現ReturnCallback接口,可以獲取消息主體內容,實現消息重發
@Component @Slf4j public class RabbitTemplateConfig implements ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體 message : {}", message); log.info("消息主體 message : {}", replyCode); log.info("描述:{}", replyText); log.info("消息使用的交換器 exchange : {}", exchange); log.info("消息使用的路由鍵 routing : {}", routingKey); } }
