1.前言
上一隨筆詳細記錄了直連交換機的方法,發送的消息是異步的,如果消息未被消費者消費,那么可以一直存在消息隊列中。
那么有沒有辦法做一個回調,當消息被消費后,被通知消息成功被消費者消費啦?
答案是有的。
需要在消息生產者的消息生產類實現 RabbitTemplate.ConfirmCallback 接口,重寫 回調方法confirm(),
同時 RabbitTemplate 模板工具需要自定義注入連接rabbitmq的連接工廠對象才可以正常執行回調操作。
而消費者端的代碼不需要修改。
下面演示,以上一節隨筆為基礎,修改消息生產者部分代碼實現演示,隨筆地址https://www.cnblogs.com/c2g5201314/p/13156932.html
總結: (1)異步操作,獲取回調消費結果,需要實現RabbitTemplate.ConfirmCallback 接口,然后重寫 confirm()方法。 (2)獲取回調結果,指的是獲取消息是否被消費端正常消費而返回的結果,並不是消費端返回 的處理結果,這一點得注意,如果需要等待消費端返回處理結果,則需要做同步操作, 而不是做回調操作。 (3)需要做同步操作時,應該rabbitTemplate.convertSendAndReceive()方法,返回結果類型是Object,需要根據消費端返回的數據類型來決定強轉的類型。 (4)異步則使用rabbitTemplate.convertAndSend()方法。
2.操作
(1)修改配置類,添加自定義RabbitTemplate模板
完整源碼

package com.example.rabbitmqproducer1004.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置類---消息生產者 */ @Configuration public class RabbitmqConfig { //日志記錄器 Logger logger = LoggerFactory.getLogger(getClass()); //=========================================================== /** * 手動配置RabbitTemplate 是為了獲得回調操作,否則無法執行獲取消費結果 */ /** * 獲取rabbitmq的登錄信息 */ //ip地址 @Value("${spring.rabbitmq.host}") private String host; //端口號 @Value("${spring.rabbitmq.port}") private int port; //賬號 @Value("${spring.rabbitmq.username}") private String username; //密碼 @Value("${spring.rabbitmq.password}") private String password; /** * 設置連接工廠 */ @Bean public ConnectionFactory connectionFactory() { //實例緩存連接工廠,參數是 rabbitmq的ip和端口 CachingConnectionFactory factory = new CachingConnectionFactory(host, port); //登錄用戶名 factory.setUsername(username); //登錄密碼 factory.setPassword(password); //設置主機的虛擬路徑 factory.setVirtualHost("/"); //確認是否發布 factory.setPublisherConfirms(true); return factory; } /** * 設置rabbitmq模板 */ @Bean public RabbitTemplate rabbitTemplate() { //將連接工程工廠對象注入模板里,然后返回一個模板對象 return new RabbitTemplate(this.connectionFactory()); } //===================================================================== /** * 定義 交換機、消息隊列、路由鍵 的名字 */ //定義交換機名字 exchange public static final String EXCHANG_1 = "exchange_1"; //定義消息隊列名字 queue public static final String QUEUE_1 = "queu_1"; //定義路由鍵 routingkey public static final String ROUTINGKEY_1 = "routing_1"; //=============================================================== /** * 下面的是 直連交換機 設置 綁定 消息隊列 到 交換機 * * DirectExchange:直連交換機,按照routingkey分發到指定隊列 */ //============================================== /** * 設置交換機類型 */ @Bean public DirectExchange directExchange() { logger.warn("設置交換機類型"); //實例交換機對象,然后注入該交換機的名字 return new DirectExchange(EXCHANG_1); } /** * 創建消息隊列 */ @Bean public Queue queue1() { logger.warn("創建消息隊列"); //實例消息隊列對象,輸入該隊列名字,如果需要該隊列持久化,則設為true,默認是false // return new Queue(QUEUE_1, true); return new Queue(QUEUE_1); } /** * 綁定 消息隊列 到 交換機【一個 交換機 允許被多個 消息隊列 綁定】 */ @Bean public Binding binding() { logger.warn("綁定 消息隊列 到 交換機"); //使用綁定構造器將 指定的隊列 綁定到 指定的交換機上 ,Direct交換機需要攜帶 路由鍵 return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1); } }
(2)修改消息生產類
實現接口
重寫回方法
使用構造注入RabbitTemplate模板對象
完整源碼

package com.example.rabbitmqproducer1004.rabbitmqFactory; import com.example.rabbitmqproducer1004.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息生產類 */ @Component //實現接口 //public class SendMessage { //需要設置回調方法,獲取消費結果才需要實現 RabbitTemplate.ConfirmCallback 接口, public class SendMessage implements RabbitTemplate.ConfirmCallback { Logger logger = LoggerFactory.getLogger(this.getClass()); //====================================================================== /** * 方法一:設置回調方法,獲取消費結果, * <p> * 缺點是:必須手動配置RabbitTemplate模板 ,代碼量大 */ //存儲 rabbitmq模板的臨時變量 private final RabbitTemplate rabbitTemplate; /** * 構造注入rabbitmq模板,這樣可以設置回調方法,獲取消費結果,但是必須手動配置RabbitTemplate模板 */ @Autowired public SendMessage(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //設置確認回調的方法,參數類型為ConfirmCallback this.rabbitTemplate.setConfirmCallback(this); } /** * 回調方法,獲取消費結果 * * @param correlationData 關聯數據 * @param b 消息是否被消費成功,成功為true ,失敗為false * @param s 原因 ,消費成功則返回null,否則返回失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.warn("回調的連接數據:" + correlationData); if (correlationData != null) { //CorrelationData [id=1bcab025-2b4c-4f74-a22d-41007e30f551] logger.warn("獲取correlationData的id值:" + correlationData.getId()); } //1bcab025-2b4c-4f74-a22d-41007e30f551 if (b) { logger.warn("回調結果:消息消費成功"); } else { logger.warn("回調結果:失敗。原因:" + s); } } //======================================================================== // /** // * 方法二 :不需要獲取獲取消費結果,只需要發送即可 // * // * 優點:自動裝配,代碼量少 // */ // @Autowired // private RabbitTemplate rabbitTemplate; //======================================================================== /** * 發送消息 * <p> * 參數是消息內容 */ public void send(String message) { logger.warn("發送消息,內容:" + message); /** * 方法一:異步操作,不等待消費者端返回處理結果,設置在回調操作的關聯數據,用於識別是哪一條消息和確認是否執行成功 */ // 實例關聯數據對象,使用UUID隨機數 作為 回調id CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串、關聯數據 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message,correlationData); /** * 方法二:異步操作,不等待消費者端返回處理結果,且在消息回調操作的關聯數據為null,如果不做回調操作,則建議這樣使用 */ // //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); /** * 方法三:同步操作,等待消費者端返回處理結果 */ // Object dd = rabbitTemplate.convertSendAndReceive(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message); // logger.warn("結果是什么???==" + dd); } }
3.測試
依次啟動生產者端、消費者端
訪問網址 http://localhost:1004/mq?msg=你大爺,幫我發短信3999
查看生產者控制台打印
對調成功
再看看消費者的打印台
成功!!!撒花