延遲隊列
延遲隊列 又被稱為 延時隊列、死信隊列 ,它也是 RabbitMQ 隊列中的一種,指進入該隊列中的消息會被延遲消費的隊列。
顧名思義,延遲隊列和普通隊列的區別在於:
- 進入普通隊列的消息將會立即『走向』下一個環節,而下一個環節就是消費者;而
- 進入延遲隊列的消息將會被延遲隊列『持有』若干時間,而后才『走向』下一個環節,而且下一個環節是另一個交換機。這個『另一個交換機』也被稱為死信交換機。
RabbitMQ 引入延遲隊列主要是用於『延遲』一定時間再處理特定的業務邏輯,而這種『延遲』在 RabbitMQ 看來是『自動化』的,無須人為進行干預。
延遲隊列的使用價值在於:
-
某些業務需要這種機制。例如,訂單 30 分鍾內未支付則需要取消訂單。
-
在某種程度上,它可以替代定時任務。
#1. 專有詞匯
與普通的隊列一樣,延遲隊列也具有消息、交換機、路由和隊列等名詞。不過,它還增加了 3 個專有名詞:
- DLX
- Dead Letter Exchange,死信隊列交換機,是一種特殊類型的交換機。
- DLK
- Dead Letter Routing-Key,死信路由,同樣也是一種特殊類型的路由。主要是和 DLX 組合在一起構成死信隊列。
- TTL
- Time To Live,指進入延遲隊列中的消息可以存活的時間。當 TTL 一到,將意味着該消息『死了』,從而進入下一個『中轉站』,等待被真正的消息隊列監聽消費。
普通隊列 + 三個特殊設置 = 延遲隊列/死信隊列
在 http://localhost:15672/#/queues 中創建延遲隊列時,在普通隊列的基礎上需要設定三個『額外』的屬性。
-
Dead letter exchange:x-dead-letter-exchange 。指定延遲隊列的『下家』交換機。
-
Dead letter routing key:x-dead-letter-routing-key 。延遲隊列自動向『下家』交換機投遞消息時所使用的消息的 routing-key。
-
Message TTL:x-message-ttl 。延遲隊列要持有消息的時長。例如:10000 ,即 10s 。
#2. 延遲隊列流程模型

以延遲消息為例來描述延遲隊列的流程模型:
表面上看,消息生產者發出消息若干秒(以 5 秒為例)消息的消費者才消費該消息,才觸發相應方法的執行。其中,核心問題的關鍵點在於:延遲隊列(在中間環節)持有了該消息 5 秒,從而達到了延遲 5 秒的效果。
因此,簡單而言,整體流程分為 3 步:
-
消息生產者將消息發送到延遲隊列;
-
延遲隊列(持有消息 5秒后)將消息轉發給消費者隊列;
-
由於消費者正『監聽着』消費者隊列,一旦消費者隊列收到消息,消費者就從中讀取消息,消費。
所以,整個環節中有 2 套 交換機 - 路由:
-
第一套
交換機 - 路由負責將消息從生產者路由到延遲隊列; -
第二套
交換即 - 路由負責將消息從延遲隊列路由到消費者隊列;
TIP
第二個交換機也被稱為死信交換機,不過它的創建和設置與普通交換機沒有區別。
#3. 代碼配置
充分理解上圖后,下面的代碼配置的含義和目的就一目了然了。
@Configuration @EnableRabbit public class RabbitMQConfig { public static final String first_exchange_name = "first-exchange"; public static final String second_exchange_name = "second-exchange"; public static final String first_routing_key = "first-routing-key"; public static final String second_routing_key = "second-routing-key"; public static final String first_binding = "first-binding"; public static final String second_binding = "second-binding"; public static final String dead_queue_name = "dead-queue"; public static final String real_queue_name = "real-queue"; @Bean("dead-queue") public Queue deadQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", second_exchange_name); // 指定時期后消息投遞給哪個交換器。 args.put("x-dead-letter-routing-key", second_routing_key); // 指定到期后投遞消息時以哪個路由鍵進行投遞。 args.put("x-message-ttl", 5000); // 指定到期時間。5 秒 return new Queue(dead_queue_name, true, false, false, args); } @Bean("real-queue") public Queue realQueue() { return new Queue(real_queue_name, true, false, false); } /* 問題一:發出的消息憑什么會到死信隊列。*/ @Bean(first_exchange_name) public DirectExchange firstExchange() { return new DirectExchange(first_exchange_name, true, false); } @Bean(first_binding) public Binding firstBinding(@Qualifier(dead_queue_name) Queue queue, @Qualifier(first_exchange_name) Exchange exchange) { return BindingBuilder.bind(queue) .to(exchange) .with(first_routing_key) .noargs(); } /* 問題二:延遲隊列憑什么會把消息再轉給 real-queue 。*/ @Bean(second_exchange_name) public DirectExchange secondExchange() { return new DirectExchange(second_exchange_name, true, false); } @Bean(second_binding) public Binding secondBiding(@Qualifier(real_queue_name) Queue queue, @Qualifier(second_exchange_name) Exchange exchange