在RocketMQ4.3.0版本后,開放了事務消息這一特性,對於分布式事務而言,最常說的還是二階段提交協議,那么RocketMQ的事務消息又是怎么一回事呢,這里主要帶着以下幾個問題來探究一下RocketMQ的事務消息:
事務消息是如何實現的
我們有哪些手段來監控事務消息的狀態
事務消息的異常恢復機制
RocketMQ的事務消息是如何實現的
RocketMQ作為一款消息中間件,主要作用就是幫助各個系統進行業務解耦,以及對消息流量有削峰填谷的作用,而對於事務消息,主要是通過消息的異步處理,可以保證本地事務和消息發送同時成功執行或失敗,從而保證數據的最終一致性,
事務消息從誕生到結束的整個時間線流程:
生產者發送消息到broker,該消息是prepare消息,且事務消息的發送是同步發送的方式。 broker接收到消息后,會將該消息進行轉換,所有的事務消息統一寫入Half Topic,該Topic默認是RMQ_SYS_TRANS_HALF_TOPIC ,寫入成功后會給生產者返回成功狀態。 本地生產獲取到該消息的事務Id,進行本地事務處理。 本地事務執行成功提交Commit,失敗則提交Rollback,超時提交或提交Unknow狀態則會觸發broker的事務回查。 若提交了Commit或Rollback狀態,Broker則會將該消息寫入到Op Topic,該Topic默認是RMQ_SYS_TRANS_OP_HALF_TOPIC,該Topic的作用主要記錄已經Commit或Rollback的prepare消息,Broker利用Half Topic和Op Topic計算出需要回查的事務消息。如果是commit消息,broker還會將消息從Half取出來存儲到真正的Topic里,從而消費者可以正常進行消費,如果是Rollback則不進行其他操作 如果本地事務執行超時或返回了Unknow狀態,則broker會進行事務回查。若生產者執行本地事務超過6s則進行第一次事務回查,總共回查15次,后續回查間隔時間是60s,broker在每次回查時會將消息再在Half Topic寫一次。回查次數和時間間隔都是可配置的。 執行事務回查時,生產者可以獲取到事務Id,檢查該事務在本地執行情況,返回狀態同第一次執行本地事務一樣。
從上述流程可以看到事務消息其實只是保證了生產者發送消息成功與本地執行事務的成功的一致性,消費者在消費事務消息時,broker處理事務消息的消費與普通消息是一樣的,若消費不成功,則broker會重復投遞該消息16次,若仍然不成功則需要人工介入。
事務消息的成功投遞是需要經歷三個Topic的
Half Topic:用於記錄所有的prepare消息 Op Half Topic:記錄已經提交了狀態的prepare消息 Real Topic:事務消息真正的Topic,在Commit后會才會將消息寫入該Topic,從而進行消息的投遞
理解清楚事務消息在這三個Topic的流轉就基本理解清楚了RocketMQ的事務消息的處理。接下來我們看看在源碼中是如何使用這三個Topic的。
事務消息是如何處理回查的
在RocketMQ中,消息都是順序寫隨機讀的,以offset來記錄消息的存儲位置與消費位置,所以對於事務消息的prepare消息來說,不可能做到物理刪除,broker啟動時每間隔60s會開始檢查一下有哪些prepare消息需要回查,從上面的分析我們知道,所有prepare消息都存儲在Half Topic中,那么如何從該Topic中取出需要回查的消息進行回查呢?這就需要Op Half Topic以及一個內部的消費進度計算出需要回查的prepare消息進行回查:
Half Topic 默認Topic是RMQ_SYS_TRANS_HALF_TOPIC,建一個隊列,存儲所有的prepare消息
Op Half Topic默認是RMQ_SYS_TRANS_OP_HALF_TOPIC,建立的對列數與Half Topic相同,存儲所有已經確定狀態的prepare消息(rollback與commit狀態),消息內容是該條消息在Half Topic的Offset
Half Topic消費進度,默認消費者是CID_RMQ_SYS_TRANS,每次取prepare消息判斷回查時,從該消費進度開始依次獲取消息。
Op Half Topic消費進度,默認消費者是CID_RMQ_SYS_TRANS,每次獲取prepare消息都需要判斷是否在Op Topic中已存在該消息了,若存在表示該prepare消息已結束流程,不需要再進行事務回查,每次判斷都是從Op Topic中獲取一定消息數量出來進行對比的,獲取的消息就是從Op Topic中該消費進度開始獲取的,最大一次獲取32條。
獲取Half Topic的所有隊列,循環隊列開始檢測需要獲取的prepare消息,實際上Half Topic只有一個隊列。 獲取Half Topic與Op Half Topic的消費進度。 調用fillOpRemoveMap方法,獲取Op Half Topic中已完成的prepare事務消息。 從Half Topic中當前消費進度依次獲取消息,與第3步獲取的已結束的prepare消息進行對比,判斷是否進行回查: 如果Op消息中包含該消息,則不進行回查, 如果不包含,獲取Half Topic中的該消息,判斷寫入時間是否符合回查條件,若是新消息則不處理下次處理,並將消息重新寫入Half Topic,判斷回查次數是否小於15次,寫入時間是否小於72h,如果不滿足就丟棄消息,若滿足則更新回查次數,並將消息重新寫入Half Topic並進行事務回查, 在循環完后重新更新Half Topic與Op Half Topic中的消費進度,下次判斷回查邏輯時,將從最新的消費進度獲取信息。 生產客戶端的ClientRemotingProcessor的processRequest方法會處理服務端的CHECK_TRANSACTION_STATE請求,最后會調用checkLocalTransactionState方法,該方法就是業務方可以自己實現事務消息回查邏輯的地方,並將結果最后用endTransactionOneway方法返回給Broker,該執行邏輯可以通過ClientRemotingProcessor的方法processRequest依次理解就可以了。
我們有哪些手段來監控事務消息的狀態
事務消息主要有三個狀態:
UNKNOW狀態:表示事務消息未確定,可能是業務方執行本地事務邏輯時間耗時過長或者網絡原因等引起的,該狀態會導致broker對事務消息進行回查,默認回查總次數是15次,第一次回查間隔時間是6s,后續每次間隔60s, ROLLBACK狀態,該狀態表示該事務消息被回滾,因為本地事務邏輯執行失敗導致 COMMIT狀態,表示事務消息被提交,會被正確分發給消費者。
那么監控事務消息時,主要是查看該事務消息是否是處於我們想要的狀態,而在事務消息生產者發送prepare消息成功后只能拿到一個transactionId,該id不是的RocketMQ消息存儲的物理offset地址,RocketMQ只有在准備寫入commitlog文件時才會生成真正的msgId,而這里可以獲取的transactionId和msgId都是客戶端生成的一個消息的唯一標識符,我們在這里稱為uniqId,在broker端,會把該uniqId作為一個msgKey寫入消息,所以可以通過該uniqId來查找uniqId的一些狀態:
通過DefaultMQAdminExt的viewMessage(String topic, String msgId)方法可以消息的信息,這里topic參數是RMQ_SYS_TRANS_HALF_TOPIC ,該topic是真正的Half Topic,msgId傳發送prepare消息獲取的uniqId,這樣可以獲取prepare消息在Half Topic真正的offsetMsgId,
通過第一步獲取的offsetMsgId繼續調用viewMessage(String topic, String msgId)方法,但是topic是RMQ_SYS_TRANS_OP_HALF_TOPIC,這樣可以獲取Op Half Topic中該事務消息的狀態,如果存在說明prepare消息已處理,否則可能仍在回查中或已被丟棄
如果在第二步查到了信息可以用uniqId和事務消息真正Topic繼續調用viewMessage(String topic, String msgId)方法獲取消息真正的信息,如果存在說明消息已被投遞,否則該事務消息已被回滾。只通過Op Half Topic是不能確定消息狀態的,這里的sysFlag被設置0,sysFlag是用於確定事務消息狀態。
通過上述三步就可以確定事務消息的狀態。
事務消息的異常恢復機制
事務消息的異常狀態主要有:
生產者提交prepare消息到broker成功,但是當前生產者實例宕機了 生產者提交prepare消息到broker失敗,可能是因為提交的broker已宕機 生產者提交prepare消息到broker成功,執行本地事務邏輯成功,但是broker宕機了未確定事務狀態 生產提交prepare消息到broker成功,但是在進行事務回查的過程中broker宕機了,未確定事務狀態
異常解決:
對於1:事務消息會根據producerGroup搜尋其他的生產者實例進行回查,所以transactionId務必保存在中央存儲中,並且事務消息的pid不能跟其他消息的pid混用。 對於2:當前實例會搜尋其他的可用的broker-master進行提交,因為只有提交prepare消息后才會執行本地事務,所以沒有影響,注意生產者報的是超時異常時,是不會進行重發的。 對於3:因為返回狀態是oneway方式,此時如果消費者未收到消息,需要用手段確定該事務消息的狀態,盡快將broker重啟,broker重啟后會通過回查完成事務消息。 對於4:同3,盡快重啟broker。
---------------------
轉自:https://blog.csdn.net/qq_28632173/article/details/83790243