RabbitMQ延時隊列應用場景


應用場景

我們系統未付款的訂單,超過一定時間后,需要系統自動取消訂單並釋放占有物品

image-20211020212315320

常用的方案

就是利用Spring schedule定時任務,輪詢檢查數據庫

但是會消耗系統內存,增加了數據庫的壓力、還存在較大的時間誤差

image-20211020212617711

解決:rabbitmq的消息TTL和死信Exchange結合

介紹

1.何為消息TTL、死信

死信:對消息設置的過期時間到了,這個消息還沒有被消費就認為這個消息死了,死了的消息會進入死信交換機(Dead Letter Exchanges)

成為死信的三種條件:

  • 一個消息被Consumer拒收了,並且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/ basic.nack)requeue=false
  • 上面的消息的TTL到了,消息過期了。
  • 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上

消息TTL:消息的TTL就是消息的存活時間

RabbitMQ可以對隊列和消息都設置過期時間,但代表的都是一個意思,只要消息在設置時間內沒有消費,消息就死了,就被稱為死信

如果隊列和消息都設置了過期時間,那么就取時間最小的,單個消息的過期時間才是延時隊列的關鍵

2.如何運作

設置隊列過期時間

image-20211020213655496

消費者P會通過一個路由鍵deal.message發送消息給X交換機,然后繼續發送給delay queau隊列,這個隊列比較特殊,設置了過期時間5分鍾過期,還設置了x-dead-letter-exchange用於指定下一個接收的交換機,消息過期之后會成為死信直接進入delay.exchange交換機,利用x-dead-letter-routing-key綁定的路由鍵找到下一個隊列,這時候只需要有人監聽這個隊列。

設置消息過期時間

image-20211020214756629

消費者發送一個消息,設置了5分鍾過期時間,最后交給了延時隊列,延時隊列說消息死了不要亂放,指定了一個死信路由,用於找到下一個隊列的路由鍵,等到五分鍾后服務器會自動檢查是否過期,過期的話會交給delay.exchange路由,最后再交給delay.message

代碼模擬

image-20211020215722888

下訂單成功先發動給order-event-exchangeorder-event-exchange綁定了兩個路由鍵order.create.orderorder.release.order,根據order.create.order路由鍵找到order.delay.queue隊列,這是一個特殊的隊列,上圖所訴,消息的存活時間為一分鍾,消息在order.delay.queue隊列中沒人使用變成死信了,交給order-event-exchange交換機,最后通過order.release.order綁定關系找到了order.release.order.queue隊列

@Configuration
public class MyMQConfig {

    //監聽最后一個隊列,獲取那些過期的訂單消息
    @RabbitListener(queues = "order.release.order.queue")
    public  void  listerner(OrderEntity orderEntity,Channel channel,Message message) throws IOException {
        System.out.println("收到過期訂單信息"+orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    //特殊隊列
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        return orderDelayQueue;
    }

    //最后接收死信消息的隊列
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

   //事件交換機
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    //綁定order.delay.queue隊列和的order-event-exchange交換機的路由鍵
    @Bean
    public Binding orderCreateBingding() {
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    //綁定order.release.order.queue隊列和的order-event-exchange交換機的路由鍵
    @Bean
    public Binding orderReleaseBingding() {
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }
 

}

測試

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @ResponseBody
    @GetMapping("/test/createOrder")
    public String createOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }

image-20211020221215759

庫存解鎖實際場景

在庫存服務有個stock-event-exchange交換機,如果我們想要解鎖庫存,

1、首先訂單下成功、庫存鎖定成功

2、鎖定成功就要通過stock.locked路由鍵發送一個消息給交換機stock-event-exchange,消息內容包括哪個訂單、哪些商品、多少庫存等等

3、交換機通過綁定關系再發送給延時隊列stock.delay.queue

4、訂單可能需要30分鍾才會自動關閉,50分鍾之后來檢查庫存,就會知道訂單支付沒有

5、50分鍾消息沒有被消息,就變為死信,通過stock.release路由鍵綁定關系交給stock-event-exchange交換機

6、stock-event-exchange交換機通過stock.release路由鍵綁定關系找到strock.relelase.stock.queue隊列

7、所有的解鎖庫存服務就監聽這個隊列里的消息,只要這個隊列里消息能夠到達的都是超時沒有支付訂單的

下單遠程鎖定庫存,然后將倉庫鎖定庫存的數據發給訂單,當在訂單下單失敗時,由於不是分布式事務,訂單回滾,但倉庫不回滾,所以訂單一失敗,就需要通過訂單拿到mq中倉庫傳來的數據通知倉庫解鎖庫存

庫存解鎖場景:

1、下訂單成功,訂單過期沒有支付被系統自動取消或者用戶手動取消,都要解鎖庫存

2、下訂單成功、庫存鎖定成功,但是業務調用失敗導致訂單回滾,之前鎖定的庫存就自動解鎖,Seata分布式事務太慢,就要用一段時間后自動解決庫存。

3、訂單失敗,因為鎖庫存失敗有一個商品沒有鎖成功,導致整個鎖庫存服務都回滾,

消息隊列收到庫存消息場景

消息隊列收到消息之后

  • 如果沒有查到數據庫有鎖定成功的數據,說明庫存鎖失敗了,鎖庫存自動回滾,數據庫查不到記錄無需解鎖
  • 如果查到有數據,就說明庫存鎖定成功了
    • 沒有這個訂單必須解鎖庫存
    • 有訂單,訂單沒人支付失效了才能解鎖庫存

定時關閉訂單實際場景

image-20211021201122194

同上原理類似也是利用死信路由,訂單創建后,默認放入延時隊列,也就是訂單的有效時間,超過這個時間沒有支付或者用戶主動取消都會導致訂單信息進入order.release.order.queue隊列,最后被釋放


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM