MQ系列14:MQ如何做到消息延時處理


MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費
MQ系列7:消息通信,追求極致性能
MQ系列8:數據存儲,消息隊列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證消息冪等性消費
MQ系列11:如何保證消息可靠性傳輸
MQ系列12:如何保證消息順序性
MQ系列13:消息大量堆積如何解決

1 背景

在互聯網業務的實際應用場景中,消息的延時處理是非常必要的。例如,在金融交易系統中,某些交易的確認可能需要一段時間才能完成。又如,在物流跟蹤系統中,貨物的運輸狀態需要一段時間才能更新。而MQ作為中間件的角色專門來處理消息媒介,實際也具備了使用消息的延時處理來保證信息的及時性的能力。
這邊舉兩個具體的例子:

  • 火車票訂購,提交了訂單就把車票給占位了,這時候可以發送一個延時確認的消息,15m 未付款,就要把該車票釋放,這樣其他人就可以購買了。
  • 購買電影票,可以發送一個核銷檢查消息,在電影開場前15分鍾就無法退票了。

既然消息延遲處理的使用場景這么常見,那我們就要詳細來分析下怎么使用MQ來實現,這邊以RocketMQ為技術選型。

2 消息延時處理原理

RocketMQ的消息延時處理是通過預定義的消息延時級別和延時隊列來實現的。在發送消息時,生產者可以設置一個延時級別,該消息將會被延遲一段時間后才能被消費者消費。RocketMQ默認提供了18個延時級別,每個級別對應不同的延遲時間。

所以延時時間並不是隨意指定的,Rocket源碼中指定的18種等級如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMq不支持任意時間延時,需設置固定的延時等級,從1s到2h分別對應着等級1到18
  • 可以使用setDelayTimeLevel(int level) 方法設置延時等級,level 從 0 開始

在RocketMQ中,每個Broker都設置了一個延時隊列,用於存儲延時消息。當消息的延時時間到達時,該消息將會被自動轉移到普通的消息隊列中,等待消費者的消費。這種方式可以有效地避免因為網絡延遲或者消費者處理速度慢而導致消息的延遲。

image

3 消息延時處理實戰

使用RocketMQ的消息延時處理非常簡單。在發送消息時,生產者只需要設置一個延時級別,然后將消息發送到RocketMQ即可。例如:

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、創建生產者producer,並指定生產者組名為 example_group_name
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");  
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3、啟動生產者producer
        producer.start();
        // 4、創建消息,並指定Topic,Tag和消息體
        Message msg = new Message("example_topic","example_key", "試一試延遲30s發送的消息".getBytes("UTF-8"));
        // 5、設置延時等級3,對應30s,所以這個消息在30秒之后發送
        msg.setDelayTimeLevel(3);
        // 6、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 7、通過sendResult返回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 8、如果不再發送消息,關閉生產者Producer
        producer.shutdown();
    }
}

image

在上述代碼中,我們首先創建了一個生產者,然后指定了NameServer的地址,並啟動了生產者。接着,我們創建了一個延時級別為3的消息,即該消息將會被延遲30秒后才能發送並被消費者消費。最后,我們發送了該消息,並關閉了生產者。

4 消息延時的優化

雖然RocketMQ的消息延時處理功能已經非常強大,但是在實際應用中,我們可能還需要根據自己的業務需求進行一些優化。以下是一些可能的優化方式:

  • 調整延時隊列的大小。在RocketMQ中,每個Broker都只有一個延時隊列,隊列太小可能導致一些延時消息被miss。可以根據實際需求調整延時隊列的大小。
  • 使用多個消費者來消費同一主題的消息。在RocketMQ中,可能有批量執行被設置了同樣的延遲時間,這個就存在了一些風險,類似緩存的批量過期一樣,稍有不慎,可能會擊穿數據庫。如果只有一個消費者來消費該主題的消息,可能會導致該消費者的處理速度不夠快,從而影響到消息的及時性。我們可以根據實際需求增加消費者數量,以提高消息的處理速度。
  • 調整RocketMQ的配置參數。RocketMQ提供了一些配置參數,可以用來調整其性能和可靠性。我們可以根據實際需求調整這些參數,以優化消息的延時處理效果。

總之,RocketMQ的消息延時處理功能非常強大,可以滿足許多實際應用場景的需求。在實際應用中,我們可以根據自己的業務需求進行一些優化,以進一步提高消息的及時性和可靠性。

5 總結

本文我們介紹了RocketMQ如何使用消息延時來處理特殊的業務場景。除了上述的方法之外,我們還有一些其他方法,比如:

  • 定時發送消息。在定時發送中,生產者可以指定一個未來的時間戳,在該時間戳到達時,該消息將會被發送到Broker。RocketMQ內部會維護一個定時任務,每隔一段時間檢查一次待定時發送的消息,並判斷是否到達了指定的時間戳。如果到達了指定的時間戳,該消息將會被發送到Broker。
  • 自建環形隊列來實現“延時消息” ,參考這篇:1分鍾實現“延遲消息”功能,寫的不錯


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM