如何用RabbitMQ實現延遲隊列


前言

jdkjuc 工具包中,提供了一種延遲隊列 DelayQueue。延遲隊列用處非常廣泛,比如我們最常見的場景就是在網購或者外賣平台中發起一個訂單,如果不付款,一般 15 分鍾后就會被關閉,這個直接用定時任務是不好實現的,因為每個用戶下單的時間並不確定,所以這時候就需要用到延遲隊列。

什么是延遲隊列

延遲隊列本身也是隊列,只不過這個隊列是延遲的,意思就是說當我們把一條消息放入延遲隊列,消息並不會立刻出隊,而是會在到達指定時間之后(或者說過了指定時間)才會出隊,從而被消費者消費。

利用死信隊列實現延遲隊列

RabbitMQ 中的死信隊列就是用來存儲特定條件下的消息,那么假如我們把這個條件設定為指定時間過期(設定帶TTL 的消息或者隊列),就可以用來實現延遲隊列的功能。

  1. 新建一個 TtlDelayRabbitConfig 配置類(省略了包名和導入),消息最開始發送至 ttl 消息隊列,這個隊列中所有的消息在 5 秒后過期,后期后會進入死信隊列:
@Configuration
public class TtlDelayRabbitConfig {

    //路由ttl消息交換機
    @Bean("ttlDelayFanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE");
    }

    //ttl消息隊列
    @Bean("ttlDelayQueue")
    public Queue ttlQueue(){
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-message-ttl", 5000);//隊列中所有消息5秒后過期
        map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過期后進入死信隊列
        return new Queue("TTL_QUEUE",false,false,false,map);
    }

    //Fanout交換機和productQueue綁定
    @Bean
    public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //fanout死信交換機
    @Bean("ttlDelayDeadLetterExchange")
    public FanoutExchange deadLetterExchange(){
        return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");
    }

    //死信隊列
    @Bean("ttlDelayDeadLetterQueue")
    public Queue ttlDelayDeadLetterQueue(){
        return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE");
    }

    //死信隊列和死信交換機綁定
    @Bean
    public Binding deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

  1. 新建一個消費者 TtlDelayConsumer 類,監聽死信隊列,這里收到的消息都是生產者生產消息之后的 5 秒,也就是延遲了 5 秒的消息:
@Component
public class TtlDelayConsumer {

    @RabbitHandler
    @RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE")
    public void fanoutConsumer(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("【延遲隊列】【" + sdf.format(new Date()) + "】收到死信隊列消息:" + msg);
    }
}
  1. 新建一個 DelayQueueController 類做生產者來發送消息:
@RestController
@RequestMapping("/delay")
public class DelayQueueController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/ttl/send")
    public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
        rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
        return "succ";
    }
}
  1. 最后我們在瀏覽器輸入地址 http://localhost:8080/delay/ttl/send?msg=測試ttl延遲隊列 進行測試,可以看到每條消息都是在發送 5 秒之后才能收到消息:

TTL 延遲隊列的問題

假如我們實際中,有的消息是 10 分鍾過期,有的是 20 分鍾過期,這時候我們就需要建立多個隊列,一旦時間維度非常龐大,那么就需要維護非常多的隊列。說到這里,可能很多人會有疑問,我們可以針對單條信息設置過期時間,大可不必去定義多個隊列?

然而事實真的是如此嗎?接下來我們通過一個例子來驗證下。

  1. 把上面示例中 TtlDelayRabbitConfig 類中的隊列定義函數 x-message-ttl 屬性去掉,不過需要注意的是我們需要先把這個隊列后台刪除掉,否則同名隊列重復創建無效:
@Bean("ttlDelayQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    //        map.put("x-message-ttl", 5000);//注釋掉這個屬性,隊列不設置過期時間
    map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過期后進入死信隊列
    return new Queue("TTL_QUEUE",false,false,false,map);
}
  1. 然后將 DelayQueueController 類中的發送消息方法修改一下,對每條信息設置過期時間:
@GetMapping(value="/ttl/send")
    public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg,
                             @RequestParam(value = "time") String millTimes){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(millTimes);//單條消息設置過期時間,單位:毫秒
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
        return "succ";
    }
  1. 然后執行 2 條消息發送,一條 10 秒過期,一條 5 秒過期,先發送 10 秒的:
http://localhost:8080/delay/ttl/send?msg=10秒過期消息&time=10000
http://localhost:8080/delay/ttl/send?msg=5秒過期消息&time=5000
  1. 執行之后得到如下結果:

我們看到,兩條消息都是 10 秒后過期,這是巧合嗎?並不是,這是因為 RabbitMQ 中的機制就是如果前一條消息沒有出隊,那么即使后一條消息已經失效,也必須要等前一條消息出隊之后才能出隊,所以這就是為什么一般都盡量避免同一個隊列單條消息設置不同過期時間的做法。

死信隊列實現的延遲隊列缺點

通過以上兩個例子,使用死信隊列來實現延遲隊列,我們可以得到幾個很明顯的缺點:

  • 如果有非常多的時間點(比如有的 10 分鍾過期,有的 20 分鍾過期等),則需要創建不同的交換機和隊列來實現消息的路由。
  • 單獨設置消息的 TTL 時可能會造成消息的阻塞。因為當前一條消息沒有出隊,后一條消息即使到期了也不能出隊。
  • 消息可能會有一定的延遲(上面的示例中就可以看到有一點延遲)。

為了避免 TTL 和死信隊列可能造成的問題,所以就非常有必要用一種新的更好的方案來替代實現延遲隊列,這就是延時隊列插件。

利用插件實現延遲隊列

RabbitMQ3.5.7 版本之后,提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列 ,同時需保證 Erlang/OPT 版本為 18.0 之后。

安裝延遲隊列插件

  1. RabbitMQ版本在 3.5.7-3.7.x 的可以執行以下命令進行下載(也可以直接通過瀏覽器下載):
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果 RabbitMQ3.8 之后的版本,可以點擊這里,找到延遲隊列對應版本的插件,然后下載。

  1. 下載好之后,將插件上傳到 plugins 目錄下,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令啟動插件。如果要禁止該插件,則可以執行命令 rabbitmq-plugins disable rabbitmq_delayed_message_exchange(啟用插件后需要重啟 RabbitMQ 才會生效)。

延遲隊列插件示例

  1. 新建一個 PluginDelayRabbitConfig 配置類:
@Configuration
public class PluginDelayRabbitConfig {
    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");//必須要配置這個類型,可以是direct,topic和fanout
        //第二個參數必須為x-delayed-message
        return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap);
    }

    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue(){
        return new Queue("PLUGIN_DELAY_QUEUE");
    }

    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,@Qualifier("pluginDelayExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs();
    }
}
  1. 新建一個消費者類 PluginDelayConsumer
@Component
public class PluginDelayConsumer {

    @RabbitHandler
    @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")//監聽延時隊列
    public void fanoutConsumer(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("【插件延遲隊列】【" + sdf.format(new Date()) + "】收到消息:" + msg);
    }
}
  1. 在上面示例中的 DelayQueueController 類,新增一個方法:
@GetMapping(value="/plugin/send")
public String pluginMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("x-delay",5000);//延遲5秒被刪除
    Message message = new Message(msg.getBytes(), messageProperties);
    amqpTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE","delay",message);//交換機和路由鍵必須和配置文件類中保持一致
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
    return "succ";
}
  1. 接下來就可以訪問地址 http://localhost:8080/delay/plugin/send?msg=插件延遲隊列消息 進行測試,可以看到,消息在延時 5 秒之后被消費:

總結

延遲隊列的使用非常廣泛,如果是單機部署,可以考慮使用 jdk 自帶的 DelayQueue,分布式部署可以采用 RabbitMQRedis 等中間件來實現延遲隊列。本文主要介紹了如何利用 RabbitMQ 實現兩種延遲隊列的兩種方案,當然本文的例子只是引導,並沒有開啟回調等消息確認模式,如果想了解 RabbitMQ 消息的可靠性傳輸的,可以點擊這里


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM