一、導讀
在之前的文章中我們介紹了如何基於RocketMQ搭建生產級消息集群,以及2PC、3PC和TCC等與分布式事務相關的基本概念(沒有讀過的讀者詳見👇推薦閱讀)。在這篇文章中我們將介紹RocketMQ的事務消息相關的內容,並通過一些實踐和大家一起來探索下事務消息如何解決分布式系統中的分布式事務問題。
二、事務消息原理
事務消息特性可以看作是兩階段協議的消息實現方式,用以確保在以消息中間件解耦的分布式系統中本地事務的執行和消息的發送,可以以原子的方式進行。
舉個例子,以某互聯網公司的用戶余額充值為例,因為有充返活動(充值100元贈送20元),優惠比較大,用戶Joe禁不住誘惑用支付寶向自己的余額賬戶充值了100元,支付成功后Joe的余額賬戶有了120元錢。
而該公司的關於用戶余額充值的系統設計是這樣的:
在這個設計流程中,該公司通過自建支付系統完成用戶Joe的支付寶扣款操作,成功后需要更新支付流水的狀態,因為用戶的余額賬戶系統與支付系統之間通過MQ解耦了,所以支付系統在完成支付流水狀態更新后需要通過發送MQ消息到消息中間件服務,然后用戶余額系統作為消費者通過消息消費的方式完成用戶余額的增加操作。
這里有個問題:“支付系統如何確保這筆余額充值消息一定會成功發送到MQ,並且用戶余額系統一定能處理成功呢”?如果支付系統在完成支付訂單狀態更新后,MQ消息發送失敗或者用戶余額系統消息處理失敗的話,都會導致Joe支付扣款成功,而自己的余額賬戶卻沒到賬的情況發生。
為了解決這個問題,按照目前的系統設計是需要“支付系統-MQ服務-用戶余額系統”三者的處理滿足數據的一致性要求。例如,如果支付系統感知到消息發送失敗后還可以進行重新投遞,從而確保支付系統與用戶余額數據的最終一致性。
而上述問題就是事務消息要解決的問題,在具體了解RocketMQ提供的事務消息機制之前,我們先來看下在RocketMQ的早期版本不支持事務消息,或者因為歷史原因選擇的消息中間件本身就不支持事務消息的情況下,一些大公司是怎么解決這個問題的?
早期為了實現基於MQ異步調用的多個服務間,業務邏輯執行要么一起成功、要么一起失敗,具備事務特點,通常會采用可靠消息最終一致性方案,來實現分布式事務。還是以Joe充值這件事來舉例,可靠消息方案實現過程如下:
在可靠消息最終一致性方案中,為了實現分布式事務,需要確保上游服務本地事務的處理與MQ消息的投遞具有原子性,也就是說上游服務本地事務處理成功后要確保消息一定要成功投遞到MQ服務,否則消息就不應該被投遞到MQ服務;同樣,被成功投遞到MQ服務的消息,也一定要被下游服務成功處理,否則就需要重新投遞MQ消息。
為了實現雙向的原子性,可靠消息服務需要對消息進行狀態標記,與此同時還需要對消息進行狀態檢查,從而實現重新投遞及消息狀態的最終一致性。核心流程說明如下:
1、上游服務(支付系統)如何確保完成自身支付成功狀態更新后消息100%的能夠投遞到下游服務(用戶余額系統)指定的Topic中?
在這個流程中上游服務在進行本地數據庫事務操作前,會先發送一個狀態為“待確認”的消息至可靠消息服務,而不是直接將消息投遞到MQ服務的指定Topic。可靠消息服務此時會將該消息記錄到自身服務的消息數據庫中(消息狀態為->待確認),完成后可靠消息服務會回調上游服務表示收到了消息,你們可以進行本地事務的操作了。
之后上游服務就會開啟本地數據庫事務執行業務邏輯操作,這里支付系統就會將該筆支付訂單狀態更新為“已成功”。(注意,這里只是舉個示例場景,在真正的實踐中一般是不會把支付訂單本身的狀態與業務端回調放在一個事務流程中的,關於這部分的詳細說明我們在下面的場景說明中再討論)。
如果上游服務本地數據庫事務執行成功,則繼續向可靠消息服務發送消息確認消息,此時可靠消息服務就會正式將消息投遞到MQ服務,並且同時更新消息數據庫中的消息狀態為“已發送”。(注意,這里可靠消息服務更新消息狀態與投遞消息至MQ也必須是在一個原子操作中,即消息投遞成功則一定要將消息狀態更新為“已發送”,所以在編程的細節中,可靠消息服務一般會先更新消息狀態,然后再進行消息投遞,這樣即使消息投遞失敗,也可以對消息狀態進行回滾->“待確認”,相反如果先進行消息投遞再更新消息狀態,可能就不好控制了)。
相反,如果上游本地數據庫事務執行失敗,則需要向可靠消息服務發送消息刪除消息,可靠消息服務此時就會將消息刪除,這樣就意味着事務在上游消息投遞過程中就被回滾了,而流程也就此結束了,此時上游服務可以需要通過業務邏輯的設計進行重發,這個就不再分布式事務的討論范疇了。
說到這里,大家可能會有疑問了!因為在上述描述中,即使上游服務本地數據庫事務執行成功了,但是在發送確認消息至可靠消息服務的過程中,以及可靠消息服務在投遞消息至MQ服務的過程中,還是會存在失敗的風險,這樣的話還是會導致支付服務更新了狀態,但是用戶余額系統連消息都沒有收到的情況發生?
實際上,實現數據一致性是一個復雜的活。在這個方案中可靠消息服務作為基礎性的服務除了執行正常的邏輯外,還得處理復雜的異常場景。在實現過程中可靠消息服務需要啟動相應的后台線程,不斷輪訓消息的狀態,這里會輪訓消息狀態為“待確認”的消息,並判斷該消息的狀態的持續時間是否超過了規定的時間,如果超過規定時間的消息還處於“待確認”的狀態,就會觸發上游服務狀態詢問機制。
可靠消息服務就會調用上游服務提供的相關借口,詢問這筆消息的處理情況,如果這筆消息在上游服務處理成功,則后台線程就會繼續觸發上圖中的步驟5,更新消息狀態為“已發送”並投遞消息至MQ服務;反之如果這筆消息上游服務處理失敗,可靠消息服務則會進行消息刪除。通過這樣以上機制就確保了“上游服務本地事務成功處理+消息成功投遞”處於一個原子操作了。
2、下游服務(用戶余額系統)如何確保對MQ服務Topic消息的消費100%都能處理成功?
在1的過程中,確保了上游服務邏輯處理與MQ消息的投遞具備原子性,那么當消息被成功投遞到了MQ服務的指定Topic后,下游服務如何才能確保消息的消費一定能被成功處理呢?
在正常的流程中,下游服務等待消費Topic的消息並進行自身本地數據庫事務的處理,如果處理成功則會主動通知可靠消息服務,可靠消息服務此時就會將消息的狀態更新為“已完成”;反之,處理失敗下游服務就無法再主動向可靠消息服務發送通知消息了。
此時,與消息投遞過程中的異常邏輯一樣,可靠消息服務也會啟動相應的后台線程,輪詢一直處於“已發送”狀態的消息,判斷狀態持續時間是否超過了規定時間,如果超時,可靠消息服務就會再次向MQ服務投遞此消息,從而確保消息能被再次消費處理。(注意,也可能出現下游服務處理成功,但是通知消息發送失敗的情況,所以為了確保冪等,下游服務也需要在業務邏輯上做好相應的防重處理)。
RocketMQ事務消息機制
在👆面第2小節的內容中,我們演示了一個自編寫的中間服務+MQ來實現事務消息的示例。但是在現實的工作場景中,開發和維護一套可靠消息服務是一件很耗費資源和成本的事情,實際上,RocketMQ的最新版本(4.3.0+)中已經實現了可靠消息服務的所有功能,並且在保證高並發、高可用、高性能方面做了更為優秀的架構實現。
從設計邏輯上看RocketMQ所支持的分布式事務特性與上節中闡述的可靠消息服務基本上是一致的。只是RocketMQ在實現上相比較於可靠消息服務而言做了更為復雜的設計,並且因為天然與MQ服務本身緊密結合,所以在高可用、可靠性、性能等方面直接繼承了MQ服務本身的架構優勢。
下面我們就結合流程並通過示例代碼的分析來和大家一起理解下利用RocketMQ是如何實現分布式事務操作的?
在應用場景中分布式服務通過MQ通信的過程中,發送消息的一方我們稱之為Producer,接收消費消息的一方我們稱之為Consumer。如果Producer自身業務邏輯本地事務執行成功與否希望和消息的發送保持一個原子性(也就是說如果Producer本地事務執行成功,那么這筆消息就一定要被成功的發送到RocketMQ服務的指定Topic,並且Consumer一定要被消費成功;反之,如果Producer本地事務執行失敗,那么這筆消息就應該被RocketMQ服務器丟棄)的話,RocketMQ是怎么做的呢?
1、Producer選擇使用RockerMQ提供的事務消息方法向RocketMQ服務發送事務消息(設置消息屬性TRAN_MSG=TRUE);
2、RocketMQ服務端在收到消息后會判斷消息的屬性是否為事務消息,如果是普通消息就直接Push給Consumer;如果是事務消息就會對該消息進行特殊處理設置事務ID,並暫時設置該消息對Consumer不可見,之后向Producer返回Pre消息發送狀態(SEND_OK)。
3、之后Producer就會開始執行本地事務邏輯,並設置本地事務處理狀態后向RocketMQ服務器發送該事務消息的確認/回滾消息(COMMIT_MESSAGE/ROLLBACK_MESSAGE)。
4、RocketMQ服務器根據該筆事務消息的本地事務執行狀態決定是否將消息Push給Consumer還是刪除該消息。
5、之后Consumer就會消費該消息,執行Consumer的本地事務邏輯,如果執行成功則向RocketMQ返回“CONSUME_SUCCESS”;反之出現異常則需要返回“RECONSUME_LATER”,以便RocketMQ再次Push該消息,這一點在實際編程中需要控制好。
正常情況下以上就是RocketMQ事務消息的基本運行流程了,但是從異常情況考慮,理論上也是存在Producer遲遲不發送確認或回滾消息的情況。與可靠消息服務一樣,RocketMQ服務端也會設置后台線程去掃描消息狀態,之后會調用Producer的本地checkLocalTransaction函數獲取本地事務狀態后繼續進行第3步操作。
相信看到這里,大家對於RocketMQ的分布式事務消息的理解應該有了一個相對清晰的概念了,那么在代碼中如何編寫呢?
在開發中使用RocketMQ的分布式事務消息Consumer的代碼不需要有什么特別的變化與普通消息Consumer代碼一致就可以

public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT"); // Specify name server addresses. consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); // Subscribe one more more topics to consume. consumer.subscribe("PAY_ACCOUNT", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { System.out.println(new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }
主要的改變是在Producer代碼,我們需要額外編寫一個實現執行本地事務邏輯,以及檢查本地事務狀態的類。示例代碼如下:

public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
Producer示例代碼:

public class TransactionProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT"); producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; try { Map<String, String> paramMap = new HashMap<>(); paramMap.put("type", "6"); paramMap.put("bizOrderId", "15414012438257823"); paramMap.put("payOrderId", "15414012438257823"); paramMap.put("amount", "10"); paramMap.put("userId", "200001"); paramMap.put("tradeType", "charge"); paramMap.put("financeStatus", "0");//財務狀態,應收 paramMap.put("channel", "a");//余額 paramMap.put("tradeTime", "20190101202022"); paramMap.put("nonce_str", "xkdkskskdksk"); //拼湊消息體 Message msg = new Message("PAY_ACCOUNT", "pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } Thread.sleep(10*1000); producer.shutdown(); } }
與非事務消息直接調用RocketMQ Client的send方法不同,事務消息發送需要設置事務監聽器類,並調用sendMessageInTransaction方法,而這個方法的具體邏輯也就是上述流程中描述的那樣,具體大家可以看下。
以上代碼只是示例代碼,在實際的項目中我們是需要進行一些封裝設計的,以便與項目上下文環境集成。例如對於Springboot項目,我們一般會編寫一個stater工程進行集成。大家感興趣可以關注下我的github項目,后面我會以真實的項目場景做一些集成示范。
https://github.com/qiaojiang2/springboot-starter
場景說明
目前RocketMQ消息中間件的使用場景比較廣泛,對於需要通過MQ進行異步解耦的分布式應用系統來說,RocketMQ無疑是一個不錯的技術選擇。接下來,我們就以對數據一致性要求非常高的分布式支付系統為例,來看看基於RocketMQ的事務消息適用於哪些特定場景,從而實現支付系統數據的高度一致性。
事實上,支付系統的數據一致性是一個復雜的問題,原因在於支付流程的各個環節都存在異步的不確定性,例如支付系統需要跟第三方渠道進行交互,不同的支付渠道交互流程存在差異,並且有異步支付結果回調的情況。
除此以外,支付系統內部本身又是由多個不同子系統組成,除核心支付系統外,還有賬務系統、商戶通知系統等等,而核心支付系統本身也會被拆分為多個不同的服務模塊,如風控、路由等用以實現不同的功能邏輯。某些場景我們無法通過分布式事務來實現數據一致性,只能通過額外的業務補償手段,如二次輪訓、支付對賬等來實現數據最終一致性。
綜上所述,支付系統是一個復雜的系統,要完全實現數據的一致性單靠某一種手段是無法實現的,大部分情況下我們可以通過額外的業務補償邏輯來實現數據最終一致性,只是這樣補償邏輯需要以更多的業務開發邏輯為代價,並且在時效性上會存在延遲的問題。
舉個例子,支付核心系統支付成功后會更新自己的訂單狀態為支付成功,整個核心交易流程是一個比較實時同步的場景,如果出現數據不一致,會有額外的補償邏輯如二次支付訂單狀態輪詢、T+1日對賬等用以確保支付狀態數據的最終一致性。但是除了核心支付外,支付成功的結果是需要通知到支付賬務系統、以及業務端系統,而為了確保性能,一般后續的通知就不會與主流程一樣設計成實時同步,而是通過MQ異步解耦發送消息給獨立的“通知響應模塊”,而“通知響應模塊”此時就可以通過分布式事務消息來與支付賬戶系統、業務端等系統實現數據一致性,從而減少需要補償手段處理的范圍,提高系統的數據一致性等級和靈敏度。