死信隊列
引言
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),其實應該叫做死信交換機才更恰當。
當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX。
總結:其實死信隊列就是一個普通的交換機,有些隊列的消息成為死信后,(比如過期了或者隊列滿了)這些死信一般情況下是會被 RabbitMQ 清理的。但是你可以配置某個交換機為此隊列的死信交換機,該隊列的消息成為死信后會被重新發送到此 DLX 。至於怎么處理這個DLX中的死信就是看具體的業務場景了,DLX 中的信息可以被路由到新的隊列。
消息成為死信的三種情況
- 隊列長度到達限制,無法加入新的消息
- 消費者拒接消費消息,並且不重回隊列。該信息會被清除並進入死信隊列
- 原隊列存在消息過期設置,消息到達超時時間未被消費
隊列如何綁定 DLX
設置兩個參數
- x-dead-letter-exchange 指定此隊列的死信隊列
- x-dead-letter-exchange 指定此隊列向DLX發送死信的routing key,因為這個時候該隊列相當於一個生產者,發送消息要指定routing key
實現效果:往一個普通隊列添加消息,消息過期成為死信,進入死信隊列。死信隊列根據配置好的route key 路由到與它綁定的其他普通隊列。
-
聲明死信隊列(普通交換機)
// 聲明死信交換機 @Bean("deadExchange") public Exchange deadExchange(){ return ExchangeBuilder.topicExchange("sb_dead_exchange").durable(true) .autoDelete().build(); }
-
聲明普通隊列,配置它的DLX
// 聲明普通隊列,綁定死信隊列 @Bean public Queue queue3(){ Queue build = QueueBuilder.durable("sb_dead_queue").build(); build.addArgument("x-message-ttl",10000); build.addArgument("x-dead-letter-exchange","sb_dead_exchange"); // 此時隊列相當於生產者,因此要指定消息的routing key,死信隊列可以更加routing key路由到其他隊列 build.addArgument("x-dead-letter-routing-key","user4.info"); return build; } // 綁定此隊列和它的交換機 @Bean public Binding exchangQueue3(@Qualifier("queue3")Queue queue, @Qualifier("topicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user3.#").noargs(); }
-
綁定死信隊列和普通隊列,死信隊列中的消息會根據路由發送到其他隊列
// 綁定死信隊列和普通隊列 @Bean public Binding exchangQueue4(@Qualifier("queue2")Queue queue, @Qualifier("deadExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user4.#").noargs(); }
延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。經典的應用場景是下單減庫存。
預扣庫存的模式下,我們下單會立刻減庫存,但是超過支付時間還沒支付的話該訂單就會被取消,庫存回滾。
具體實現可以采用定時器的方式,定時檢查當前時間與下單時間是否超過上限,比如設置為30min。但是多久執行一次定時任務是個問題,精度大(比如1s執行一次)的話數據庫的壓力十分大,精度小(比如2min執行一次)的話又會帶來誤差。
現在采用的方式是采用延遲隊列。RabbitMQ沒有提供延遲隊列的功能。但是我們能使用死信隊列 + TTL 自己實現延遲隊列。TTL時間為過期時間(如30min)。
我們為每條訂單信息設置一個過期時間(30min),消息過期后成為死信,自動進入死信隊列,死信隊列又把消息路由到它綁定的普通隊列,庫存系統訂閱該隊列即可在30min后才取出該信息。間接實現了延遲隊列的功能
如此設計庫存系統就一定只能在30min后才能從隊列中取出訂單