RabbitMq的死信隊列和延遲隊列


死信隊列

DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之為死信隊列。

消息變成死信,可能是由於以下的原因:

  • 消息被拒絕
  • 消息過期
  • 隊列達到最大長度

DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的隊列上被指定,實際上就是設置某一個隊列的屬性。當這個隊列中存在死信時,Rabbitmq就會自動地將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。

要想使用死信隊列,只需要在定義隊列的時候設置隊列參數 x-dead-letter-exchange 指定交換機即可。

定義交換機和隊列

/**
 * @author WGR
 * @create 2020/9/2 -- 16:26
 */
@Configuration
public class RabbitMQDLXConfig {

    @Bean("my_dlx_queue")
    public Queue myDlxQueue(){
        return QueueBuilder.durable("my_dlx_queue").build();
    }

    @Bean("my_dlx_exchange")
    public Exchange myDlxExchange(){
        return ExchangeBuilder.directExchange("my_dlx_exchange").durable(true).build();
    }

    //綁定隊列和交換機
    @Bean
    public Binding myTtlDlx1(@Qualifier("my_dlx_queue") Queue queue,
                                       @Qualifier("my_dlx_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange)
                .with("my_ttl_dlx").noargs();
    }

    //綁定隊列和交換機
    @Bean
    public Binding myTtlDlx2(@Qualifier("my_dlx_queue") Queue queue,
                             @Qualifier("my_dlx_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange)
                .with("my_max_dlx").noargs();
    }

    //聲明隊列
    @Bean("my_ttl_dlx_queue")
    public Queue myTtlDlxQueue(){
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","my_dlx_exchange");
        arguments.put("x-dead-letter-routing-key","my_ttl_dlx");
        arguments.put("x-message-ttl",60000);
        return QueueBuilder.durable("my_ttl_dlx_queue").withArguments(arguments).build();
    }

    //聲明隊列
    @Bean("my_max_dlx_queue")
    public Queue myMaxDlxQueue(){
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","my_dlx_exchange");
        arguments.put("x-dead-letter-routing-key","my_max_dlx");
        arguments.put("x-max-length",2);
        return QueueBuilder.durable("my_max_dlx_queue").withArguments(arguments).build();
    }

    @Bean("my_normal_exchange")
    public Exchange myNormalExchange(){
        return ExchangeBuilder.directExchange("my_normal_exchange").durable(true).build();
    }


    //綁定隊列和交換機
    @Bean
    public Binding myTtlDlx3(@Qualifier("my_ttl_dlx_queue") Queue queue,
                             @Qualifier("my_normal_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange)
                .with("my_ttl_dlx").noargs();
    }

    //綁定隊列和交換機
    @Bean
    public Binding myTtlDlx4(@Qualifier("my_max_dlx_queue") Queue queue,
                             @Qualifier("my_normal_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange)
                .with("my_max_dlx").noargs();
    }


}

image-20200902170512538

進行測試:

   /**
     * 過期消息投遞到死信隊列
     * 投遞到一個正常的隊列,但是該隊列有設置過期時間,到過期時間之后消息會被投遞到死信交換機(隊列)
     */
    @Test
    public void dlxTTLMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_ttl_dlx",
                "測試過期消息;6秒過期后會被投遞到死信交換機2222");
    }


    /**
     * 消息長度超過2,會投遞到死信隊列中
     */
    @Test
    public void dlxMaxMessageTest(){
        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息4:消息長度超過2,會被投遞到死信隊列中!");

        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息5:消息長度超過2,會被投遞到死信隊列中!");

        rabbitTemplate.convertAndSend(
                "my_normal_exchange",
                "my_max_dlx",
                "發送消息6:消息長度超過2,會被投遞到死信隊列中!");

    }

image-20200902170227667

3)流程

具體因為隊列消息過期而被投遞到死信隊列的流程:

image-20200902170652969

延遲隊列

延遲隊列存儲的對象是對應的延遲消息;所謂“延遲消息” 是指當消息被發送以后,並不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。

在RabbitMQ中延遲隊列可以通過 過期時間 + 死信隊列 來實現;具體如下流程圖所示:

1565520000397

在上圖中;分別設置了兩個5秒、10秒的過期隊列,然后等到時間到了則會自動將這些消息轉移投遞到對應的死信隊列中,然后消費者再從這些死信隊列接收消息就可以實現消息的延遲接收。

延遲隊列的應用場景;如:

  • 在電商項目中的支付場景;如果在用戶下單之后的幾十分鍾內沒有支付成功;那么這個支付的訂單算是支付失敗,要進行支付失敗的異常處理(將庫存加回去),這時候可以通過使用延遲隊列來處理
  • 在系統中如有需要在指定的某個時間之后執行的任務都可以通過延遲隊列處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM