SpringBoot整合RabbitMQ


文末有源碼地址

添加依賴

<!--amqp依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

創建所需常量類

public class RabbitMQConstant {

    //簡單模式
    public static final String QUEUE_EASY = "easy.queue";
    //work模式
    public static final String QUEUE_WORK = "work.queue";
    //topic模式
    public static final String QUEUE_TOPIC_FIRST = "topic.queue.first";
    public static final String QUEUE_TOPIC_SECOND = "topic.queue.second";
    //發布訂閱模式
    public static final String QUEUE_FANOUT = "fanout.queue";
    public static final String QUEUE_FANOUT_SECOND = "fanout.queue.second";

    //路由key
    public static final String ROUTING_KEY_EASY = "routing.key.easy";
    public static final String ROUTING_KEY_WORK = "routing.key.work";
    public static final String ROUTING_KEY_TOPIC_FIRST = "routing.key.topic.first";
    public static final String ROUTING_KEY_TOPIC_SECOND = "routing.key.topic.second";


    // direct交換機
    public static final String EXCHANGE_DIRECT = "direct_exchange";
    // work交換機
    public static final String EXCHANGE_WORK = "work_exchange";
    // topic交換機
    public static final String EXCHANGE_TOPIC = "topic_exchange";
    // fanout交換機
    public static final String EXCHANGE_FANOUT = "fanout_exchange";

}

創建交換機

@Configuration
public class ExchangeConfig {

    /**
     * 交換機說明:
     * durable="true" rabbitmq重啟的時候不需要創建新的交換機
     * auto-delete 表示交換機沒有在使用時將被自動刪除 默認是false
     * direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,消息就被投送到相關的隊列
     * topic交換器你采用模糊匹配路由鍵的原則進行轉發消息到隊列中
     * fanout交換器中沒有路由鍵的概念,他會把消息發送到所有綁定在此交換器上面的隊列中。
     */

    @Bean(name = RabbitMQConstant.EXCHANGE_DIRECT)
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitMQConstant.EXCHANGE_DIRECT, true, false);
    }

    @Bean(name = RabbitMQConstant.EXCHANGE_WORK)
    public DirectExchange workExchange() {
        return new DirectExchange(RabbitMQConstant.EXCHANGE_WORK, true, false);
    }

    @Bean(name = RabbitMQConstant.EXCHANGE_TOPIC)
    public TopicExchange topicExchange() {
        return new TopicExchange(RabbitMQConstant.EXCHANGE_TOPIC, true, false);
    }

    @Bean(name = RabbitMQConstant.EXCHANGE_FANOUT)
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMQConstant.EXCHANGE_FANOUT, true, false);
    }

}

創建隊列

@Configuration
public class QueueConfig {

    /**
     * durable="true" 持久化 rabbitmq重啟的時候不需要創建新的隊列
     * exclusive 表示該消息隊列是否只在當前connection生效,默認是false
     * auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false
     */

    @Bean(name = RabbitMQConstant.QUEUE_EASY)
    public Queue easyQueue() {
        return new Queue(RabbitMQConstant.QUEUE_EASY, true, false, false);
    }

    @Bean(name = RabbitMQConstant.QUEUE_WORK)
    public Queue workQueue() {
        return new Queue(RabbitMQConstant.QUEUE_WORK, true, false, false);
    }

    @Bean(name = RabbitMQConstant.QUEUE_TOPIC_FIRST)
    public Queue topicQueue() {
        return new Queue(RabbitMQConstant.QUEUE_TOPIC_FIRST, true, false, false);
    }

    @Bean(name = RabbitMQConstant.QUEUE_TOPIC_SECOND)
    public Queue topicQueueSecond() {
        return new Queue(RabbitMQConstant.QUEUE_TOPIC_SECOND, true, false, false);
    }

    @Bean(name = RabbitMQConstant.QUEUE_FANOUT)
    public Queue fanoutQueue() {
        return new Queue(RabbitMQConstant.QUEUE_FANOUT, true, false, false);
    }

    @Bean(name = RabbitMQConstant.QUEUE_FANOUT_SECOND)
    public Queue fanoutQueueSecond() {
        return new Queue(RabbitMQConstant.QUEUE_FANOUT_SECOND, true, false, false);
    }


}

綁定交換機和隊列

@Configuration
@Slf4j
public class RabbitMqConfig {

    @Resource
    private QueueConfig queueConfig;
    @Resource
    private ExchangeConfig exchangeConfig;
    /**
     * 連接工廠
     */
    @Resource
    private ConnectionFactory connectionFactory;


    /**
     * 將消息隊列和交換機進行綁定,指定路由
     */
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY);
    }

    @Bean
    public Binding bindingWork() {
        return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK);
    }

    @Bean
    public Binding bindingTopic() {
        return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST);
    }

    @Bean
    public Binding bindingTopicSecond() {
        return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND);
    }

    @Bean
    public Binding bindingFanout() {
        return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutSecond() {
        return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange());
    }

    /** ======================== 定制一些處理策略 =============================*/

    /**
     * 定制化amqp模版
     * <p>
     * Rabbit MQ的消息確認有兩種。
     * <p>
     * 一種是消息發送確認:這種是用來確認生產者將消息發送給交換機,交換機傳遞給隊列過程中,消息是否成功投遞。
     * 發送確認分兩步:一是確認是否到達交換機,二是確認是否到達隊列
     * <p>
     * 第二種是消費接收確認:這種是確認消費者是否成功消費了隊列中的消息。
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息發送失敗返回到隊列中, yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);

        /**
         * 使用該功能需要開啟消息確認,yml需要配置 publisher-confirms: true
         * 通過實現ConfirmCallBack接口,用於實現消息發送到交換機Exchange后接收ack回調
         * correlationData  消息唯一標志
         * ack              確認結果
         * cause            失敗原因
         */
        rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallback());
        /**
         * 使用該功能需要開啟消息返回確認,yml需要配置 publisher-returns: true
         * 通過實現ReturnCallback接口,如果消息從交換機發送到對應隊列失敗時觸發
         * message    消息主體 message
         * replyCode  消息主體 message
         * replyText  描述
         * exchange   消息使用的交換機
         * routingKey 消息使用的路由鍵
         */
        rabbitTemplate.setReturnCallback(new MsgSendReturnCallback());


        return rabbitTemplate;
    }

}

源碼地址:

https://gitee.com/xiaorenwu_dashije/rabbitmq_demo.git

包含Direct模式、Work模式、Fanout模式、Topic模式


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM