一、背景
RocketMQ的分布式事務可以稱為“半消息事務”。
二、原理
2.1原理
RocketMQ是靠半消息機制實現分布式事務:
流程:
1.發送方向 MQ 服務端發送事務消息;
2.MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
3.發送方開始執行本地事務邏輯。
4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后 MQ Server 將對該消息發起消息回查。
6.發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。
2.2 疑問
極端情況:
是否任何情況下MQ的事務性消息都可以保證雙方的最終一致性?答案是否定的。
考慮上面提到的異常情況“情況2:MQ發送方在步驟(3)執行完本地事務之后commit之前異常退出”。在這種情況下如果如果MQ發送方由於運維上的失誤長時間不重啟MQ發送方,那么MQ在多次回查不成功之后將會丟棄該消息。最終分布式事務的雙方是不能達到最終一致性了。當然這個回查的最大值可以通過修改broker的參數transactionCheckMax來調整。但是過大的transactionCheckMax參數將會導致MQ堆積過多的半包消息,從而危害MQ的穩定性,是個需要權衡的參數。
三、使用
如上圖所示,使用者只需要實現紫色+綠色模塊:
- 紫色代表業務方自定義實現,
- 綠色代表RocketMQ定義業務需要實現的方法。
具體步驟如下:
一、生產者
1.業務方保存本地事務記錄,並初始化狀態。
2.業務方調用sendMessageInTransaction發送半消息到MQ的RMQ_SYS_TRANS_HALF_TOPIC隊列。
3.MQ執行成功,回調業務方executeLocalTransaction方法,也就是業務方的業務邏輯。
4.業務方返回事務狀態給MQ,
- commit: 塞一條消息進REAL_TOPIC真實隊列,等待消費者消費。
- commit/rollback:添加一條消息進RMQ_SYS_TRANS_OP_HALF_TOPIC隊列,代表已處理消息。
- unknow:根據一定的頻率回查業務方本地事務狀態。
5.MQ內部有定時任務,輪詢比較halfoffset、opset,判定哪些未處理(無結果)消息,並回查業務方本地事務狀態。
6.MQ->業務方, 執行checkLocalTransaction方法,查詢本地事務狀態。返回事務狀態給MQ就是步驟4.
需要業務方實現的也就3個方法。
二、消費者
初始化:
自定義實現CommandLineRunner接口,執行startConsumer(): spring 容器啟動完畢后,執行初始化過程。
1. XXConsumerEntry extends ConsumerEntry。init()子類實現,addConsumerAction()添加具體業務操作。指定一個tag,一個ConsumerExecutor().
2.DefaultMQPushConsumer定義消費者,MessageModel=集群消費,指定消費群組。
(注:這里還可以設置很多參數,例如:consumeMessageBatchMaxSize:一次派發消費多少條(默認1),pullBatchSize:一次拉取多少條(默認32))
3.指定消息監聽器:使用base包提供的TracingRocketMQSingleConsumerr。注冊監聽器TracingRocketMQSingleConsumer.SingleMessageListenerConcurrently。實際上就是封裝的RocketMQ的MessageListener接口,定義了consumeMessage()接口,最終會調用步驟1定義的ConsumerAction的execute()。執行消息的消費。
拉取消費
消費者會從MQ長輪詢並發拉取消息,並根據初始化的MessageLister接口執行業務消費邏輯。
4.MQ根據返回的狀態,如果是RECONSUME_LATER重試,就會入SCHEDULE延遲隊列、RETRY重試隊列、DLQ死信隊列。要注意的是:進入死信隊列的消息,需要管理員手動排查問題。
需要業務方實現1個方法。
四、 其它細節
4.1.從哪里開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
CONSUME_FROM_LAST_OFFSET:一個新的訂閱組第一次啟動從隊列的最后位置開始消費,后續再啟動接着上次消費的進度開始消費
CONSUME_FROM_FIRST_OFFSET:一個新的訂閱組第一次啟動從隊列的最前位置開始消費,后續再啟動接着上次消費的進度開始消費
CONSUME_FROM_TIMESTAMP:一個新的訂閱組第一次啟動從指定時間點開始消費,后續再啟動接着上次消費的進度開始消費
4.2.一些問題排查思路
理解了RocketMQ原理,數據流轉,對排查問題可以提供思路。
1.隊列數據膨脹
RMQ_SYS_TRANS_HALF_TOPIC膨脹:可能是死循環了。定時任務反查事務狀態,一直消費不完。
RMQ_SYS_TRANS_OP_HALF_TOPIC膨脹:業務量暴增,接口被刷。
RETRY重試、DLQ死信隊列膨脹:可能是服務不可用。
2.rocketMQ業務異常日志,具體判斷。
3.broker延遲可能reblance失衡。
4.3 唯一消息ID
msgId transacctionId
MessageExt extends Message :transacctionId是Message字段,msgId是MessageExt的拓展字段。
MessageExt的transactionId就是RocketMQ認為的唯一ID,消息在RocketMQn內部流轉,transactionId不變,msgId 會變。看下圖就明白了:
下圖是生產環境rocketMQ 異常時的日志總結,注意圖中newMsgId=msgId realMsgId=transactionId
注意:這里transacctionId就是RocktMQ認定的唯一事務ID。這里是說對應一個事務,但是不一定適合做接口冪等性(消息重復消費問題)。接口冪等性是與業務耦合的,保證多次執行,同一結果。
冪等性如何實現?
- 天然冪等性:純讀接口
- 后天校驗型:狀態機校驗、業務key校驗,等等。