【RabbitMQ】一文帶你搞定RabbitMQ延遲隊列


本文口味:魚香肉絲   預計閱讀:10分鍾

一、說明

在上一篇中,介紹了RabbitMQ中的死信隊列是什么,何時使用以及如何使用RabbitMQ的死信隊列。相信通過上一篇的學習,對於死信隊列已經有了更多的了解,這一篇的內容也跟死信隊列息息相關,如果你還不了解死信隊列,那么建議你先進行上一篇文章的閱讀。

這一篇里,我們將繼續介紹RabbitMQ的高級特性,通過本篇的學習,你將收獲:

  1. 什么是延時隊列
  2. 延時隊列使用場景
  3. RabbitMQ中的TTL
  4. 如何利用RabbitMQ來實現延時隊列

二、本文大綱

以下是本文大綱:

1.png

本文閱讀前,需要對RabbitMQ以及死信隊列有一個簡單的了解。

三、什么是延時隊列

延時隊列,首先,它是一種隊列,隊列意味着內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。

其次,延時隊列,最重要的特性就體現在它的延時屬性上,跟普通的隊列不一樣的是,普通隊列中的元素總是等着希望被早點取出處理,而延時隊列中的元素則是希望被在指定時間得到取出和處理,所以延時隊列中的元素是都是帶時間屬性的,通常來說是需要被處理的消息或者任務。

簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。

四、延時隊列使用場景

那么什么時候需要用延時隊列呢?考慮一下以下場景:

  1. 訂單在十分鍾之內未支付則自動取消。
  2. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
  3. 賬單在一周內未支付,則自動結算。
  4. 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
  5. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
  6. 預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議。

這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鍾之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉;發生店鋪創建事件,十天后檢查該店鋪上新商品數,然后通知上新數為0的商戶;發生賬單生成事件,檢查賬單支付狀態,然后自動結算未支付的賬單;發生新用戶注冊事件,三天后檢查新注冊用戶的活動數據,然后通知沒有任何活動記錄的用戶;發生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發送消息給相關運營人員;發生預定會議事件,判斷離會議開始是否只有十分鍾了,如果是,則通知各個與會人員。

看起來似乎使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然后處理不就完事了嗎?如果數據量比較少,確實可以這樣做,比如:對於“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對於時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對於數據量比較大,並且時效性較強的場景,如:“訂單十分鍾內未支付則關閉“,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下。

更重要的一點是,不!優!雅!

沒錯,作為一名有追求的程序員,始終應該追求更優雅的架構和更優雅的代碼風格,寫代碼要像寫詩一樣優美。【滑稽】

這時候,延時隊列就可以閃亮登場了,以上場景,正是延時隊列的用武之地。

既然延時隊列可以解決很多特定場景下,帶時間屬性的任務需求,那么如何構造一個延時隊列呢?接下來,本文將介紹如何用RabbitMQ來實現延時隊列。

五、RabbitMQ中的TTL

在介紹延時隊列之前,還需要先介紹一下RabbitMQ中的一個高級特性——TTL(Time To Live)

TTL是什么呢?TTL是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那么這條消息如果在TTL設置的時間內沒有被消費,則會成為“死信”(至於什么是死信,請翻看上一篇)。如果同時配置了隊列的TTL和消息的TTL,那么較小的那個值將會被使用。

那么,如何設置這個TTL值呢?有兩種方式,第一種是在創建隊列的時候設置隊列的“x-message-ttl”屬性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

這樣所有被投遞到該隊列的消息都最多不會存活超過6s。

另一種方式便是針對每條消息設置TTL,代碼如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

這樣這條消息的過期時間也被設置成了6s。

但這兩種方式是有區別的,如果設置了隊列的TTL屬性,那么一旦消息過期,就會被隊列丟棄,而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間。

另外,還需要注意的一點是,如果不設置TTL,表示消息永遠不會過期,如果將TTL設置為0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會被丟棄。

六、如何利用RabbitMQ實現延時隊列

前一篇里介紹了如果設置死信隊列,前文中又介紹了TTL,至此,利用RabbitMQ實現延時隊列的兩大要素已經集齊,接下來只需要將它們進行調和,再加入一點點調味料,延時隊列就可以新鮮出爐了。

想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL則剛好能讓消息在延遲多久之后成為死信,另一方面,成為死信的消息都會被投遞到死信隊列里,這樣只需要消費者一直消費死信隊列里的消息就萬事大吉了,因為里面的消息都是希望被立即處理的消息。

從下圖可以大致看出消息的流向:

23.png

生產者生產一條延時消息,根據需要延時時間的不同,利用不同的routingkey將消息路由到不同的延時隊列,每個隊列都設置了不同的TTL屬性,並綁定在同一個死信交換機中,消息過期后,根據routingkey的不同,又會被路由到不同的死信隊列中,消費者只需要監聽對應的死信隊列進行處理即可。

下面來看代碼:

先聲明交換機、隊列以及他們的綁定關系:

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";
    public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";

    // 聲明延時Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時隊列A 延時10s
    // 並綁定到對應的死信交換機
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        // x-message-ttl  聲明隊列的TTL
        args.put("x-message-ttl", 6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    // 聲明延時隊列B 延時 60s
    // 並綁定到對應的死信交換機
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        // x-message-ttl  聲明隊列的TTL
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    // 聲明死信隊列A 用於接收延時10s處理的消息
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 聲明死信隊列B 用於接收延時60s處理的消息
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 聲明延時隊列A綁定關系
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    // 聲明業務隊列B綁定關系
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    // 聲明死信隊列A綁定關系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 聲明死信隊列B綁定關系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

接下來,創建兩個消費者,分別對兩個死信隊列的消息進行消費:

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

然后是消息的生產者:

@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }
    }
}

接下來,我們暴露一個web接口來生產消息:

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg, Integer delayType){
        log.info("當前時間:{},收到請求,msg:{},delayType:{}", new Date(), msg, delayType);
        sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType)));
    }
}

准備就緒,啟動!

打開rabbitMQ的管理后台,可以看到我們剛才創建的交換機和隊列信息:

2.png

4.png

3.png

接下來,我們來發送幾條消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1 http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

日志如下:

2019-07-28 16:02:19.813  INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:02:19 CST 2019,收到請求,msg:testMsg1,delayType:1
2019-07-28 16:02:19.815  INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started
2019-07-28 16:02:25.829  INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:02:25 CST 2019,死信隊列A收到消息:testMsg1
2019-07-28 16:02:41.326  INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:02:41 CST 2019,收到請求,msg:testMsg2,delayType:2
2019-07-28 16:03:41.329  INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:03:41 CST 2019,死信隊列B收到消息:testMsg2

第一條消息在6s后變成了死信消息,然后被消費者消費掉,第二條消息在60s之后變成了死信消息,然后被消費掉,這樣,一個還算ok的延時隊列就打造完成了。

不過,等等,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有6s和60s兩個時間選項,如果需要一個小時后處理,那么就需要增加TTL為一個小時的隊列,如果是預定會議室然后提前通知這樣的場景,豈不是要增加無數個隊列才能滿足需求??

嗯,仔細想想,事情並不簡單。

七、RabbitMQ延時隊列優化

顯然,需要一種更通用的方案才能滿足需求,那么就只能將TTL設置在消息屬性里了。我們來試一試。

增加一個延時隊列,用於接收設置為任意延時時長的消息,增加一個相應的死信隊列和routingkey:

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
    public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";

    // 聲明延時Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時隊列C 不設置TTL
    // 並綁定到對應的死信交換機
    @Bean("delayQueueC")
    public Queue delayQueueC(){
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    這里聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這里聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }

    // 聲明死信隊列C 用於接收延時任意時長處理的消息
    @Bean("deadLetterQueueC")
    public Queue deadLetterQueueC(){
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }

    // 聲明延時列C綁定關系
    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    // 聲明死信隊列C綁定關系
    @Bean
    public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }
}

增加一個死信隊列C的消費者:

@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當前時間:{},死信隊列C收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

再次啟動!然后訪問:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000 來生產消息,注意這里的單位是毫秒。

2019-07-28 16:45:07.033  INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:45:07 CST 2019,收到請求,msg:testMsg1,delayTime:5000
2019-07-28 16:45:11.694  INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:45:11 CST 2019,收到請求,msg:testMsg2,delayTime:5000
2019-07-28 16:45:12.048  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:45:12 CST 2019,死信隊列C收到消息:testMsg1
2019-07-28 16:45:16.709  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:45:16 CST 2019,死信隊列C收到消息:testMsg2

看起來似乎沒什么問題,但不要高興的太早,在最開始的時候,就介紹過,如果使用在消息屬性上設置TTL的方式,消息可能並不會按時“死亡“,因為RabbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,索引如果第一個消息的延時時長很長,而第二個消息的延時時長很短,則第二個消息並不會優先得到執行。

實驗一下:

2019-07-28 16:49:02.957  INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:49:02 CST 2019,收到請求,msg:longDelayedMsg,delayTime:20000
2019-07-28 16:49:10.671  INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:49:10 CST 2019,收到請求,msg:shortDelayedMsg,delayTime:2000
2019-07-28 16:49:22.969  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:longDelayedMsg
2019-07-28 16:49:22.970  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:shortDelayedMsg

我們先發了一個延時時長為20s的消息,然后發了一個延時時長為2s的消息,結果顯示,第二個消息會在等第一個消息成為死信后才會“死亡“。

八、利用RabbitMQ插件實現延遲隊列

上文中提到的問題,確實是一個硬傷,如果不能實現在消息粒度上添加TTL,並使其在設置的TTL時間及時死亡,就無法設計成一個通用的延時隊列。

那如何解決這個問題呢?不要慌,安裝一個插件即可:https://www.rabbitmq.com/community-plugins.html ,下載rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄。

接下來,進入RabbitMQ的安裝目錄下的sbin目錄,執行下面命令讓該插件生效,然后重啟RabbitMQ。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后,我們再聲明幾個Bean:

@Configuration
public class DelayedRabbitMQConfig {
    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

controller層再添加一個入口:

@RequestMapping("delayMsg2")
public void delayMsg2(String msg, Integer delayTime) {
    log.info("當前時間:{},收到請求,msg:{},delayTime:{}", new Date(), msg, delayTime);
    sender.sendDelayMsg(msg, delayTime);
}

消息生產者的代碼也需要修改:

public void sendDelayMsg(String msg, Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
        a.getMessageProperties().setDelay(delayTime);
        return a;
    });
}

最后,再創建一個消費者:

@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveD(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當前時間:{},延時隊列收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

一切准備就緒,啟動!然后分別訪問以下鏈接:

http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000

日志如下:

2019-07-28 17:28:13.729  INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 17:28:13 CST 2019,收到請求,msg:msg1,delayTime:20000
2019-07-28 17:28:20.607  INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 17:28:20 CST 2019,收到請求,msg:msg2,delayTime:2000
2019-07-28 17:28:22.624  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 17:28:22 CST 2019,延時隊列收到消息:msg2
2019-07-28 17:28:33.751  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 17:28:33 CST 2019,延時隊列收到消息:msg1

第二個消息被先消費掉了,符合預期。至此,RabbitMQ實現延時隊列的部分就完結了。

九、總結

延時隊列在需要延時處理的場景下非常有用,使用RabbitMQ來實現延時隊列可以很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。

當然,延時隊列還有很多其它選擇,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點,但就像爐石傳說一般,這些知識就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到問題便能游刃有余,所以需要大量的知識儲備和經驗積累才能打造出更出色的卡牌組合,讓自己解決問題的能力得到更好的提升。

但另一方面,隨着時間的流逝和閱歷的增長,越來越感覺到自己的能力有限,無法獨自面對紛繁復雜且多變的業務需求,在很多方面需要其他人的協助才能很好的完成任務。也知道聞道有先后,術業有專攻,不會再狂妄自大,覺得自己能把所有事情都搞定,也將重心慢慢轉移到研究如何有效的進行團隊合作上來,我相信一個高度協調的團隊永遠比一個人戰斗要更有價值。

花了一個周末的時間完成了這篇文章,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱,希望能對你有幫助,如果有錯誤的地方,歡迎指正,也歡迎關注我的公眾號進行留言交流。

TIM圖片20190714173105.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM