一、可靠消息最終一致性事務概述
可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。 此方案是利用消息中間件完成,如下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務參與方(消息消費方)和消息中間件之間都是通過網絡通信,由於網絡通信的不確定性會導致分布式事務問題。因此可靠消息最終一致性方案要解決以下幾個問題:
【1】本地事務與消息發送的原子性問題:事務發起方在本地事務執行成功后消息必須發出去,否則就丟棄消息。即實現本地事務和消息發送的原子性,要么都成功,要么都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。先來嘗試下這種操作,先發送消息,再操作數據庫:這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,據庫操作失敗。
第二種方案,先進行數據庫操作,再發送消息:這種情況下貌似沒有問題,如果發送 MQ消息失敗,就會拋出異常,導致數據庫事務回滾。但如果是超時異常,數據庫回滾,但 MQ其實已經正常發送了,同樣會導致不一致。
【2】事務參與方接收消息的可靠性:事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。
【3】消息重復消費的問題:由於步驟2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重復消費。要解決消息重復消費的問題就要實現事務參與方的方法冪等性。
二、解決方案【本地消息表方案 】
本地消息表這個方案最初是 eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然后通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。 下面以注冊送積分為例來說明:下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。
【2】定時任務掃描日志:如何保證將消息發送給消息隊列呢?經過第一步消息已經寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描並發送至消息中間件,在消息中間件反饋發送成功后刪除該消息日志,否則等待定時任務下一周期重試。
【3】消費消息:如何保證消費者一定能消費到消息呢?這里可以使用 MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成后向MQ 發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發送消息。積分服務接收到”增加積分“消息,開始增加積分,積分增加成功后向消息中間件回應ack,否則消息中間件將重復投遞此消息。由於消息會重復投遞,積分服務的”增加積分“功能需要實現冪等性。
三、解決方案【RocketMQ事務消息方案 】
RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之后的版本正式支持事務消息,為分布式事務實現提供了便利性支持。RocketMQ 事務消息設計主要為解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。在 RocketMQ 4.3后實現了完整的事務消息,實際上是對本地消息表的一個封裝,將本地消息表移動到了 MQ內部,解決Producer 端的消息發送與本地事務執行的原子性問題。
【執行流程如下】:為方便理解我們還以注冊送積分的例子來描述整個流程。Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
【1】Producer 發送事務消息:Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
【2】MQ Server回應消息發送成功:MQ Server接收到 Producer 發送給的消息則回應發送成功。表示 MQ已接收到消息。
【3】Producer 執行本地事務:Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。本例中,Producer 執行添加用戶操作。
【4】消息投遞:若 Producer 本地事務執行成功則自動向 MQServer發送 commit消息,MQ Server接收到 Commit消息后將“增加積分消息” 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息。若Producer 本地事務執行失敗則自動向 MQServer發送 Rollback消息,MQ Server接收到 Rollback消息后將刪除“增加積分消息”。MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里 ack默認自動回應,即程序執行正常則自動回應ack。
【5】事務回查:如果執行 Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態(維護本地事務狀態表)即可。 RoacketMQ提供 RocketMQLocalTransactionListener接口:
【6】發送事務消息:以下是 RocketMQ提供用於發送事務消息的API:
四、RocketMQ實現可靠消息最終一致性事務
【業務說明】通過 RocketMQ中間件實現可靠消息最終一致性分布式事務,模擬兩個賬戶的轉賬交易過程。兩個賬戶在分別在不同的銀行(張三在 bank1、李四在 bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。 上述交易步驟,張三扣減金額與給 bank2發轉賬消息,兩個操作必須是一個整體性的事務。
1 DROP TABLE IF EXISTS `de_duplication`; 2 CREATE TABLE `de_duplication` ( 3 `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, 4 `create_time` datetime(0) NULL DEFAULT NULL, 5 PRIMARY KEY (`tx_no`) USING BTREE 6 ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
【版本依賴】:在父工程中指定了rocketmq-spring-boot-starter的版本
1 <dependency> 2 <groupId>org.apache.rocketmq</groupId> 3 <artifactId>rocketmq-spring-boot-starter</artifactId> 4 <version>2.0.2</version> 5 </dependency>
【配置rocketMQ】:在application-local.propertis 中配置 rocketMQ nameServer地址及生產組。
1 rocketmq.producer.group = producer_bank2 2 rocketmq.name-server = 127.0.0.1:9876
【張三服務層代碼】:
【張三RocketMQLocalTransactionListener】:編寫 RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。
【李四服務層代碼】:
【MQ監聽類】:通過實現 RocketMQListener接口監聽目標 Topic
五、總結
可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,本案例使用了 RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能:
【1】本地事務與消息發送的原子性問題;
【2】事務參與方接收消息的可靠性;
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變為基於消息執行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。