聊一聊順序消息(RocketMQ順序消息的實現機制)


當我們說順序時,我們在說什么?

日常思維中,順序大部分情況會和時間關聯起來,即時間的先后表示事件的順序關系。

比如事件A發生在下午3點一刻,而事件B發生在下午4點,那么我們認為事件A發生在事件B之前,他們的順序關系為先A后B。

上面的例子之所以成立是因為他們有相同的參考系,即他們的時間是對應的同一個物理時鍾的時間。如果A發生的時間是北京時間,而B依賴的時間是東京時間,那么先A后B的順序關系還成立嗎?

如果沒有一個絕對的時間參考,那么A和B之間還有順序嗎,或者說怎么斷定A和B的順序?

顯而易見的,如果A、B兩個事件之間如果是有因果關系的,那么A一定發生在B之前(前因后果,有因才有果)。相反,在沒有一個絕對的時間的參考的情況下,若A、B之間沒有因果關系,那么A、B之間就沒有順序關系。

那么,我們在說順序時,其實說的是:

  • 有絕對時間參考的情況下,事件的發生時間的關系;

  • 和沒有時間參考下的,一種由因果關系推斷出來的happening before的關系;

在分布式環境中討論順序

當把順序放到分布式環境(多線程、多進程都可以認為是一個分布式的環境)中去討論時:

  • 同一線程上的事件順序是確定的,可以認為他們有相同的時間作為參考

  • 不同線程間的順序只能通過因果關系去推斷

 

(點表示事件,波浪線箭頭表示事件間的消息)

上圖中,進程P中的事件順序為p1->p2->p3->p4(時間推斷)。而因為p1給進程Q的q2發了消息,那么p1一定在q2之前(因果推斷)。但是無法確定p1和q1之間的順序關系。

推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會透徹的分析分布式系統中的順序問題。

消息中間件中的順序消息

什么是順序消息

有了上述的基礎之后,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。

順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發布和消費的消息類型。順序消息由兩個部分組成:順序發布和順序消費。

順序消息包含兩種類型:

分區順序:一個Partition內所有的消息按照先進先出的順序進行發布和消費

全局順序:一個Topic內所有的消息按照先進先出的順序進行發布和消費

這是阿里雲上對順序消息的定義,把順序消息拆分成了順序發布和順序消費。那么多線程中發送消息算不算順序發布?

如上一部分介紹的,多線程中若沒有因果關系則沒有順序。那么用戶在多線程中去發消息就意味着用戶不關心那些在不同線程中被發送的消息的順序。即多線程發送的消息,不同線程間的消息不是順序發布的,同一線程的消息是順序發布的。這是需要用戶自己去保障的。

而對於順序消費,則需要保證哪些來自同一個發送線程的消息在消費時是按照相同的順序被處理的(為什么不說他們應該在一個線程中被消費呢?)。

全局順序其實是分區順序的一個特例,即使Topic只有一個分區(以下不在討論全局順序,因為全局順序將面臨性能的問題,而且絕大多數場景都不需要全局順序)。

如何保證順序

在MQ的模型中,順序需要由3個階段去保障:

  1. 消息被發送時保持順序

  2. 消息被存儲時保持和發送的順序一致

  3. 消息被消費時保持和存儲的順序一致

發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中采用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。

如下圖所示:

對於兩個訂單的消息的原始數據:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):

  • 在發送時,a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒有順序關系,這意味着a、b訂單的消息可以在不同的線程中被發送出去

  • 在存儲時,需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證

    • a1、b1、b2、a2、a3、b3是可以接受的

    • a1、a2、b1、b2、a3、b3也是可以接受的

    • a1、a3、b1、b2、a2、b3是不能接受的

  • 消費時保證順序的簡單方式就是“什么都不做”,不對收到的消息的順序進行調整,即只要一個分區的消息只由一個線程處理即可;當然,如果a、b在一個分區中,在收到消息后也可以將他們拆分到不同線程中處理,不過要權衡一下收益

開源RocketMQ中順序的實現

上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區中。文檔只是給出了Producer順序的處理,Consumer消費時通過一個分區只能有一個線程消費的方式來保證消息順序,具體實現如下。

Producer端

Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區,在RocketMQ中,通過MessageQueueSelector來實現分區的選擇。

  • List<MessageQueue> mqs:消息要發送的Topic下所有的分區

  • Message msg:消息對象

  • 額外的參數:用戶可以傳遞自己的參數

比如如下實現就可以保證相同的訂單的消息被路由到相同的分區:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用戶控制線程,主動從服務端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲順序一致,用戶需要再拿到這批消息后自己保證消費的順序。

對於PushConsumer,由用戶注冊MessageListener來消費消息,在客戶端中需要保證調用MessageListener時消息的順序性。RocketMQ中的實現如下:

  1. PullMessageService單線程的從Broker獲取消息

  2. PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個消息的緩存),之后提交一個消費任務到ConsumeMessageOrderService

  3. ConsumeMessageOrderService多線程執行,每個線程在消費消息時需要拿到MessageQueue的鎖

  4. 拿到鎖之后從ProcessQueue中獲取消息

保證消費順序的核心思想是:

  • 獲取到消息后添加到ProcessQueue中,單線程執行,所以ProcessQueue中的消息是順序的

  • 提交的消費任務時提交的是“對某個MQ進行一次消費”,這次消費請求是從ProcessQueue中獲取消息消費,所以也是順序的(無論哪個線程獲取到鎖,都是按照ProcessQueue中消息的順序進行消費)

順序和異常的關系

順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區,消息需要保證每個分區的數據只有一個線程消息,那么就會有一些缺陷:

  • 發送順序消息無法利用集群的Failover特性,因為不能更換MessageQueue進行重試

  • 因為發送的路由策略導致的熱點問題,可能某一些MessageQueue的數據量特別大

  • 消費的並行讀依賴於分區數量

  • 消費失敗時無法跳過

不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過Raft、Paxos之類的算法保證有可用的副本,或者通過其他高可用的存儲設備來存儲MessageQueue。

熱點問題好像沒有什么好的解決辦法,只能通過拆分MessageQueue和優化路由方法來盡量均衡的將消息分配到不同的MessageQueue。

消費並行度理論上不會有太大問題,因為MessageQueue的數量可以調整。

消費失敗的無法跳過是不可避免的,因為跳過可能導致后續的數據處理都是錯誤的。不過可以提供一些策略,由用戶根據錯誤類型來決定是否跳過,並且提供重試隊列之類的功能,在跳過之后用戶可以在“其他”地方重新消費到這條消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM