rabbitmq 延遲隊列


延時隊列

  隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望 在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的 元素的隊列。

延遲隊列使用場景

  1.訂單在十分鍾之內未支付則自動取消

  2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。

  3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。

  4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。

  5.預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議

首先要只要一個rabbitmq的特性TTL

TTL是什么呢?TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有 消息的最大存活時間,

單位是毫秒。換句話說,如果一條消息設置了 TTL 屬性或者進入了設置TTL 屬性的隊列,那么這 條消息如果在TTL 設置的時間內沒有被消費,則會成為"死信"。如果同時配置了隊列的TTL 和消息的 TTL,那么較小的那個值將會被使用,有兩種方式設置 TTL

  1 消息設置TTL

 

 

 

  2 隊列設置TTL

 

 

 

 兩者的區別:

  如果設置了隊列的 TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列被丟到死信隊 列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者 之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需 要注意的一點是,如果不設置 TTL,表示消息永遠不會過期,如果將 TTL 設置為 0,則表示除非此時可以 直接投遞該消息到消費者,否則該消息將會被丟棄。 前一小節我們介紹了死信隊列,剛剛又介紹了 TTL,至此利用 RabbitMQ 實現延時隊列的兩大要素已 經集齊,接下來只需要將它們進行融合,再加入一點點調味料,延時隊列就可以新鮮出爐了。想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL 則剛好能讓消息在延遲多久之后成為死信,另一方面, 成為死信的消息都會被投遞到死信隊列里,這樣只需要消費者一直消費死信隊列里的消息就完事了,因為 里面的消息都是希望被立即處理的消息

實現:

  創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S, 且兩個隊列都沒有消費者 ,然后在創建一個普通交換機 X 和死信交換機 Y,它們的類型都是direct,創建一個死信隊列 QD,它們的綁定關系如下

 

 

 面臨的問題:

  舉個例子: 如果有兩條消息 從X交換機 到QA隊列 , 我們對這兩條消息設置TTL ,   "消息1" 延遲40秒 , "消息2"延遲10秒 , 最后會發現 兩條消息都會延遲40秒 , 看起來 "消息1" 擋住了路 , 沒有讓 延遲比它低的"消息2" 提前過去;

      原因:因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列, 如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息並不會優先得到執行。

 

解決辦法: 

  上文中提到的問題,確實是一個問題,如果不能實現在消息粒度上的 TTL,並使其在設置的TTL 時間 及時死亡,就無法設計成一個通用的延時隊列。那如何解決呢,接下來我們就去解決該問題。

  Rabbitmq 插件實現延遲隊列 !!!

  在官網上下載 https://www.rabbitmq.com/community-plugins.html,

  下載 rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。

  進入 RabbitMQ 的安裝目錄下的 plgins 目錄,執行下面命令讓該插件生效,然后重啟 RabbitMQ /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

  rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

使用:

  

 

    在我們自定義的交換機中,這是一種新的交換類型,該類型消息支持延遲投遞機制 消息傳遞后並 不會立即投遞到目標隊列中,而是存儲在 mnesia(一個分布式數據系統)表中,當達到投遞時間時,才 投遞到目標隊列中。

定義交換機:

 

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //自定義交換機 我們在這里定義的是一個延遲交換機
    @Bean
    public CustomExchange delayedExchange() {
        Map < String, Object > args = new HashMap < > ();
        //自定義交換機的類型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
            args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
        @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return
        BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生產者:

  

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
        correlationData - > {
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
    log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的信息給隊列 delayed.queue:{}", new Date(), delayTime, message);
}

  消費者正常操作

  延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現延時隊列可以很好的利用 RabbitMQ 的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正 確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為 單個節點掛掉導致延時隊列不可用或者消息丟失。 當然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的時間輪,這些方式各有特點,看需要適用的場景


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM