本文講述阿里雲官方文檔中關於通過MQ實現分布式事務最終一致性原理
概念介紹
-
事務消息:消息隊列 MQ 提供類似 X/Open XA 的分布式事務功能,通過消息隊列 MQ 事務消息能達到分布式事務的最終一致。
-
半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半事務消息。
-
消息回查:由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 MQ 服務端通過掃描發現某條消息長期處於“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該詢問過程即消息回查。
適用場景
事務消息的適用場景示例:
通過購物車進行下單的流程中,用戶入口在購物車系統,交易下單入口在交易系統,兩個系統之間的數據需要保持最終一致,這時可以通過事務消息進行處理。交易系統下單之后,發送一條交易下單的消息到消息隊列 MQ,購物車系統訂閱消息隊列 MQ 的交易下單消息,做相應的業務處理,更新購物車數據。
交互流程
事務消息交互流程如下圖所示。
事務消息發送步驟如下:
-
發送方將半事務消息發送至消息隊列 MQ 服務端。
-
消息隊列 MQ 服務端將消息持久化成功之后,向發送方返回 Ack 確認消息已經發送成功,此時消息為半事務消息。
-
發送方開始執行本地事務邏輯。
-
發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。
事務消息回查步驟如下:
-
在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。
-
發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
-
發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。
注意事項
-
事務消息的 Group ID 不能與其他類型消息的 Group ID 共用。與其他類型的消息不同,事務消息有回查機制,回查時消息隊列 MQ 服務端會根據 Group ID 去查詢客戶端。
-
通過
ONSFactory.createTransactionProducer
創建事務消息的 Producer 時必須指定LocalTransactionChecker
的實現類,處理異常情況下事務消息的回查。 -
事務消息發送完成本地事務后,可在
execute
方法中返回以下三種狀態:-
TransactionStatus.CommitTransaction
:提交事務,允許訂閱方消費該消息。 -
TransactionStatus.RollbackTransaction
:回滾事務,消息將被丟棄不允許消費。 -
TransactionStatus.Unknow
:暫時無法判斷狀態,等待固定時間以后消息隊列 MQ 服務端向發送方進行消息回查。
-
-
可通過以下方式給每條消息設定第一次消息回查的最快時間:
Message message = new Message();
// 在消息屬性中添加第一次消息回查的最快時間,單位秒。例如,以下設置實際第一次回查時間為 120 秒 ~ 125 秒之間
message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只確定事務消息的第一次回查的最快時間,實際回查時間向后浮動 0 秒 ~ 5 秒;如第一次回查后事務仍未提交,后續每隔 5 秒回查一次