Rabbitmq可靠消息投遞,消息確認機制


前言

我們知道,消息從發送到簽收的整個過程是
Producer-->Broker/Exchange-->Broker/Queue-->Consumer,因此如果只是要保證消息的可靠投遞,我們需要考慮的僅是前兩個階段,因為消息只要成功到達隊列,就算投遞成功。

  • 比如投遞消息時指定的Exchange不存在,那么階段一就會失敗
  • 如果投遞到Exchange成功,但是指定的路由件錯誤或者別的原因,消息沒有從Exchange到達Queue,那就是第二階段出錯。
    在這里插入圖片描述

而從生產者和消費者角度來看,消息成功投遞到隊列才算成功投遞,因此階段一和階段而都屬於生產者一方需要關注,階段三屬於消費者一方,這里只考慮消息的成功投遞,因此不考慮消費者的簽收部分。而Rabbitmq和springboot整合時,默認是沒有開啟消息確認的。

開啟消息確認機制

一、Producer --> Broker/Exchange ConfirmCallback

1. 配置
# springboot2.2.0以前,
# spring.rabbitmq.publisher-confirms=true
# springboot2.2.0以后
spring.rabbitmq.publisher-confirm-type=correlated

關於這個Type有三種取值:

  • none:默認值,不開啟confirmcallback機制

  • correlated:開啟confirmcallback,發布消息時,可以指定一個CorrelationData,會被保存到消息頭中,消息投遞到Broekr時觸發生產者指定的ConfirmCallback,這個值也會被返回,以進行對照處理,CorrelationData可以包含比較豐富的元信息進行回調邏輯的處理。無特殊需求,就設定為這個值。

  • simple:這個比較復雜,spring官方指出:

    Normally, when using the template, a Channel is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation uses the same channel. There may be times, however, where you want to have more control over the use of a channel and ensure that a number of operations are all performed on the same channel.
    2529 / 5000
    翻譯結果
    通常,使用模板時,會從緩存中檢出(或創建)通道,以進行操作,然后將其返回到緩存中以進行重用。在多線程環境中,不能保證下一個操作使用相同的通道。但是,有時您可能希望更好地控制通道的使用,並確保在同一通道上執行全部操作。

    也就說這,這個simple模式:其一效果和correlated值一樣能觸發回調方法,其二用於發布消息成功后使用rabbitTemplate調用waitForConfirmswaitForConfirmsOrDie方法等待broker節點返回發送結果,需求根據返回結果來判定下一步的邏輯,執行更復雜的業務。要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息。

2. 如何使用

SpringBoot自動配置幫我們往容器中注冊了一個RabbitTemplate,但因為默認沒有開啟消息確認機制,因此它在創建時並未配置confirmCallback屬性,我們需要手動為其創建一個 RabbitTemplate.ConfirmCallback

因此我們需要拿到這個創建好的RabbitTemplate,再手動執行其setConfirmCallback方法。

拿到這個對象簡單,只要一個@Autowired,問題是我們要保證之后通過@Autowired拿到的RabbitTemplate是已經注冊了ConfirmCallback的。

我們手寫一個配置類/或者隨便什么類,加上注解@Component/@Service/@Controller/@Configuration,無論哪個,只要能被自動創建並加入容器,然后我們寫一個方法,加上@PostConstructor表示創建這個對象完成時需要回調這個方法,我們在這個類中拿到RabbitTemplate,在這個方法中執行它的setConfirmCallback,這樣spring容器在創建我們這個配置類的時候將創好的RabbitTemplate進行了完善,而整個過程都是在spring'boot啟動過程中自動完成,就能保證我們之后使用@Autowired拿到的RabbitTemplate就是注冊號confirmCallback的。

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

 	// 方法名無所謂,主要是 @PostConstruct 指定它一定會被回調
    @PostConstruct
    public void setCallback() {
        /**
         * 為容器創建好的rabbitTemplate注冊confirmCallback
         * 消息由生產者投遞到Broker/Exchange回調
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        	/**
             * @param correlationData 發送消息時指定的唯一關聯數據(消息id)
             * @param ack 這個消息是否成功投遞到Exchange
             * @param cause 失敗的原因
             */
        	@Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				if (ack) {
	                log.info("消息投遞到交換機成功:[correlationData={}]",correlationData);
	            } else {
	                log.error("消息投遞到交換機失敗:[correlationData={},原因:{}]", correlationData, cause);
	            }
            }
        });
    }
}

二、Exchange-->Queue ConfirmCallback

1. 配置
  • 注意下面兩項必須同時配置,可以嘗試不配置第二項,通過測試能夠發現當消息路由到Queue失敗(比如路由件錯誤)時,returnCallback並未被回調。
# 開啟階段二(消息從E->Q)的確認回調    Exchange --> Queue  returnCallback
spring.rabbitmq.publisher-returns=true
# 官方文檔說此時這一項必須設置為true
# 實際上這一項的作用是:消息【未成功到達】隊列時,能監聽到到路由不可達的消息,以異步方式優先調用我們自己設置的returnCallback,默認情況下,這個消息會被直接丟棄,無法監聽到
spring.rabbitmq.template.mandatory=true
2. 如何使用

和注冊confirmCallback的原理一樣,就不多贅述,直接看配置,需要注意的是 這個回調只會在消息在從Exchange投遞到Queue【失敗】時被執行

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setCallback() {
        
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        	 /**
             * 能夠看出來,參數中並沒有像confirmCallback那樣提供的boolean類型					     的ack,因此這個回調只是在【失敗】情況下觸發的
             * @param message 發送的消息
             * @param replyCode 回復錯誤碼
             * @param replyText 回復錯誤內容
             * @param exchange  發送消息時指定的交換機
             * @param routingKey 發送消息時使用的路由件
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("路由到隊列失敗,消息內容:{},交換機:{},路由件:{},回復碼:{},回復文本:{}", message, exchange, routingKey, replyCode, replyText);
            }
        });
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM