事務消息實現思想
RocketMQ事務消息的實現原理基於兩階段提交和定時事務狀態回查來決定消息最終是提交還是回滾。
1)應用程序在事務內完成相關業務數據落庫后,需要同步調用RocketMQ消息發送接口,發送狀態為prepare的消息。消息發送成功后,RocketMQ服務器會回調RocketMQ消息發送者的事件監聽程序,記錄消息的本地事務狀態,該相關標記與本地業務操作同屬一個事務,確保消息發送與本地事務的原子性。
2)RocketMQ在收到類型為prepare的消息時,會首先備份消息的原主題與原消息消費隊列,然后將消息存儲在主題為RMQ_SYS_TRANS_HALF_TOPIC的消息消費隊列中。
3)RocketMQ消息服務器開啟一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC的消息,向消息發送端(應用程序)發起消息事務狀態回查,應用程序根據保存的事務狀態回饋消息服務器事務的狀態(提交、回滾、未知),如果是提交或回滾,則消息服務器提交或回滾消息,如果是未知,待下一次回查,RocketMQ允許設置一條消息的回查間隔與回查次數,如果在超過回查次數后依然無法獲知消息的事務狀態,則默認回滾消息。
事務消息發送流程
RocketMQ事務消息發送者為org.apache.rocketmq.client.producer.TransactionMQProducer。
1、TransactionMQProducer
1 ) TransactionListener transactionListener:事務監聽器,主要定義實現本地事務狀態執行、本地事務狀態回查兩個接口 。
2 ) ExecutorService executorService:事務狀態回查異步執行線程池。
2、TransactionListener
1 ) LocalTransactionState executeLocalTransaction (final Message msg, final Object arg ):執行本地事務 。
2 ) LocalTransactionState checkLocalTransaction (final MessageExt msg):事務消息狀態回查 。
Step1:首先為消息添加屬性,TRAN_MSG 和 PGROUP,分別表示消息為 prepare 消息、消息所屬消息生產者組。設置消息生產者組的目的是在查詢事務消息本地事務狀態時,從該生產者組中隨機選擇一個消息生產者即可,然后通過同步調用方式向RocketMQ發送消息。
Step2:根據消息發送結果執行相應的操作 。
1 )如果消息發送成功,則執行 TransactionListener#executeLocalTransaction 方法,該方法的職責是記錄事務消息的本地事務狀態, 例如可以通過將消息唯一 ID 存儲在數據中,並且該方法與業務代碼處於同一個事務,與業務事務要么一起成功,要么一起失敗。這里是事務消息設計的關鍵理念之一,為后續的事務狀態回查提供唯一依據。
2 )如果消息發送失敗,則設置本次事務狀態為 LocalTransactionState.ROLLBACK_MESSAGE 。
Step3:結束事務 。 根據第二步返回的事務狀態執行提交、回滾或暫時不處理事務。
1)LocalTransactionState.COMMIT_MESSAGE: 提交事務 。
2)LocalTransactionState.ROLLBACK_MESSAGE:回滾事務 。
3)LocalTransactionState.UNKNOW:結束事務 ,但不做任何處理。
在使用事務消息TransactionListener#execute方法除了記錄事務消息狀態后,應該返回LocalTransactionState.UNKNOW,事務消息的提交與回滾通過事務消息狀態回查時再決定是否提交或回滾。
事務消息與非事務消息發送流程的主要區別 :
如果是事務消息則備份消息的原主題與原消息消費隊列,然后將主題變更為 RMQ_SYS_TRANS_HALF TOPIC,消費隊列變更為0,然后消息按照普通消息存儲在 commitlog 文件進而轉發到 RMQ SYS_TRANS_HALF_TOPIC 主題對應的消息消費隊列。也就是說,事務消息在未提交之前並不會存入消息原有主題,自然也不會被消費者消費。既然變更了主題,RocketMQ通常會采用定時任務(單獨的線程 )去消費該主題,然后將該消息在滿足特定條件下恢復消息主題,進而被消費者消費。它與 RocketMQ 定時消息的處理過程如出一轍。
事務消息回查事務狀態
事務消息存儲在消息服務器時主題被替換為RMQ_SYS_TRANS_HALF_TOPIC,執行完本地事務返回本地事務狀態為 UN_KNOW 時,結束事務時將不做任何處理,而是通過事務狀態定時回查以期得到發送端明確的事務操作(提交事務或回滾事務)。
RocketMQ 通過 TransactionalMessageCheckService 線程定時去檢測 RMQ_SYS_TRANS_HALF_TOPIC 主題中的消息,回查消息的事務狀態。TransactionalMessageCheckService 的檢測頻率默認為 1 分鍾,可通過在 broker.conf文件中設置 transactionChecklnterval 來改變默認值,單位為毫秒。
transactionTimeOut:事務的過期時間,只有當消息的存儲時間加上過期時間大於系統當前時間時,才對消息執行事務狀態回查 ,否則在下一次周期中執行事務回查操作。
transactionCheckMax:事務回查最大檢測次數,如果超過最大檢測次數還是無法獲知消息的事務狀態,RocketMQ 將不會繼續對消息進行事務狀態回查,而是直接丟棄即相當於回滾事務 。
事務消息的處理涉及如下兩個主題。
RMQ_SYS_TRANS_HALF_TOPIC:prepare 消息的主題,事務消息首先進入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當消息服務器收到事務消息的提交或回滾請求后,會將消息存儲在該主題下。
首先構建事務狀態回查請求消息,核心參數包含消息offsetld、消息 ID (索引)、消息事務 ID、事務消息隊列中的偏移量、消息主題、消息隊列。然后根據消息的生產者組,從中隨機選擇一個消息發送者。最后向消息發送者發送事務回查命令。
事務回查命令的最終處理者為 ClientRemotingProssor 的 processRequest 方法,最終將任務提交到TransactionMQProducer的線程池中執行,最終調用應用程序實現的TransactionListener 的 checkLocalTransaction 方法 ,返回事務狀態。如果事務狀態為 LocalTransactionState#COMMIT_MESSAGE,則向消息服務器發送提交事務消息命令;如果事務狀態為 Loca!TransactionState#ROLLBACK MESSAGE,則向 Broker 服務器發送回滾事務操作;如果事務狀態為 UNOWN,則服務端會忽略此次提交 。
提交或回滾事務
本節繼續探討兩階段提交的第二個階段:提交或回滾事務 。
根據消息所屬的消息隊列獲取 Broker 的 IP與端口 信息,然后發送結束事務命令,其關鍵就是根據本地執行事務的狀態分別發送提交、回滾或“不作為”的命令。Broker服務端的結束事務處理器為:EndTransactionProcessor。
如果結束事務動作為提交事務,則執行提交事務邏輯,其關鍵實現如下 。
1 )首先從結束事務請求命令中獲取消息的物理偏移量(commitlogOffset)。
2 )然后恢復消息的主題、消費隊列,構建新的消息對象。
3 )然后將消息再次存儲在 commitlog 文件中,此時的消息主題則為業務方發送的消息,將被轉發到對應的消息消費隊列,供消息消費者消費。
4 )消息存儲后,刪除 prepare 消息,其實現方法並不是真正的刪除,而是將 prepare消息存儲到 RMQ_SYS_TRANS_OP_HALF_TOPIC 主題中,表示該事務消息(prepare 狀態的消息)已經處理過(提交或回滾),為未處理的事務進行事務回查提供查找依據。
事務的回滾與提交的唯一差別是無須將消息恢復原主題,直接刪除 prepare 消息即可,同樣是將預處理消息存儲在 RMQ_SYS_TRANS_OP_HALF_TOPIC 主題中,表示已處理過該消息。
總結
RocketMQ 事務消息基於兩階段提交和事務狀態回查機制來實現,所謂的兩階段提交,即首先發送 prepare 消息,待事務提交或回滾時發送 commit、rollback 命令。 再結合定時任務,RocketMQ 使用專門的線程以特定的頻率對 RocketMQ 服務器上的 prepare 信息進行處理,向發送端查詢事務消息的狀態來決定是否提交或回滾消息 。
發送事務消息可能存在3種情況:
1)正常情況:步驟1 -> 步驟2 -> 步驟3 -> 步驟4
發送方先發送prepare消息,發送成功后執行本地事務,本地事務的執行成功通知RocketMQ服務器進行commit投遞消息,本地事務執行失敗通知RocketMQ服務器rollback刪除消息不進行投遞。
2)回查情況:步驟1 -> 步驟2 -> 步驟3 -> 步驟5 -> 步驟6 -> 步驟7
發送方發送prepare消息,發送成功后執行本地事務,但由於本地事務可能執行時間較長,如果超過一定時間RocketMQ服務器沒有收到處理結果,不知道是commit還是rollback,這時會根據定時任務進行發送方本地事務狀態的回查,在監聽器中進行事務狀態回查,如果查到本地事務處理結果,則返回commit或rollback狀態;如果還是未查到本地事務處理結果,返回unknow狀態,RocketMQ服務器不會做操作仍會進行定時回查。
監聽器(在發送方中定義)需要實現TransactionListener,有兩個功能:prepare消息發送成功,則在executeLocalTransaction()中執行本地事務;在checkLocalTransaction()中執行本地事務狀態回查邏輯。
3)異常情況:步驟1 -> 步驟2 -> 步驟3 -> 步驟5 -> 步驟6 -> 人工干預隊列
基於情況2的情況,如果回查次數達到上限,將消息放入人工干預隊列。