RocketMQ事務消息實現分析


這周RocketMQ發布了4.3.0版本,New Feature中最受關注的一點就是支持了事務消息:

今天花了點時間看了下具體的實現內容,下面是簡單的總結。

RocketMQ事務消息概要

通過馮嘉發布的《RocketMQ 4.3正式發布,支持分布式事務》一文可以看到RocketMQ采用了2PC的方案來提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息。

這張圖說明了事務消息的大致方案,分為兩個邏輯:正常事務消息的發送及提交、事務消息的補償流程

事務消息發送及提交:

  1. 發送消息(half消息)
  2. 服務端響應消息寫入結果
  3. 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)
  4. 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

補償流程:

  1. 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”
  2. Producer收到回查消息,檢查回查消息對應的本地事務的狀態
  3. 根據本地事務狀態,重新Commit或者Rollback

補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。

以上RocketMQ事務消息的整體方案,對於了解Notify的同學應該是很熟悉的,下面是之前Notify相關的資料:

整體方案是完全相同的,只是兩者的Storage不同。

RocketMQ事務消息設計

一階段的消息如何對用戶不可見

事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。

如何做到寫入了消息但是對用戶不可見?——寫入消息數據,但是不創建對應的消息的索引信息。

熟悉RocketMQ的同學應該都清楚,消息在服務端的存儲結構如上,每條消息都會有對應的索引信息,Consumer通過索引讀取消息。

那么實現一階段寫入的消息不被用戶消費(需要在Commit后才能消費),只需要寫入Storage Queue,但是不構建Index Queue即可。

RocketMQ中具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中。

上圖即RocketMQ替換事務消息屬性的代碼實現,替換屬性后這條消息被寫入到TransactionalMessageUtil.buildHalfTopic()的Queue 0中。

(RocketMQ將事務消息一階段發送的消息稱為Half消息讓人費解,采用的2PC的方式,一階段消息稱為Prepare Message或者Pending Message更能體現它的含義)

在完成Storage Queue的寫入后,在appendCallback中,普通消息會去構建消息索引,而如果發現是事務消息,則跳過了創建索引的邏輯。

如果讓一階段的消息對用戶可見

在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。

先說Rollback的情況。對於Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。

但是區別於這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。

RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息是否狀態已經確定(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。

引入Op消息后,事務消息無論是Commit或者Rollback都會記錄一個Op操作。

Commit相對於Rollback只是在寫入Op消息前創建Half消息的索引。

Op消息的存儲

RocketMQ將Op消息寫入到全局一個特定的Topic中:TransactionalMessageUtil.buildOpTopic()

這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。

Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行后續的回查操作。

Half消息的索引構建

在執行二階段的Commit操作時,需要構建出Half消息的索引。

一階段的Half消息由於是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,並將Topic和Queue替換成真正的目標的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。

所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然后走一遍消息寫入流程。

如何處理二階段失敗的消息

如果二階段失敗了,比如在Commit操作時出現網絡問題導致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。

RocketMQ采用了一種補償機制,稱為“回查”。

Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同一個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。

Broker端通過對比Half消息和Op消息進行事務消息的回查並且推進CheckPoint(記錄那些事務消息的狀態是確定的)。

值得注意的一點是具體實現中,在回查前,系統會執行putBackHalfMsgQueue操作,即將Half消息重新寫一遍到Half消息的Queue中。這么做其實是為了能有效的推進上面的CheckPoint。

RocketMQ事務消息設計總結

以上是RocketMQ事務消息實現的示意圖:

  • 通過寫Half消息的方式來實現一階段消息對用戶不可見
  • 通過Op消息來標記事務消息的狀態
  • 通過讀取Half消息來生成一條新的Normal消息來完成二階段Commit之后消息對Consumer可見
  • 通過Op消息來執行回查

優勢:

  • Half Queue和Op Queue的數量可控,不會隨着Topic的增加而增加
  • 沒有外部依賴,實現自包含

缺陷:

  • 每條事務消息至少需要寫一條Half消息(異常情況可能會有多條)和Normal,寫放大了
  • 所有Half消息都是寫到全局預設的一個內部的Topic,這塊可能性能會有一些問題(所有Topic的事務消息會往一個Topic上寫)
  • 全局Op消息寫一個Topic,回查時間可能會有相互影響


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM