rabbitmq消息可靠性和插件機制


1、消息可靠性

rabbitmq一般通過三個方面保證消息的可靠性:

(1)、發送可靠性:確保消息成功發送到broker端。

 

  rabbitmq支持“最多一次”和“最少一次”。

其中“最少一次”的實現需要考慮以下幾個方面的內容:

消息生產者需要開啟事務機制或者publisher confirm機制,已保證消息可以可靠的傳輸到rabbitmq中;消息生產者需要配合使用mandatory參數或者備份交換器來確保消息能夠從交換器路由到隊列中,進而能夠保存下來而不會被丟棄。

“最多一次”的方式就無須考慮以上內容,生產者隨意發送,不過這樣很難保證消息或成功發送。

配置消息回調

參考地址:https://www.cnblogs.com/wangzaiplus/p/11213709.html

目前回調存在ConfirmCallback和ReturnCallback兩者。他們的區別在於

如果消息沒有到exchange,則ConfirmCallback回調,ack=false, 如果消息到達exchange,則ConfirmCallback回調,ack=true exchange到queue成功,則不回調ReturnCallback(必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發回調)

ConfirmCallback的回調

/** * 消息發送成功的回調 * 需要開啟 * # 開啟發送確認 * spring.rabbitmq.publisher-confirm-type=CORRELATED **/ @Slf4j public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData必須在發送的時候設置,否則將會使null,如下: //CorrelationData correlationData = new CorrelationData("1"); //rabbitTemplate.convertAndSend("OrderDispathExchange","kk", // "{\"msgId\":1,\"data\":\"測試\"}",correlationData);
        log.info("消息唯一標識: {}", correlationData); log.info("確認狀態: {}", ack); log.info("造成原因: {}", cause); } }

ReturnCallback的回調

/** * 發生異常時的消息返回提醒 * 需要開啟 * # 開啟發送失敗退回 * spring.rabbitmq.publisher-returns=true **/ @Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體: {}", message); log.info("回復編碼: {}", replyCode); log.info("回復內容: {}", replyText); log.info("交換器: {}", exchange); log.info("路由鍵: {}", routingKey); } }

將回調配置到模板中

@Configuration public class RabbitMqConf { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ CachingConnectionFactory factory = (CachingConnectionFactory)connectionFactory; factory.addConnectionListener(new ConnectionListener() { @Override public void onCreate(Connection connection) { System.err.println("端口:"+connection.getLocalPort()); } });      RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); // 觸發setReturnCallback回調必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發回調
        rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); return rabbitTemplate; }

(2)、存儲可靠性:broker端對消息的持久化,確保消息不會丟失。

(3)、消費可靠性:確保消息成功被消費。(可以參考:RabbitMq解決分布式事務 

 消費者在消費消息的同時,需要將autoAck設置為false(spring.rabbitmq.listener.simple.acknowledge-mode=manual),然后通過手動確認的方式去確認已經正常消費的消息,以免在消費端引起不必要的消息丟失。

2、插件機制

rabbitmq支持插件,通過插件可以擴展多種核心功能:支持多種協議、系統狀態監控、其它AMQP 0-9-1交換類型、節點聯合等。許多功能都是通過插件實現的。

可以通過命令:rabbitmq-plugins list查看當前rabbitmq安裝的插件

 通過以下命令可以啟用或禁用指定插件

#啟用插件 rabbitmq-plugins enable <plugin_name> #禁用插件 rabbitmq-plugins disable<plugin_name>

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM