rabbitmq生產者的消息確認


通過Publisher Confirms and Returns機制,生產者可以判斷消息是否發送到了exchange及queue,而通過消費者確認機制,Rabbitmq可以決定是否重發消息給消費者,以保證消息被處理。

1.什么是Publisher Confirms and Returns?

Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms. 
地址:http://www.rabbitmq.com/confirms.html

根據RabbitMq官網定義,rabbitmq代理(broker)對發布者(publishers)的確認被稱作發布者確認(publisher confirms),這種機制是Rabbitmq對標准Amqp協議的擴展。因此通過這種機制可以確認消息是否發送給了目標。

2.如何通過Spring amqp來使用Publisher Confirms and Returns機制?

Confirmed and returned messages are supported by setting the CachingConnectionFactory’s publisherConfirms and publisherReturns properties to ‘true’ respectively.When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections. 
http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html#cf-pub-conf-ret

通過Spring amqp文檔可以看到,要使用這種機制需要將Template模版的設publisherConfirms 或publisherReturns 屬性設置為true,此外ConnectionFactory要配置為CachingConnectionFactory。

復制代碼
    <bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="192.168.2.133" />
        <property name="port" value="5672" />
        <property name="username" value="sun" />
        <property name="password" value="123456" />
        <property name="publisherConfirms" value="true" />
        <property name="publisherReturns" value="true" />
    </bean>
復制代碼

2.1 ConfirmCallback的使用及觸發的一種場景

復制代碼
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; /** * @author wangzhongqiu * Created on 2017/10/31. * @description:繼承RabbitTemplate.ConfirmCallback,消息確認監聽器 */ @Service public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback { private Logger log = LoggerFactory.getLogger(CommonProducer.class); @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("收到回調,成功發送到broker"); } }
復制代碼

2.2 ReturnCallback的使用及觸發的一種場景

復制代碼
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; /** * @author wangzhongqiu * Created on 2017/10/31. * @description:繼承RabbitTemplate.ReturnCallback,消息發送失敗返回監聽器 */ @Service public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback { private Logger log = LoggerFactory.getLogger(CommonProducer.class); @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("收到回調"); log.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey); } }
復制代碼

使用場景:

如果消息沒有到exchange,則confirm回調,ack=false

如果消息到達exchange,則confirm回調,ack=true

exchange到queue成功,則不回調return

exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)

 

轉載::http://www.cnblogs.com/wangzhongqiu/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM