基於RocketMQ實現分布式事務(半消息事務)


一、背景

RocketMQ的分布式事務可以稱為“半消息事務”。

二、原理

2.1原理

RocketMQ是靠半消息機制實現分布式事務:

事務消息:MQ 提供類似 X/Open XA 的分布事務功能,通過 MQ 事務消息能達到分布式事務的最終一致。
半消息:暫不能投遞的消息,發送方已經將消息成功發送到了 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半消息。
半消息回查:由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發現某條消息長期處於“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。

 

流程:

1.發送方向 MQ 服務端發送事務消息;

2.MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。

3.發送方開始執行本地事務邏輯。

4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。

5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后 MQ Server 將對該消息發起消息回查。

6.發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。

7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。

 

2.2 疑問

極端情況:

是否任何情況下MQ的事務性消息都可以保證雙方的最終一致性?答案是否定的。

考慮上面提到的異常情況“情況2:MQ發送方在步驟(3)執行完本地事務之后commit之前異常退出”。在這種情況下如果如果MQ發送方由於運維上的失誤長時間不重啟MQ發送方,那么MQ在多次回查不成功之后將會丟棄該消息。最終分布式事務的雙方是不能達到最終一致性了。當然這個回查的最大值可以通過修改broker的參數transactionCheckMax來調整。但是過大的transactionCheckMax參數將會導致MQ堆積過多的半包消息,從而危害MQ的穩定性,是個需要權衡的參數。



三、使用

 

 

 

如上圖所示,使用者只需要實現紫色+綠色模塊:

  • 紫色代表業務方自定義實現,
  • 綠色代表RocketMQ定義業務需要實現的方法。

具體步驟如下:

一、生產者

1.業務方保存本地事務記錄,並初始化狀態。

2.業務方調用sendMessageInTransaction發送半消息到MQ的RMQ_SYS_TRANS_HALF_TOPIC隊列。

3.MQ執行成功,回調業務方executeLocalTransaction方法,也就是業務方的業務邏輯。

4.業務方返回事務狀態給MQ,

  1. commit: 塞一條消息進REAL_TOPIC真實隊列,等待消費者消費。
  2. commit/rollback:添加一條消息進RMQ_SYS_TRANS_OP_HALF_TOPIC隊列,代表已處理消息。
  3. unknow:根據一定的頻率回查業務方本地事務狀態。

5.MQ內部有定時任務,輪詢比較halfoffset、opset,判定哪些未處理(無結果)消息,並回查業務方本地事務狀態。

6.MQ->業務方, 執行checkLocalTransaction方法,查詢本地事務狀態。返回事務狀態給MQ就是步驟4.

需要業務方實現的也就3個方法。

 

二、消費者

初始化

自定義實現CommandLineRunner接口,執行startConsumer(): spring 容器啟動完畢后,執行初始化過程。

1. XXConsumerEntry extends ConsumerEntry。init()子類實現,addConsumerAction()添加具體業務操作。指定一個tag,一個ConsumerExecutor().

2.DefaultMQPushConsumer定義消費者,MessageModel=集群消費,指定消費群組。

(注:這里還可以設置很多參數,例如:consumeMessageBatchMaxSize:一次派發消費多少條(默認1),pullBatchSize:一次拉取多少條(默認32))

3.指定消息監聽器:使用base包提供的TracingRocketMQSingleConsumerr。注冊監聽器TracingRocketMQSingleConsumer.SingleMessageListenerConcurrently。實際上就是封裝的RocketMQ的MessageListener接口,定義了consumeMessage()接口,最終會調用步驟1定義的ConsumerAction的execute()。執行消息的消費。

 

拉取消費

消費者會從MQ長輪詢並發拉取消息,並根據初始化的MessageLister接口執行業務消費邏輯。

4.MQ根據返回的狀態,如果是RECONSUME_LATER重試,就會入SCHEDULE延遲隊列、RETRY重試隊列、DLQ死信隊列。要注意的是:進入死信隊列的消息,需要管理員手動排查問題。

需要業務方實現1個方法

 

四、 其它細節

4.1.從哪里開始消費

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

CONSUME_FROM_LAST_OFFSET:一個新的訂閱組第一次啟動從隊列的最后位置開始消費,后續再啟動接着上次消費的進度開始消費

CONSUME_FROM_FIRST_OFFSET:一個新的訂閱組第一次啟動從隊列的最前位置開始消費,后續再啟動接着上次消費的進度開始消費

CONSUME_FROM_TIMESTAMP:一個新的訂閱組第一次啟動從指定時間點開始消費,后續再啟動接着上次消費的進度開始消費

4.2.一些問題排查思路

理解了RocketMQ原理,數據流轉,對排查問題可以提供思路。

1.隊列數據膨脹

RMQ_SYS_TRANS_HALF_TOPIC膨脹:可能是死循環了。定時任務反查事務狀態,一直消費不完。

RMQ_SYS_TRANS_OP_HALF_TOPIC膨脹:業務量暴增,接口被刷。

RETRY重試、DLQ死信隊列膨脹:可能是服務不可用。

2.rocketMQ業務異常日志,具體判斷。

3.broker延遲可能reblance失衡。

4.3 唯一消息ID

msgId  transacctionId

MessageExt extends Message :transacctionId是Message字段,msgId是MessageExt的拓展字段。

MessageExt的transactionId就是RocketMQ認為的唯一ID,消息在RocketMQn內部流轉,transactionId不變,msgId 會變。看下圖就明白了:

下圖是生產環境rocketMQ 異常時的日志總結,注意圖中newMsgId=msgId   realMsgId=transactionId

 

注意:這里transacctionId就是RocktMQ認定的唯一事務ID。這里是說對應一個事務,但是不一定適合做接口冪等性(消息重復消費問題)。接口冪等性是與業務耦合的,保證多次執行,同一結果。

冪等性如何實現?

  • 天然冪等性:純讀接口
  • 后天校驗型:狀態機校驗、業務key校驗,等等。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM