RabbitMQ --- 直連交換機 【 有回調方法,獲取消費結果 】


1.前言

  上一隨筆詳細記錄了直連交換機的方法,發送的消息是異步的,如果消息未被消費者消費,那么可以一直存在消息隊列中。

那么有沒有辦法做一個回調,當消息被消費后,被通知消息成功被消費者消費啦?

  答案是有的。

  需要在消息生產者的消息生產類實現  RabbitTemplate.ConfirmCallback  接口,重寫 回調方法confirm(),

同時 RabbitTemplate 模板工具需要自定義注入連接rabbitmq的連接工廠對象才可以正常執行回調操作。

  而消費者端的代碼不需要修改。

下面演示,以上一節隨筆為基礎,修改消息生產者部分代碼實現演示,隨筆地址https://www.cnblogs.com/c2g5201314/p/13156932.html
總結:
(1)異步操作,獲取回調消費結果,需要實現RabbitTemplate.ConfirmCallback 接口,然后重寫 confirm()方法。
(2)獲取回調結果,指的是獲取消息是否被消費端正常消費而返回的結果,並不是消費端返回
的處理結果,這一點得注意,如果需要等待消費端返回處理結果,則需要做同步操作,
而不是做回調操作。
(3)需要做同步操作時,應該rabbitTemplate.convertSendAndReceive()方法,返回結果類型是Object,需要根據消費端返回的數據類型來決定強轉的類型。
 (4)異步則使用rabbitTemplate.convertAndSend()方法。

 

2.操作

(1)修改配置類,添加自定義RabbitTemplate模板

 

 

 完整源碼

package com.example.rabbitmqproducer1004.config;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置類---消息生產者
 */
@Configuration
public class RabbitmqConfig {
    //日志記錄器
    Logger logger = LoggerFactory.getLogger(getClass());


    //===========================================================
    /**
     * 手動配置RabbitTemplate 是為了獲得回調操作,否則無法執行獲取消費結果
     */

    /**
     * 獲取rabbitmq的登錄信息
     */
    //ip地址
    @Value("${spring.rabbitmq.host}")
    private String host;
    //端口號
    @Value("${spring.rabbitmq.port}")
    private int port;
    //賬號
    @Value("${spring.rabbitmq.username}")
    private String username;
    //密碼
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    /**
     * 設置連接工廠
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        //實例緩存連接工廠,參數是 rabbitmq的ip和端口
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        //登錄用戶名
        factory.setUsername(username);
        //登錄密碼
        factory.setPassword(password);
        //設置主機的虛擬路徑
        factory.setVirtualHost("/");
        //確認是否發布
        factory.setPublisherConfirms(true);
        return factory;
    }

    /**
     * 設置rabbitmq模板
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        //將連接工程工廠對象注入模板里,然后返回一個模板對象
        return new RabbitTemplate(this.connectionFactory());

    }

//=====================================================================
    /**
     * 定義 交換機、消息隊列、路由鍵 的名字
     */

    //定義交換機名字 exchange
    public static final String EXCHANG_1 = "exchange_1";

    //定義消息隊列名字 queue
    public static final String QUEUE_1 = "queu_1";


    //定義路由鍵 routingkey
    public static final String ROUTINGKEY_1 = "routing_1";


    //===============================================================

    /**
     * 下面的是 直連交換機 設置 綁定 消息隊列 到 交換機
     *
     * DirectExchange:直連交換機,按照routingkey分發到指定隊列
     */
    //==============================================

    /**
     * 設置交換機類型
     */
    @Bean
    public DirectExchange directExchange() {
        logger.warn("設置交換機類型");
        //實例交換機對象,然后注入該交換機的名字
        return new DirectExchange(EXCHANG_1);
    }

    /**
     * 創建消息隊列
     */
    @Bean
    public Queue queue1() {
        logger.warn("創建消息隊列");
        //實例消息隊列對象,輸入該隊列名字,如果需要該隊列持久化,則設為true,默認是false
//        return new Queue(QUEUE_1, true);
        return new Queue(QUEUE_1);
    }

    /**
     * 綁定 消息隊列 到 交換機【一個 交換機 允許被多個 消息隊列 綁定】
     */
    @Bean
    public Binding binding() {
        logger.warn("綁定 消息隊列 到 交換機");
        //使用綁定構造器將 指定的隊列 綁定到 指定的交換機上 ,Direct交換機需要攜帶 路由鍵
        return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1);
    }


}
View Code

(2)修改消息生產類

實現接口

 

 

 重寫回方法

 

 

 使用構造注入RabbitTemplate模板對象

 

 

 完整源碼

package com.example.rabbitmqproducer1004.rabbitmqFactory;


import com.example.rabbitmqproducer1004.config.RabbitmqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 消息生產類
 */
@Component
//實現接口
//public class SendMessage  {

//需要設置回調方法,獲取消費結果才需要實現 RabbitTemplate.ConfirmCallback 接口,
public class SendMessage implements RabbitTemplate.ConfirmCallback {
    Logger logger = LoggerFactory.getLogger(this.getClass());

//======================================================================
    /**
     * 方法一:設置回調方法,獲取消費結果,
     * <p>
     * 缺點是:必須手動配置RabbitTemplate模板 ,代碼量大
     */

    //存儲 rabbitmq模板的臨時變量
    private final RabbitTemplate rabbitTemplate;

    /**
     * 構造注入rabbitmq模板,這樣可以設置回調方法,獲取消費結果,但是必須手動配置RabbitTemplate模板
     */
    @Autowired
    public SendMessage(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //設置確認回調的方法,參數類型為ConfirmCallback
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 回調方法,獲取消費結果
     *
     * @param correlationData 關聯數據
     * @param b               消息是否被消費成功,成功為true ,失敗為false
     * @param s               原因 ,消費成功則返回null,否則返回失敗原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        logger.warn("回調的連接數據:" + correlationData);
        if (correlationData != null) {
            //CorrelationData [id=1bcab025-2b4c-4f74-a22d-41007e30f551]
            logger.warn("獲取correlationData的id值:" + correlationData.getId());
        }
        //1bcab025-2b4c-4f74-a22d-41007e30f551
        if (b) {
            logger.warn("回調結果:消息消費成功");
        } else {
            logger.warn("回調結果:失敗。原因:" + s);
        }
    }

//========================================================================
//    /**
//     * 方法二 :不需要獲取獲取消費結果,只需要發送即可
//     *
//     * 優點:自動裝配,代碼量少
//     */
//    @Autowired
//    private RabbitTemplate rabbitTemplate;

//========================================================================

    /**
     * 發送消息
     * <p>
     * 參數是消息內容
     */
    public void send(String message) {
        logger.warn("發送消息,內容:" + message);
        /**
         * 方法一:異步操作,不等待消費者端返回處理結果,設置在回調操作的關聯數據,用於識別是哪一條消息和確認是否執行成功
         */
//        實例關聯數據對象,使用UUID隨機數 作為 回調id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串、關聯數據
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message,correlationData);
        /**
         * 方法二:異步操作,不等待消費者端返回處理結果,且在消息回調操作的關聯數據為null,如果不做回調操作,則建議這樣使用
         */
//        //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由關鍵字、消息字符串
//        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message);
        /**
         * 方法三:同步操作,等待消費者端返回處理結果
         */
//        Object dd = rabbitTemplate.convertSendAndReceive(RabbitmqConfig.EXCHANG_1, RabbitmqConfig.ROUTINGKEY_1, message);
//        logger.warn("結果是什么???==" + dd);
    }


}
View Code

3.測試

依次啟動生產者端、消費者端

訪問網址 http://localhost:1004/mq?msg=你大爺,幫我發短信3999

 

 查看生產者控制台打印

 

 對調成功

 

再看看消費者的打印台

 

 

 

成功!!!撒花

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM