概述
事務消息解決的問題是:Provider本地事務 + 消息投遞 一起執行。解決應用端 和 MQ端兩個獨立的應用的操作,在一個事務里面完成
因為傳統的模式無法保證這一點,比如MQ宕機,或者網絡丟失,而事務消息有一個兩階段確認的這一操作,可以大大降低這種丟失的概率。
但是這個功能和消費者無關,並不能確保該消息能被消費者成功消費。
消費端同樣也存在這個分布式的問題:成功的從MQ中取出消息到本地 + 消費端成功業務上消費這個消息
思考題
RocketMQ有發送同步消息的功能,只有Broker Ack Send_OK狀態碼時才代表消息發送成功,否則阻塞重試,重試2次還失敗就報錯。
既然同步消息可以保證消息成功的寫入到MQ中,為什么還要有事務消息呢?
事務消息解決的問題是:Provider本地事務 + 消息投遞 一起執行。
而同步消息解決的問題是:消息一定投遞成功。
應用場景:
比如工行用戶A向建行用戶B轉賬1萬元。
使用同步消息:
①:工行系統發送一個同步消息給MQ,給B增款1萬元
②:MQ ack反饋發送成功了
③:工行系統給用戶A扣款1萬元
可能的問題,ack Send_OK之后,工行系統拋出異常,沒有給用戶A扣款,但是消息已經發送出去了,B贈款成功了。
使用事務消息:
①:工行系統發一個事務消息給MQ,給B增款1萬元
②:Broker precommit成功,executeLocalTransaction,真正執行工行用戶A扣款1萬元
③:扣款成功ACK Commit給MQ
④:MQ收到Commit ACK時,commit消息,建行系統可以消費這個消息
⑤:如果工行系統扣款異常,則消息雖然prepareCommit在MQ中,但是對建行不可見。另外如果ACK網絡丟失或者延時,MQ如果超時未接收到ACK,會發起重試確認到工行。
最終確保:扣款 + 消息成功投遞 在一個事務里面執行
實現原理
投遞消息:Producer向Broker投遞一個事務消息,並且帶有唯一的key作為參數(冪等性)
①:Broker預提交消息(在Broker本地做了存儲,但是該消息的狀態對Consumer不可見)
②:Broker預提交成功后回調Producer的executeLocalTransaction方法
④:Producer提交業務(比如記錄最終成功投遞的日志),並根據業務提交的執行情況,向Broker反饋Commit 或者回滾
⑤:Broker最終處理
Broker監聽到Producer發來的Commit反饋時,會最終提交這個消息到本地,此時該事務消息對Consumer可見,事務消息最終投遞成功,事務結束
Broker監聽到Producer發來的RollBack反饋時,會最終回滾掉本地的預提交的消息,事務消息最終投遞失敗,事務結束
Broker超時未接受到Producer的反饋,會定時重試調用Producer.checkLocalTransaction,Producer會根據自己的執行情況Ack給Broker
Ack消息的3種狀態
Broker是根據Producer發送過來的狀態碼,來決定下一步的操作(提交、回滾、重試)
①:TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message.
②:TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume.
③:TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status.
Producer實現2個接口方法:
實際上官方的這種模式,重試指的是check的重試而不是execute的重試,因為execute方法只會執行一次,要特別注意。
executeLocalTransaction:最終執行本地事務,並Ack執行狀態給Broker
checkLocalTransaction:檢查Producer是否成功執行了事務,並Ack執行狀態給Broker
實際上是可以寫在一個方法里面的,execute的時候先根據key進行check,已經執行了就Ack OK,沒有的話就執行。執行成功Ack Ok,執行失敗就Ack RollBack。
但是這里官方把這個功能拆分的更細了,降低單一方法的復雜度
事務消息的優點:
①:消息的投遞失敗時(比如MQ宕機或者網絡丟失),Producer是可以感知到的,因為最終的業務提交是在回調的execute方法里面執行的
②:如果消息成功發送到Broker,但是沒有Producer最終Commit Ack時(比如Producer宕機了),該事務消息仍然處於預提交的狀態,不會被消費者讀取到,這保證了消息在P和C端的狀態一致性
總結:其實rocketmq事務消息是在回調里面做的本地事務的提交,以及check本地事務執行情況。保證本地事務的正常提交,以及mq消息正常發送成功。
第一步也就是先發送一個半消息,這個消息對consumer是不可見的,在回調里面做本地事務的正常提交。
源碼地址demo:https://gitee.com/gd1234/springboot-rocketmq
補一下圖:
圖一:rocketmq事務消息設計思路圖:
①:應用模塊遇到要發送事務消息的場景時,先發送prepare消息給MQ。
②:prepare消息發送成功后,應用模塊執行數據庫事務(本地事務)。
③:根據數據庫事務執行的結果,再返回Commit或Rollback給MQ。
④:如果是Commit,MQ把消息下發給Consumer端,如果是Rollback,直接刪掉prepare消息。
⑤:第3步的執行結果如果沒響應,或是超時的,啟動定時任務回查事務狀態(最多重試15次,超過了默認丟棄此消息),處理結果同第4步。
⑥:MQ消費的成功機制由MQ自己保證。
圖二:RocketMQ事務消息實現流程圖
以RocketMQ 4.5.2版本為例,事務消息有專門的一個隊列RMQ_SYS_TRANS_HALF_TOPIC,所有的prepare消息都先往這里放,當消息收到Commit請求后,就把消息再塞到真實的Topic隊列里,供Consumer消費,同時向RMQ_SYS_TRANS_OP_HALF_TOPIC塞一條消息。簡易流程圖如下:
圖三:rocketmq回查本地事務圖