這兩個機制都是發送端和mq服務器之間消息的確認,可以理解為生產端ack
1、confirm機制,消息的確認,是指生產者投遞消息之后,如果Broker收到消息,則會給生產者一個應答,生產者能接收應答,用來確定這條消息是否正常的發送到Broker,這種機制是消息可靠性投遞的核心保障。confirm機制是只保證消息到達exchange,並不保證消息可以路由到正確的queue。
2、return機制,用於處理一些不可路由的消息,在一些特殊的情況下,當前的exchange不存在或者指定的路由key路由不到,這時如果我們需要及時監聽這種消息,就需要return機制。
原代碼:https://www.cnblogs.com/gyjx2016/p/13622097.html
properties
server.port=8080
spring.rabbitmq.host=dev-mq.ttsingops.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=cddayuwen
spring.rabbitmq.password=cddayuwen@123
spring.rabbitmq.virtual-host=/cd
#發布者確認 spring.rabbitmq.publisher-confirm-type=correlated #發布者到達確認 spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.type=simple
#simple關閉自動ack,手動ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
### 開啟重試機制
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重試傳遞次數
spring.rabbitmq.listener.simple.retry.max-attempts=3
#第一次和第二次嘗試傳遞消息的間隔時間 單位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000ms
#最大重試時間間隔,單位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=300000ms
#應用前一次重試間隔的乘法器,multiplier默認為1
spring.rabbitmq.listener.simple.retry.multiplier=3
#以上配置的間隔0s 5s 15s 45s
#重試次數超過上面的設置之后是否丟棄(消費者listener拋出異常,是否重回隊列,默認true:重回隊列, false為不重回隊列(結合死信交換機))
spring.rabbitmq.listener.simple.default-requeue-rejected=true
### 模板配置 ##設置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監聽,而不會自動刪除 spring.rabbitmq.template.mandatory=true
稍作改動,新加confirm和return 確認代碼
@Component @Slf4j public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { /** * confirm機制只保證消息到達exchange,不保證消息可以路由到正確的queue,如果exchange錯誤,就會觸發confirm機制 * * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("rabbitmq confirm fail,cause:{}", cause); } } /** * Return 消息機制用於處理一個不可路由的消息。在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定路由 key 路由不到,這個時候我們需要監聽這種不可達的消息 * 就需要這種return機制 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("mq消息不可達,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey); String messageId = message.getMessageProperties().getMessageId(); } }
/** * 設置返回回調和確認回調 * * @param connectionFactory * @return */ @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,RabbitMQConfirmAndReturn rabbitMQConfirmAndReturn) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setConfirmCallback(rabbitMQConfirmAndReturn); rabbitTemplate.setReturnCallback(rabbitMQConfirmAndReturn); //Mandatory為true時,消息通過交換器無法匹配到隊列會返回給生產者,為false時匹配不到會直接被丟棄 rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; }
controller模擬這兩個確認機制
參考文獻:
https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#template-confirms
https://docs.spring.io/spring-amqp/docs/2.1.17.RELEASE/reference/html/#cf-pub-conf-ret

