延時隊列
隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望 在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的 元素的隊列。
延遲隊列使用場景
1.訂單在十分鍾之內未支付則自動取消
2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
5.預定會議后,需要在預定的時間點前十分鍾通知各個與會人員參加會議
首先要只要一個rabbitmq的特性TTL
TTL是什么呢?TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有 消息的最大存活時間,
單位是毫秒。換句話說,如果一條消息設置了 TTL 屬性或者進入了設置TTL 屬性的隊列,那么這 條消息如果在TTL 設置的時間內沒有被消費,則會成為"死信"。如果同時配置了隊列的TTL 和消息的 TTL,那么較小的那個值將會被使用,有兩種方式設置 TTL
1 消息設置TTL

2 隊列設置TTL

兩者的區別:
如果設置了隊列的 TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列被丟到死信隊 列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者 之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需 要注意的一點是,如果不設置 TTL,表示消息永遠不會過期,如果將 TTL 設置為 0,則表示除非此時可以 直接投遞該消息到消費者,否則該消息將會被丟棄。 前一小節我們介紹了死信隊列,剛剛又介紹了 TTL,至此利用 RabbitMQ 實現延時隊列的兩大要素已 經集齊,接下來只需要將它們進行融合,再加入一點點調味料,延時隊列就可以新鮮出爐了。想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL 則剛好能讓消息在延遲多久之后成為死信,另一方面, 成為死信的消息都會被投遞到死信隊列里,這樣只需要消費者一直消費死信隊列里的消息就完事了,因為 里面的消息都是希望被立即處理的消息
實現:
創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S, 且兩個隊列都沒有消費者 ,然后在創建一個普通交換機 X 和死信交換機 Y,它們的類型都是direct,創建一個死信隊列 QD,它們的綁定關系如下

面臨的問題:
舉個例子: 如果有兩條消息 從X交換機 到QA隊列 , 我們對這兩條消息設置TTL , "消息1" 延遲40秒 , "消息2"延遲10秒 , 最后會發現 兩條消息都會延遲40秒 , 看起來 "消息1" 擋住了路 , 沒有讓 延遲比它低的"消息2" 提前過去;
原因:因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列, 如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息並不會優先得到執行。
解決辦法:
上文中提到的問題,確實是一個問題,如果不能實現在消息粒度上的 TTL,並使其在設置的TTL 時間 及時死亡,就無法設計成一個通用的延時隊列。那如何解決呢,接下來我們就去解決該問題。
Rabbitmq 插件實現延遲隊列 !!!
在官網上下載 https://www.rabbitmq.com/community-plugins.html,
下載 rabbitmq_delayed_message_exchange 插件,然后解壓放置到 RabbitMQ 的插件目錄。
進入 RabbitMQ 的安裝目錄下的 plgins 目錄,執行下面命令讓該插件生效,然后重啟 RabbitMQ /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用:

在我們自定義的交換機中,這是一種新的交換類型,該類型消息支持延遲投遞機制 消息傳遞后並 不會立即投遞到目標隊列中,而是存儲在 mnesia(一個分布式數據系統)表中,當達到投遞時間時,才 投遞到目標隊列中。
定義交換機:
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } //自定義交換機 我們在這里定義的是一個延遲交換機 @Bean public CustomExchange delayedExchange() { Map < String, Object > args = new HashMap < > (); //自定義交換機的類型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
生產者:
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @GetMapping("sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData - > { correlationData.getMessageProperties().setDelay(delayTime); return correlationData; }); log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的信息給隊列 delayed.queue:{}", new Date(), delayTime, message); }
消費者正常操作
延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現延時隊列可以很好的利用 RabbitMQ 的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正 確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為 單個節點掛掉導致延時隊列不可用或者消息丟失。 當然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的時間輪,這些方式各有特點,看需要適用的場景
