MQ系列12:如何保證消息順序性


MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的發送模式
MQ系列6:消息的消費
MQ系列7:消息通信,追求極致性能
MQ系列8:數據存儲,消息隊列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證消息冪等性消費
MQ系列11:如何保證消息可靠性傳輸

1 介紹

消息的有序性在很多業務場景中占有很重要的位置。
比如購物場景,需要按照 創建訂單 --> 訂單付款 --> 完成訂單 順序執行。
又比如出行場景,接單 --> 接送到達目的地 --> 付款 --> 完成訂單。
這種是嚴格按照順序執行的,這樣的順序消費才不會出問題,而且各個訂單之間是互相獨立和並行執行的。
所以,在MQ中,如何穩定地保證順序性消息處理,是一個不可避免的話題。

2 消息的有序性說明

消息的有序執行,一般不是單個組件的能力。而是整個消息從生產,排隊,存儲到消費都是有序的,比如上面提到的購物和出行場景。
這就要求我們在消息隊列(如果是Kafka,還是RocketMQ、RabbitMQ)中,保證以下前提:

  • 消息生產的有序性:即生產者組件有序發送消息
  • 消息入出隊列的有序性:即消息是按照進入的先后順序排隊列放的,遵循FIFO原則。
  • 消息的存儲的有序性:與上一點一致,部分場景下為了提高可用,就是要持久化到磁盤,這時候應該遵循有序存放,才能保證后續有序消費
  • 消息消費的有序性:即按照順序進行消費。又分為全局順序消息與部分順序消息,全局是指Topic下的所有消息都要保證順序;部分順序消息保證每一組消息被順序消費即可。

這邊還有個問題,如果想讓全局都是順序性消費,那么只能用一個消費者去消費隊列(一般來說也是單個生產者),這是會嚴重影響整體性能的,一般沒這個,都是分組順序執行消費的。
image

2.1 消息生產的有序性

要保證整個消息隊列的有序性執行,首先要保證消息生產的有序性。
RocketMQ在Broker中防止了很多Topic,主題(Topic)可以看做消息的歸類,我們將消息進行類型划分,相同類型的消息稱為一個 Topic。比如我們在淘寶或京東上購買商品的的過程,就可能產生:購物車消息、交易消息、物流消息等,1條消息必然歸屬於1個 Topic 。
1個 Topic可以有0 ~ n 個生產者向其發送消息;也可以被 0~n 個消費者訂閱和處理,於是就有出現了生產者組和消費者組,如下圖:
image

或者同一個Topic中,創建不同的Queue,同一個消息生產者將消息隔離發送到不同的Queue中:
image

按照上述的模式,同理,我們只需要保證一組相同的消息按照給定的順序存入同一個隊列中,就能保證生產者有序存儲,比如一次完整的消費過程:創建訂單、付款、完成訂單按照順序在一個隊列(Queue)中執行那就可以了。

★ 同時我們要保證同一組的消息在消息生產的時候投送到一個組中。這個相對來說不難,可以這么做:

  • 比如一個訂單的多個子消息的父訂單號是一致,我們把這些消息按照訂單號取模,投送到對應的Queue中就行了,比如 訂單號 % 隊列數量( 163105015 % 9)
  • 發送消息自定義消息標簽(消息標簽可以用隊列編號命名),一組消息使用同一個標簽,改組標簽對應的消息都投向標簽所在的隊列。

★ 業務程序方面,必須使用同步發送的方式,這樣才能保證生產者發送的消息有序,否則按照FIFO的原則,很可能 訂單完成 會被先消費。
但是我們業務程序,比如Java代碼中為了提升性能,可能使用多線程的模式進行事件觸發。多線程下保證生產者順序性,可以使用鎖並配合 spring的publish event(按照順序執行的內部隊列),持久化之后,再按照先進先出的順序推送消息進入MQ中。
可以參考下 ,大概就是將你的事件進行順序化一下。

★ 上述方法也不能完完全全的避免順序化執行。如果broker服務發生故障,或者消息發生丟失,都有可能導致事件消費不完整,出現不一致的問題。

2.2 消息有序性存儲

Broker 存儲架構采用文件存儲機制(類似Kafka),即直接在磁盤上使用文件來保存消息,而不是采用Redis或者MySQL之類的持久化工具。
它會把消息存儲所屬相關的文件存儲在ROCKETMQ_HOME下,包含三個部分:

  • CommitLog 消息元數據
  • ConsumeQueue 消息邏輯隊列
  • IndexFile 索引文件

存儲消息的元數據,所有消息都會順序存入到CommitLog文件中。
ConsumeQueue是指存儲消息在CommitLog上的索引,一個MessageQueue一個文件,記錄當前MessageQueue被哪些消費者組消費到了哪一條CommitLog。
所以一切都是順序性操作下來的,而且按照 MessageQueue 做了隔離了,不用擔心亂序的問題。詳細參考 《MQ系列8:數據存儲,消息隊列的高可用保障

image

2.3 消息消費的有序性

最后一步就是消費的有序性了,既然消息生產和消息持久化都可以做到有序性。那么只要保證消費的有序性,就能保證整個消息隊列的有序執行。
這邊以RocketMQ為例子,RockerMQ采用MessageListener 回調函數進行監聽,監聽到消息之后進行數據處理。MessageListener主要提供了兩種消費模式,如下:

  • 有序消費模式MessageListenerOrderly
  • 並發消費模式MessageListenerConcurrently

其中有序消費模式有序消費模式MessageListenerOrderly可以保證按照順序進行消息處理。但是消費的業務代碼實現是多線程並行的,依然是無法保證的。
實際上RocketMQ也是這么做的,MessageListenerConcurrently拉到消息之后會提交到線程池去消費,而MessageListenerOrderly則是通過分布式鎖和本地鎖保證同時只有一條線程去消費一個隊列(Queue)上的數據。
這種消費模式就是使用以下3把鎖來確保順序性:

  • broker端的分布式鎖
  • messageQueue的本地synchronized鎖
  • ProcessQueue的本地consumeLock

3 總結

要消息的順序性消費:需要保持先后順序的消息放到同一個消息隊列中(kafka中就是partition,rabbitMq中就是queue),然后使用線程池消費的時候使用分布式鎖和本地鎖保證同時只有一條線程去消費一個隊列(Queue)上的數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM