RocketMQ:(4) Consumer - 定時消息、順序消息


一、 定時消息機制

  定時消息是指消息發送到Broker后,並不立即被消費者消費而是要等到特定的時間后才能被消費,RocketMQ並不支持任意的時間精度,如果要支持任意時間精度定時調度,不可避免地需要在Broker層做消息排序,再加上持久化方面的考量,將不可避免地帶來巨大的性能消耗,所以RocketMQ只支持特定級別的延遲消息。消息延遲級別在Broker端通過messageDelayLevel配置,默認為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延遲消息1s,delayLevel=2表示延遲5s,依次類推。

  消息重試正是借助定時任務實現的,在將消息存入commitlog文件之前需要判斷消息的重試次數,如果大於0,則會將消息的主題設置為SCHEDULE_TOPIC_XXXX。

ScheduleMessageService

  RocketMQ定時消息實現類為ScheduleMessageService,調用順序為:構造方法 -> load() -> start()。

load方法

  該方法主要完成延遲消息消費隊列消息進度的加載與delayLevelTable數據的構造,延遲隊列消息消費進度默認存儲路徑為${ROCKET_HOME}/store/config/delayOffset.json,同時解析messageDelayLevel定義的延遲級別轉換為Map,延遲級別1,2,3等對應的延遲時間。

start方法

  根據延遲級別創建對應的延時任務,啟動定時任務持久化延遲消息隊列進度存儲。
  Step1:根據延遲隊列創建定時任務,遍歷延遲級別,根據延遲級別 level 從 offsetTable 中獲取消費隊列的消費進度,如果不存在,則使用0。也就是說每一個延遲級別對應一個消息消費隊列。然后創建定時任務,每一個定時任務第一次啟動時默認延遲1s先執行一次定時任務,第二次調度開始才使用相應的延遲時間。延遲級別與消息消費隊列的映射關系為:消息隊列ID = 延遲級別 - 1。
  Step2:創建定時任務,每隔10s持久化一次延遲隊列的消息消費進度。

定時調度邏輯

  start方法啟動后,會為每一個延遲級別創建一個調度任務,每一個延遲級別其實對應SCHEDULE_TOPIC_XXXX主題下的一個消息消費隊列。
  Step1:根據隊列ID與延遲主題查找消息消費隊列,如果未找到,說明目前並不存在該延時級別的消息,忽略本次任務。
  Step2:根據offset從消息消費隊列中獲取當前隊列中所有有效的消息。
  Step3:遍歷ConsumeQueue,解析出消息的物理偏移量、消息長度、消息tag hashcode, 為從commitlog加載具體的消息做准備。
  Step4:根據消息物理偏移量與消息大小從commitlog文件中查找消息。
  Step5:根據消息重新構建新的消息對象,清除消息的延遲級別屬性(delayLevel)、並恢復消息原先的消息主題與消息消費隊列。
  Step6:將消息再次存入到commitlog,並轉發到主題對應的消息隊列上,供消費者再次消費。
  Step7:更新延遲隊列拉取進度。

總結

  定時消息的第一個設計關鍵點是,定時消息單獨一個主題:SCHEDULE_TOPIC_XXXX,該主題下隊列數量等於配置的延遲級別數量。其對應關系為queueId等於延遲級別減1。ScheduleMessageService為每一個延遲級別創建一個定時Timer根據延遲級別對應的延遲時間進行延遲調度。在消息發送時,如果消息的延遲級別delayLevel大於0,將消息的原主題名稱、隊列ID存入消息的屬性中,然后改變消息的主題、隊列與延遲主題與延遲主題所屬隊列,消息將最終轉發到延遲隊列的消費隊列

  定時消息第二個設計關鍵點:消息存儲時如果消息的延遲級別屬性delayLevel大於0,則會備份原主題、原隊列到消息屬性中,通過為不同的延遲級別創建不同的調度任務,當時間到達后執行調度任務,調度任務主要就是根據延遲拉取消息消費進度從延遲隊列中拉取消息,然后從commitlog中加載完整消息,清除延遲級別屬性並恢復原先的主題、隊列,再次創建一條新的消息存入到commitlog中並轉發到消息消費隊列供消息消費者消費。 

  1)消息發送者發送消息,如果發送的消息delayLevel大於0,則改變消息主題為SCHEDULE_TOPIC_XXXX,消息隊列為delayLevel-1。
  2)消息經由Commitlog轉發到消息消費隊列SCHEDULE_TOPIC_XXXX的消息消費隊列0。
  3)定時任務Timer每隔1秒根據上次拉取偏移量從消費隊列中取出所有消息。
  4)根據消息的物理偏移量和消息大小從Commitlog中拉取消息。
  5)根據消息屬性重新創建消息,並恢復原主題topic、原隊列ID,清除delayLevel屬性,存入Commitlog文件。
  6)轉發到原主題topic的消息消費隊列,供消息消費者消費。

 

 

二、順序消息

  RocketMQ支持局部消息順序消費,可以確保同一個消息消費隊列中的消息被順序消費,如果需要做到全局順序消費則可以將主題配置成一個隊列。

  要保證部分消息有序,需要發送端和消費端配合處理。在發送端,要做到把同一業務ID的消息發送到同一個Message Queue;在消費過程中,要做到從同一個Message Queue讀取的消息不被並發處理,這樣才能達到部分有序。

  根據並發消息消費的流程,消息消費包含如下4個步驟:消息隊列負載、消息拉取、消息消費、消息消費進度存儲。

消息隊列負載 

  RocketMQ首先需要通過RebalanceService線程實現消息隊列的負載,集群模式下同一個消費組內的消費者共同承擔其訂閱主題下消息隊列的消費,同一個消息消費隊列在同一時刻只會被消費組內一個消費者消費,一個消費者同一時刻可以分配多個消費隊列。
  如果經過消息隊列重新負載(分配)后,分配到新的消息隊列時,首先需要嘗試向Broker發起鎖定該消息隊列的請求,如果返回加鎖成功則創建該消息隊列的拉取任務,否則將跳過,等待其他消費者釋放該消息隊列的鎖,然后在下一次隊列重新負載時再嘗試加鎖。
  順序消息消費與並發消息消費的第一個關鍵區別:順序消息在創建消息隊列拉取任務時需要在Broker服務器鎖定該消息隊列。

消息拉取

  RocketMQ消息拉取由PullMessageService線程負責,根據消息拉取任務循環拉取消息。如果消息處理隊列未被鎖定,則延遲3s后再將PullRequest對象放入到拉取任務中,如果該處理隊列是第一次拉取任務,則首先計算拉取偏移量,然后向消息服務端拉取消息。

消息消費

  順序消息消費的實現類:org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService

1、ConsumeMessageOrderlyService 構造方法
  初始化實例參數,這里的關鍵是消息任務隊列為LinkedBlockingQueue,消息消費線程池最大運行時線程個數為consumeThreadMin。

2、ConsumeMessageOrderlyService 啟動方法
  如果消費模式為集群模式,啟動定時任務,默認每隔20s執行一次鎖定分配給自己的消息消費隊列。集群模式下順序消息消費在創建拉取任務時並未將ProcessQueue的locked狀態設置為true,在未鎖定消息隊列之前無法執行消息拉取任務,ConsumeMessageOrderlyService以每20s的頻率對分配給自己的消息隊列進行自動加鎖操作,從而消費加鎖成功的消息消費隊列。
  Step1:向Broker(Master主節點)發送鎖定消息隊列,該方法返回成功被當前消費者鎖定的消息消費隊列。
  Step2:將成功鎖定的消息消費隊列相對應的處理隊列設置為鎖定狀態,同時更新加鎖時間。
  Step3:遍歷當前處理隊列中的消息消費隊列,如果當前消費者不持有該消息隊列的鎖,將處理隊列鎖狀態設置為false,暫停該消息消費隊列的消息拉取與消息消費。

3、ConsumeMessageOrderlyService 提交消費任務
  構建消費任務 ConsumeRequest, 並提交到消費線程池中。順序消息的ConsumeRequest消費任務不會直接消費本次拉取的消息,而是在消息消費時從處理隊列中拉取。
  Step1:如果消息處理隊列為丟棄,則停止本次消費任務。
  Step2:根據消息隊列獲取一個對象,然后在消息消費時先申請獨占鎖 objLock。順序消息消費的並發度為消息隊列。也就是一個消息消費隊列同一時刻只會被一個消費線程池中一個線程消費。
  Step3:如果是廣播模式,直接進入消費,無須鎖定處理隊列,因為相互之間無競爭;如果是集群模式,消息消費的前提條件是 processQueue 被鎖定並且鎖未超時。
  Step4:順序消息消費處理邏輯,每一個 ConsumeRequest 消費任務不是以消費消息條數來計算的,而是根據消費時間,默認60s后,本次消費任務結束,由消費組內其他線程繼續消費。
  Step5:每次從處理隊列中按順序取出 consumeBatchSize 消息, 如果未取到消息,則設置 continueConsume 為 false,本次消費結束;
  Step6:執行消息消費鈎子函數(消息消費之前before方法)
  Step7:申請消息消費鎖,如果消息隊列被丟棄,放棄該消息消費隊列的消費。然后執行消息消費監聽器,調用業務方具體消息監聽器執行真正的消息消費處理邏輯,並通知RocketMQ消息消費結果。
  Step8:執行消息消費鈎子函數
  Step9:如果消息消費結果為SUCCESS,執行ProcessQueue的commit方法,並返回待更新的消息消費進度。提交,就是將該批消息從ProcessQueue中移除。
  檢查消息的重試次數,如果消息重試次數大於或等於允許的最大重試次數,將該消息發送到Broker端,該消息在消息服務端最終會進入到DLQ(死信隊列),也就是RocketMQ不會再次消費,需要人工干預。
  Step10:存儲消息消費進度。

消息隊列鎖實現

  消費端通過使用MessageListenerOrderly類解決單MessageQueue 的消息被並發處理的問題,在MessageListenerOrderly實現中名,為每個Consumer Queue加個鎖,消費每個消息前,需要先獲得這個消息對應的Consumer Queue所對應的鎖,這樣保證了同一時間,同一Consumer Queue的消息不被並發消費,但不同Consumer Queue的消息可以並發處理。

總結

  並發消息消費指消費線程池中的線程可以並發地對同一個消息消費隊列的消息進行消費,消費成功后,取出消息處理隊列中最小的消息偏移量作為消息消費進度偏移量存在於消息消費進度存儲文件中,集群模式消息進度存儲在Broker,廣播模式消息進度存儲在消費者端。
  順序消息消費一般使用集群模式,是指消息消費者內的線程池中的線程對消息消費隊列只能串行消費。與並發消息消費最本質的區別是消費消息時必須成功鎖定消息消費隊列,在Broker端會存儲消息消費隊列的鎖占用情況。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM