【rabbitmq】之confirm和return機制


這兩個機制都是發送端和mq服務器之間消息的確認,可以理解為生產端ack

1、confirm機制,消息的確認,是指生產者投遞消息之后,如果Broker收到消息,則會給生產者一個應答,生產者能接收應答,用來確定這條消息是否正常的發送到Broker,這種機制是消息可靠性投遞的核心保障。confirm機制是只保證消息到達exchange,並不保證消息可以路由到正確的queue。

 

2、return機制,用於處理一些不可路由的消息,在一些特殊的情況下,當前的exchange不存在或者指定的路由key路由不到,這時如果我們需要及時監聽這種消息,就需要return機制。

 

原代碼:https://www.cnblogs.com/gyjx2016/p/13622097.html

 

properties

server.port=8080

spring.rabbitmq.host=dev-mq.ttsingops.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=cddayuwen
spring.rabbitmq.password=cddayuwen@123
spring.rabbitmq.virtual-host=/cd
#發布者確認 spring.rabbitmq.publisher-confirm-type=correlated #發布者到達確認 spring.rabbitmq.publisher-returns=true

spring.rabbitmq.listener.type=simple

#simple關閉自動ack,手動ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
### 開啟重試機制
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重試傳遞次數
spring.rabbitmq.listener.simple.retry.max-attempts=3
#第一次和第二次嘗試傳遞消息的間隔時間 單位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000ms
#最大重試時間間隔,單位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=300000ms
#應用前一次重試間隔的乘法器,multiplier默認為1
spring.rabbitmq.listener.simple.retry.multiplier=3
#以上配置的間隔0s  5s  15s  45s


#重試次數超過上面的設置之后是否丟棄(消費者listener拋出異常,是否重回隊列,默認true:重回隊列, false為不重回隊列(結合死信交換機))
spring.rabbitmq.listener.simple.default-requeue-rejected=true

### 模板配置 ##設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除 spring.rabbitmq.template.mandatory=true

 

稍作改動,新加confirm和return 確認代碼

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * confirm機制只保證消息到達exchange,不保證消息可以路由到正確的queue,如果exchange錯誤,就會觸發confirm機制
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("rabbitmq confirm fail,cause:{}", cause);
        }
    }


    /**
     * Return 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息
      * 就需要這種return機制 
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("mq消息不可達,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
        String messageId = message.getMessageProperties().getMessageId();
    }
}

 

/**
     * 設置返回回調和確認回調
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,RabbitMQConfirmAndReturn rabbitMQConfirmAndReturn) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setConfirmCallback(rabbitMQConfirmAndReturn); rabbitTemplate.setReturnCallback(rabbitMQConfirmAndReturn);
        //Mandatory為true時,消息通過交換器無法匹配到隊列會返回給生產者,為false時匹配不到會直接被丟棄
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

 

 

controller模擬這兩個確認機制

 

image

 

 

 

參考文獻:

https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#template-confirms

https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#cf-pub-conf-ret


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM