MQ關於實現最終一致性分布式事務原理解析


本文講述阿里雲官方文檔中關於通過MQ實現分布式事務最終一致性原理

 

概念介紹

  • 事務消息:消息隊列 MQ 提供類似 X/Open XA 的分布式事務功能,通過消息隊列 MQ 事務消息能達到分布式事務的最終一致。

  • 半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半事務消息。

  • 消息回查:由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 MQ 服務端通過掃描發現某條消息長期處於“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該詢問過程即消息回查。

適用場景

事務消息的適用場景示例:

通過購物車進行下單的流程中,用戶入口在購物車系統,交易下單入口在交易系統,兩個系統之間的數據需要保持最終一致,這時可以通過事務消息進行處理。交易系統下單之后,發送一條交易下單的消息到消息隊列 MQ,購物車系統訂閱消息隊列 MQ 的交易下單消息,做相應的業務處理,更新購物車數據。

交互流程

事務消息交互流程如下圖所示。

事務消息

事務消息發送步驟如下:

  1. 發送方將半事務消息發送至消息隊列 MQ 服務端。

  2. 消息隊列 MQ 服務端將消息持久化成功之后,向發送方返回 Ack 確認消息已經發送成功,此時消息為半事務消息。

  3. 發送方開始執行本地事務邏輯。

  4. 發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半事務消息,訂閱方將不會接受該消息。

事務消息回查步驟如下:

  1. 在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。

  2. 發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。

  3. 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。

注意事項

  1. 事務消息的 Group ID 不能與其他類型消息的 Group ID 共用。與其他類型的消息不同,事務消息有回查機制,回查時消息隊列 MQ 服務端會根據 Group ID 去查詢客戶端。

  2. 通過 ONSFactory.createTransactionProducer 創建事務消息的 Producer 時必須指定 LocalTransactionChecker 的實現類,處理異常情況下事務消息的回查。

  3. 事務消息發送完成本地事務后,可在 execute 方法中返回以下三種狀態:

    • TransactionStatus.CommitTransaction:提交事務,允許訂閱方消費該消息。

    • TransactionStatus.RollbackTransaction:回滾事務,消息將被丟棄不允許消費。

    • TransactionStatus.Unknow:暫時無法判斷狀態,等待固定時間以后消息隊列 MQ 服務端向發送方進行消息回查。

  4. 可通過以下方式給每條消息設定第一次消息回查的最快時間:

     
    1. Message message = new Message();
    2. // 在消息屬性中添加第一次消息回查的最快時間,單位秒。例如,以下設置實際第一次回查時間為 120 秒 ~ 125 秒之間
    3. message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
    4. // 以上方式只確定事務消息的第一次回查的最快時間,實際回查時間向后浮動 0 秒 ~ 5 秒;如第一次回查后事務仍未提交,后續每隔 5 秒回查一次


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM