Rabbitmq的可靠消息投遞


一、背景

生產端向rabbitmq發送消息時,由於網絡等原因可能導致消息發送失敗。所以,rabbitmq必須有機制確保消息能准確到達mq,如果不能到達,必須反饋給生產端進行重發。

RabbitMQ消息的可靠性投遞主要兩種實現:
1、通過實現消費的重試機制,通過@Retryable來實現重試,可以設置重試次數和重試頻率;
2、生產端實現消息可靠性投遞。

兩種方法消費端都可能收到重復消息,要求消費端必須實現冪等性消費。

二、消息投遞到exchange的確認模式

rabbitmq的消息投遞的過程為:
producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer

1、生產端發送消息到rabbitmq broker cluster后,異步接受從rabbitmq返回的ack確認信息。

2、生產端收到返回的ack確認消息后,根據ack是true還是false,調用confirmCallback接口進行處理。

 

在application.yml中開啟生產端confirm模式

spring:
  rabbitmq:
    publisher-confirms: true

 

實現ConfirmCallback接口中的confirm方法,ack為true表示消息發送成功,ack為false表示消息發送失敗

@Component
@Slf4j
public class RabbitTemplateConfig implements ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {        
        rabbitTemplate.setConfirmCallback(this);   // 指定 ConfirmCallback
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
       if (!ack) {
          //try to resend msg
       } else {
          //delete msg in db
       }
   }
}

注意:在confirmCallback回調接口中是沒有消息數據的,所以即使消息發送失敗,生產端也無法在這個回調接口中直接重發,confirmCallback只能起到一個通知的作用。

三、消息投遞失敗的重發機制

如果rabbitmq返回ack失敗,生產端也無法確認消息是否真的發送成功,也會造成數據丟失。最好的辦法是使用rabbitmq的事務機制,但是rabbitmq的事務機制效率極低,每秒鍾處理的消息僅幾百條,不適合並發量大的場景。 

 

另外一種實現思路:

1、生產端保存每次發送的消息,如果發送成功就刪除消息;

2、如果發送失敗就取出消息重新發送;

3、如果超時還沒有收到mq返回的ack,同樣取出消息重新發送。

這樣就可以避免消息丟失的風險。

 

以使用redis保存消息msg為例,具體實現方案為:

1、生產端在發送消息之前,生成ack唯一確認的id;

2、以ackId為鍵,消息為value,保存進redis緩存,設置超時時間;

3、redis實現超時觸發接口,當key過期時,重發消息並再次執行第2步;

4、生產端實現ConfirmCallback接口;

5、ConfirmCallback接口觸發時,若ack為true,則直接刪除此次ackId對應的msg;若ack為false,則將該ackId對應的msg取出重發;

 

網上另外的實現方案:

不通過設置redis超時時間觸發超時事件進行重發,而是取出消息放入一個ackFailList中,然后開啟定時任務,掃描ackFailList,重發失敗的msg。

網上的這套方案思路上和上一個方案差不多,但是是采用的額外的List來保存發送失敗的消息,由於List保存在內存中,不具備持久化的功能,所以這樣並不安全,如果生產端程序異常退出將導致消息丟失。可以考慮保存到數據庫中。

四、消息未投遞到queue的退回模式

生產端通過實現ReturnCallback接口,啟動消息失敗返回,消息路由不到隊列時會觸發該回調接口。

 

在application.yml中開啟return模式

spring:
  rabbitmq:
    publisher-returns: true

 

實現ReturnCallback接口,可以獲取消息主體內容,實現消息重發

@Component
@Slf4j
public class RabbitTemplateConfig implements ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {        
        rabbitTemplate.setReturnCallback(this);   //指定 ReturnCallback
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息主體 message : {}", message);
        log.info("消息主體 message : {}", replyCode);
        log.info("描述:{}", replyText);
        log.info("消息使用的交換器 exchange : {}", exchange);
        log.info("消息使用的路由鍵 routing : {}", routingKey);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM