【RabbitMQ】一文帶你搞定springboot整合RabbitMQ涉及消息的發送確認,消息的消費確認機制,延時隊列的實現


說明

這一篇里,我們將繼續介紹RabbitMQ的高級特性,通過本篇的學習,你將收獲:

  • 什么是延時隊列
  • 延時隊列使用場景
  • RabbitMQ中的TTL
  • 如何利用RabbitMQ來實現延時隊列

本文大綱

什么是延遲隊列

延時隊列,首先,它是一種隊列,隊列意味着內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。

其次,延時隊列,最重要的特性就體現在它的延時屬性上,跟普通的隊列不一樣的是,普通隊列中的元素總是等着希望被早點取出處理,而延時隊列中的元素則是希望被在指定時間得到取出和處理,所以延時隊列中的元素是都是帶時間屬性的,通常來說是需要被處理的消息或者任務。

簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列

延遲隊里的使用場景

那么什么時候需要用延時隊列呢?考慮一下以下場景:

  • 訂單在十分鍾之內未支付則自動取消。
  • 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
  • 賬單在一周內未支付,則自動結算。
  • 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
  • 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
  • 預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議。
    這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鍾之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉;發生店鋪創建事件,十天后檢查該店鋪上新商品數,然后通知上新數為0的商戶;發生賬單生成事件,檢查賬單支付狀態,然后自動結算未支付的賬單;發生新用戶注冊事件,三天后檢查新注冊用戶的活動數據,然后通知沒有任何活動記錄的用戶;發生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發送消息給相關運營人員;發生預定會議事件,判斷離會議開始是否只有十分鍾了,如果是,則通知各個與會人員。

看起來似乎使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然后處理不就完事了嗎?如果數據量比較少,確實可以這樣做,比如:對於“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對於時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對於數據量比較大,並且時效性較強的場景,如:“訂單十分鍾內未支付則關閉“,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下。

更重要的一點是,不!優!雅!

沒錯,作為一名有追求的程序員,始終應該追求更優雅的架構和更優雅的代碼風格,寫代碼要像寫詩一樣優美。【滑稽】

這時候,延時隊列就可以閃亮登場了,以上場景,正是延時隊列的用武之地。

既然延時隊列可以解決很多特定場景下,帶時間屬性的任務需求,那么如何構造一個延時隊列呢?接下來,本文將介紹如何用RabbitMQ來實現延時隊列。

RabbitMq中的TTL

在介紹延時隊列之前,還需要先介紹一下RabbitMQ中的一個高級特性——TTL(Time To Live)。

TTL是什么呢?TTL是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那么這條消息如果在TTL設置的時間內沒有被消費,則會成為“死信”(至於什么是死信,請翻看上一篇)。如果同時配置了隊列的TTL和消息的TTL,那么較小的那個值將會被使用。
那么,如何設置這個TTL值呢?有兩種方式,第一種是在創建隊列的時候設置隊列的“x-message-ttl”屬性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

這樣所有被投遞到該隊列的消息都最多不會存活超過6s。
另一種方式便是針對每條消息設置TTL,代碼如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

這樣這條消息的過期時間也被設置成了6s。

但這兩種方式是有區別的,如果設置了隊列的TTL屬性,那么一旦消息過期,就會被隊列丟棄,而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間。

另外,還需要注意的一點是,如果不設置TTL,表示消息永遠不會過期,如果將TTL設置為0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會被丟棄。

思路:rabbitMQ 如何實現

1、rabbitMQ為每個隊列設置消息的超時時間。只要給隊列設置x-message-ttl 參數,就設定了該隊列所有消息的存活時間,時間單位是毫秒。如果聲明隊列時指定了死信交換器,則過期消息會成為死信消息
2、需要設置的參數為:

原理:上圖


1、將延遲隊列(queue)在聲明的時候設置參數 “ x-dead-letter-exchange ”,“ x-message-ttl “ 分別對應 死信路由器(dlx_exchange) 和 消息過期時間(比如說30分鍾)。
2、一個消息從生產者發送到延遲隊列 ,在延遲隊列里等待,等待30分鍾后,會去綁定的死信路由(dlx_exchange)。通過死信路由的規則,走到死信隊列。
3、這時候監聽死信隊列的消費者就可以接收到消息,消費消息。比如說查看該訂單是否支付,如果沒有支付,則關閉該訂單。

實戰演練

springboot整合RabbitMQ涉及消息的發送確認,消息的消費確認機制

1.引入maven依賴
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在application.yml的配置:
spring:
  rabbitmq:
    host: 106.52.82.241
    port: 5672
    username: yang
    password: Yangxiaohui227
    virtual-host: /
    publisher-confirms: true #消息發送后,如果發送成功到隊列,則會回調成功信息
    publisher-returns: true  #消息發送后,如果發送失敗,則會返回失敗信息信息
    listener:  #加了2下面2個屬性,消費消息的時候,就必須發送ack確認,不然消息永遠還在隊列中
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
//為了統一管理所有的Mq消息,建一個類存儲常量,消息的設計都基本會涉及(隊列(queue),交換機(exchange),路由鍵(route)三個值)
public class RabbitMqConstant {

    //下單發送消息 隊列名,交換機名,路由鍵的配置
    public final static String SHOP_ORDER_CREATE_EXCHANGE="shop.order.create.exchange";
    public final static String SHOP_ORDER_CREATE_ROUTE="shop.order.create.route";
    public final static String SHOP_ORDER_CREATE_QUEUE="shop.order.create.queue";
}
package com.example.demo.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//該類是mq最重要的一個類,所有隊列的創建,交換機的創建,隊列和交換機的綁定都在這里實現
@Configuration
public class RabbitMqConfig {
    private final static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class);
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 單一消費者
     *
     * @return
     */

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }

    /**
     * 多個消費者
     *
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(20);
        factory.setMaxConcurrentConsumers(20);
        factory.setPrefetchCount(20);
        return factory;
    }

    /**
     * 模板的初始化配置
     *
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean sucess, String cause) {
                if (sucess) {
                    log.info("消息發送成功:correlationData({}),ack({}),cause({})", correlationData, sucess, cause);
                }

            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }



    //消息的創建設計三個步驟:隊列的創建,交換機創建(direct類型,topic類型,fanout類型),隊列和交換機的通過路由鍵的綁定


    //--------- 下單消息配置
    //隊列
    @Bean
    public Queue shopOrderCreateQueue() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true);
    }

    //Direct交換機(一對一關系,一個direct交換機只能綁定一個隊列,當有2個相同消費者時,如項目部署2台機,只有一個消費者能消費,)
    @Bean
    DirectExchange shopOrderCreateExchange() {
        return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE);
    }

    //綁定
    @Bean
    Binding bindShopOrderCreateQueue() {
        return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE);
    }
}
import com.alibaba.fastjson.JSON;
import com.example.demo.domain.ShopOrderMast;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
//專門用一個類作為消息的生產者
@Service
public class ShopMessagePublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCreateOrderMessage(ShopOrderMast orderMast){
        CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值
        correlationData.setId(orderMast.getCodOrderId()); 
        String msg = JSON.toJSONString(orderMast);
        //convertAndSend該方法有非常多的重構方法,找到適合自己的業務方法就行了,這里我用的是其中一個,發送時指定exchange和route值,這樣就會發到對應的隊列去了
        rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE,msg,correlationData);

    }
}
//所有的消費都寫在一個消費類中
@Service
public class ShopMessageComsumer {
    //監聽下單消息
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE)
    public void createOrderMesaageComsumer(String msg, Channel channel, Message message) {
        try {
                //消息可以通過msg獲取也可以通過message的body屬性獲取
                System.out.println("開始消費了");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);


            /**
             * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq
             * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器,
             * 消息已經被我消費了,你可以刪除它了
             * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息
             * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認
             * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出現異常,告訴mq拋棄該消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }
}
//這里我發送了一條消息,orderId我設置為555556666666,在消息發送時,存到了CorrelationData對象中,因此,發送成功后,在confirm方法可以接收到該值了
//消息發送成功后,在控制台會看到有成功的回調信息,也就是回調了rabbitTemplate的:
confirm(CorrelationData correlationData, boolean sucess, String cause)


//上面測試的下單消息是direct類型消息的,現在創建一個topic消息

//RabbitMqConstant新增topic的配置信息
//下單topic消息:路由鍵的名字 星號* 代表多個字符,#號代表一個字符
    //topic交換機,發送消息時,發送到指定shop.order.create.topic.exchange和shop.order.create.topic.route中
    public final static String SHOP_ORDER_CREATE_TOPIC_EXCHANGE="shop.order.create.topic.exchange";
    public final static String SHOP_ORDER_CREATE_TOPIC_TOUTE="shop.order.create.topic.route";


    //隊列1,通過shop.order.create.topic.*與交換機綁定
    public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE="shop.order.create.topic.*";
    public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE="shop.order.create.topic.queue.one";


    //隊列2 通過shop.order.create.topic.*與交換機綁定shop.order.create.topic.#
    public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO="shop.order.create.topic.#";
    public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO="shop.order.create.topic.queue.two";
//在RabbitMqConfig新增topic隊列的基本信息
//-------------------------下單TOPIC消息的創建

    //創建TOPIC交換機
    @Bean
    TopicExchange shopOrderCreateTopicExchange() {
        return new TopicExchange(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE);
    }
    //---------------------------//隊列1使用自己的route和交換機綁定
    //創建隊列1
    @Bean
    public Queue shopOrderCreateQueueOne() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE, true);
    }
    //綁定
    @Bean
    Binding bindShopOrderCreateQueueOne() {
        return BindingBuilder.bind(shopOrderCreateQueueOne()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE);
    }

    //---------------------------//隊列2用自己的route和交換機綁定

    //創建隊列2
    @Bean
    public Queue shopOrderCreateQueueTWO() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO, true);
    }

    //綁定
    @Bean
    Binding bindShopOrderCreateQueueTWO() {
        return BindingBuilder.bind(shopOrderCreateQueueTWO()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO);
    }

//消息的發送方新增
  //發送TOPIC消息
    public void sendCreateOrderTOPICMessage(ShopOrderMast orderMast){
        CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值
        correlationData.setId(orderMast.getCodOrderId());
        String msg = JSON.toJSONString(orderMast);
        //消息發送使用公共route而不是某個隊列自己的route
        rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_TOUTE,msg,correlationData);

    }
//消息的消費方新增
//消費者1
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE)
    public void createOrderMesaageComsumerOne(String msg, Channel channel, Message message) {
        try {
                //消息可以通過msg獲取也可以通過message對象的body值獲取
                System.out.println("我是消費者1");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);


            /**
             * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq
             * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器,
             * 消息已經被我消費了,你可以刪除它了
             * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息
             * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認
             * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出現異常,告訴mq拋棄該消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }
    //消費者2
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO)
    public void createOrderMesaageComsumerTWO(String msg, Channel channel, Message message) {
        try {
                //消息可以通過msg獲取也可以通過message對象的body值獲取
                System.out.println("我是消費者2");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);


            /**
             * 因為我在application.yml那里配置了消息手工確認也就是傳說中的ack,所以消息消費后必須發送確認給mq
             * 很多人不理解ack(消息消費確認),以為這個確認是告訴消息發送者的,這個是錯的,這個ack是告訴mq服務器,
             * 消息已經被我消費了,你可以刪除它了
             * 如果沒有發送basicAck的后果是:每次重啟服務,你都會接收到該消息
             * 如果你不想用確認機制,就去掉application.yml的acknowledge-mode: manual配置,該配置默認
             * 是自動確認auto,去掉后,下面的channel.basicAck就不用寫了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出現異常,告訴mq拋棄該消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }

//測試結果:

springboot整合RabbitMQ延時隊列的實現

延時隊列:將消息發送到一個隊列,等過了一段時間后,該隊列會將消息轉發到真正的隊列消費,業務場景可以用於訂單定時取消

//在RabbitMqConstant類添加如下內容
 //延時隊列,消息先發到延時隊列中,到時間后,再發送到真正的隊列

    public final static String SHOP_ORDER_CREATE_DELAY_EXCHANGE="shop.order.create.delay.exchange";
    public final static String SHOP_ORDER_CREATE_DELAY_ROUTE="shop.order.create.delay.route";
    public final static String SHOP_ORDER_CREATE_DELAY_QUEUE="shop.order.create.delay.queue";

    //真正的隊列

    public final static String SHOP_ORDER_CREATE_REAL_EXCHANGE="shop.order.create.real.exchange";
    public final static String SHOP_ORDER_CREATE_REAL_ROUTE="shop.order.create.real.route";
    public final static String SHOP_ORDER_CREATE_REAL_QUEUE="shop.order.create.real.queue";
//在RabbitMqConfig加上
 //----------------------- 延時隊列的配置

    //延時隊列
    @Bean
    public Queue shopOrderCreateDelayQueue() {
        Map<String, Object> argsMap= Maps.newHashMap();
        argsMap.put("x-dead-letter-exchange",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE); //真正的交換機
        argsMap.put("x-dead-letter-routing-key",RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE); //真正的路由鍵
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE,true,false,false,argsMap);

    }
    //延時交換機
    @Bean
    DirectExchange shopOrderCreateDelayExchange() {
        return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE);
    }

    //延時隊列綁定延時交換機
    @Bean
    Binding bindShopOrderCreateDelayQueue() {
        return BindingBuilder.bind(shopOrderCreateDelayQueue()).to(shopOrderCreateDelayExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE);
    }


    //真正的隊列配置-------------------------------------


    //真正的隊列
    @Bean
    public Queue shopOrderCreateRealQueue() {

        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE,true);

    }
    //真正的交換機
    @Bean
    DirectExchange shopOrderCreateRealExchange() {
        return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_EXCHANGE);
    }

    //綁定真正的交換機
    @Bean
    Binding bindShopOrderCreateRealQueue() {
        return BindingBuilder.bind(shopOrderCreateRealQueue()).to(shopOrderCreateRealExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_REAL_ROUTE);
    }
//在消息發送類(ShopMessagePublisher)新增
 //發送延時消息
    public void sendCreateOrderDelayMessage(ShopOrderMast orderMast){
        CorrelationData correlationData=new CorrelationData(); //該參數可以傳,可以不傳,不傳時,correlationData的id值默認是null,消息發送成功后,在RabbitMqConfig類的rabbitTemplate類的confirm方法會接收到該值
        correlationData.setId(orderMast.getCodOrderId());
        String msg = JSON.toJSONString(orderMast);
        // convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData)
        rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_EXCHANGE, RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_ROUTE, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setExpiration("60000");//單位是毫秒
                return message;
            }
        }, correlationData);

    }
//在消費類(ShopMessageComsumer) 新增
 //延遲隊列中真正隊列監聽
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_REAL_QUEUE)
    public void createOrderRealMesaageComsumer(String msg, Channel channel, Message message) {
        try {

                System.out.println("這是真正的隊列,在監聽延時隊列發送的消息");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);



            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出現異常,告訴mq拋棄該消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }

注意,如果同時使用了延時隊列的queue去接收,那么消息會被延遲隊列的消費者消費,而不是被真正的queue消費

//如果在延遲隊列消費時,加了下面這個隊列,上面那個真正的消費者就接收不到消息了
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_DELAY_QUEUE)
    public void createOrderDelayMesaageComsumer(String msg, Channel channel, Message message) {
        try {
                System.out.println("測試延遲隊列自己能否接收");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出現異常,告訴mq拋棄該消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }

補充:對於direct和topic交換機,如果部署多台相同queue的消費者,消息也只會消費一次,通過輪詢的方式進行負債均衡

如何在rabbitMq管理頁面查看沒有還沒被消費的消息信息:




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM