可靠消息最終一致性【本地消息表、RocketMQ 事務消息方案】


一、可靠消息最終一致性事務概述


可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。 此方案是利用消息中間件完成,如下圖:

事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務參與方(消息消費方)和消息中間件之間都是通過網絡通信,由於網絡通信的不確定性會導致分布式事務問題。因此可靠消息最終一致性方案要解決以下幾個問題:
【1】本地事務與消息發送的原子性問題:事務發起方在本地事務執行成功后消息必須發出去,否則就丟棄消息。即實現本地事務和消息發送的原子性,要么都成功,要么都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。先來嘗試下這種操作,先發送消息,再操作數據庫這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,據庫操作失敗。

1 begin transaction; 
2     //1.發送MQ 
3     //2.數據庫操作 
4 commit transation;

第二種方案,先進行數據庫操作,再發送消息:這種情況下貌似沒有問題,如果發送 MQ消息失敗,就會拋出異常,導致數據庫事務回滾。但如果是超時異常,數據庫回滾,但 MQ其實已經正常發送了,同樣會導致不一致。

1 begin transaction; 
2     //1.數據庫操作 
3     //2.發送MQ 
4 commit transation;

【2】事務參與方接收消息的可靠性:事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息
【3】消息重復消費的問題:由於步驟2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重復消費。要解決消息重復消費的問題就要實現事務參與方的方法冪等性

二、解決方案【本地消息表方案


本地消息表這個方案最初是 eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然后通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。 下面以注冊送積分為例來說明:下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。

交互流程如下【1】用戶注冊 :用戶服務在本地事務新增用戶和增加 “積分消息日志”。(用戶表和消息表通過本地事務保證一致)下邊是偽代碼,這種情況下,本地數據庫操作與存儲積分消息日志處於同一個事務中,本地數據庫操作與記錄消息日志操作具備原子性。

1 begin transaction; 
2     //1.新增用戶 
3     //2.存儲積分消息日志 
4 commit transation;

【2】定時任務掃描日志:如何保證將消息發送給消息隊列呢?經過第一步消息已經寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描並發送至消息中間件,在消息中間件反饋發送成功后刪除該消息日志,否則等待定時任務下一周期重試。
【3】消費消息:如何保證消費者一定能消費到消息呢?這里可以使用 MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成后向MQ 發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發送消息。積分服務接收到”增加積分“消息,開始增加積分,積分增加成功后向消息中間件回應ack,否則消息中間件將重復投遞此消息。由於消息會重復投遞,積分服務的”增加積分“功能需要實現冪等性

三、解決方案【RocketMQ事務消息方案


RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之后的版本正式支持事務消息,為分布式事務實現提供了便利性支持。RocketMQ 事務消息設計主要為解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。在 RocketMQ 4.3后實現了完整的事務消息,實際上是對本地消息表的一個封裝,將本地消息表移動到了 MQ內部,解決Producer 端的消息發送與本地事務執行的原子性問題。

執行流程如下為方便理解我們還以注冊送積分的例子來描述整個流程。Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
【1】Producer 發送事務消息:Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
【2】MQ Server回應消息發送成功:MQ Server接收到 Producer 發送給的消息則回應發送成功。表示 MQ已接收到消息。
【3】Producer 執行本地事務:Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。本例中,Producer 執行添加用戶操作。
【4】消息投遞:若 Producer 本地事務執行成功則自動向 MQServer發送 commit消息,MQ Server接收到 Commit消息后將“增加積分消息” 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息。若Producer 本地事務執行失敗則自動向 MQServer發送 Rollback消息,MQ Server接收到 Rollback消息后將刪除“增加積分消息”。MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里 ack默認自動回應,即程序執行正常則自動回應ack。
【5】事務回查:如果執行 Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態(維護本地事務狀態表)即可。 RoacketMQ提供 RocketMQLocalTransactionListener接口

 1 public interface RocketMQLocalTransactionListener {
 2     /**發送prepare消息成功此方法被回調,該方法用於執行本地事務 
 3     * @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
 4     * @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到
 5     * @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 
 6     */
 7     RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); 
 8      
 9      /**@param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態 
10       * @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 
11       */
12     RocketMQLocalTransactionState checkLocalTransaction(Message msg); 
13  }

【6】發送事務消息:以下是 RocketMQ提供用於發送事務消息的API:

1 TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); 
2 producer.setNamesrvAddr("127.0.0.1:9876"); 
3 producer.start(); 
4 //設置TransactionListener實現 
5 producer.setTransactionListener(transactionListener); 
6 //發送事務消息 
7 SendResult sendResult = producer.sendMessageInTransaction(msg, null);

四、RocketMQ實現可靠消息最終一致性事務


業務說明】通過 RocketMQ中間件實現可靠消息最終一致性分布式事務,模擬兩個賬戶的轉賬交易過程。兩個賬戶在分別在不同的銀行(張三在 bank1、李四在 bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。 上述交易步驟,張三扣減金額與給 bank2發轉賬消息,兩個操作必須是一個整體性的事務。


核心代碼程序技術架構如下:

交互流程如下【1】Bank1向 MQ Server發送轉賬消息;
【2】Bank1執行本地事務,扣減金額;
【3】Bank2接收消息,執行本地事務,添加金額;
數據庫在bank1、bank2數據庫中新增 de_duplication交易記錄表(去重表),用於交易冪等控制。

1 DROP TABLE IF EXISTS `de_duplication`; 
2 CREATE TABLE `de_duplication` ( 
3     `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, 
4     `create_time` datetime(0) NULL DEFAULT NULL, 
5     PRIMARY KEY (`tx_no`) USING BTREE 
6 ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

版本依賴在父工程中指定了rocketmq-spring-boot-starter的版本

1 <dependency>
2     <groupId>org.apache.rocketmq</groupId>
3     <artifactId>rocketmq-spring-boot-starter</artifactId>
4     <version>2.0.2</version>
5 </dependency>

配置rocketMQ在application-local.propertis 中配置 rocketMQ nameServer地址及生產組。

1 rocketmq.producer.group = producer_bank2
2 rocketmq.name-server = 127.0.0.1:9876

張三服務層代碼

 1 import com.alibaba.fastjson.JSONObject;
 2 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.messaging.Message;
 5 import org.springframework.messaging.support.MessageBuilder;
 6 import org.springframework.stereotype.Service;
 7 import org.springframework.transaction.annotation.Transactional;
 8 
 9 /**
10  * @author Administrator
11  * @version 1.0
12  **/
13 @Service
14 @Slf4j
15 public class AccountInfoServiceImpl implements AccountInfoService {
16 
17     @Autowired
18     AccountInfoDao accountInfoDao;
19 
20     @Autowired
21     RocketMQTemplate rocketMQTemplate;
22 
23 
24     //向mq發送轉賬消息
25     @Override
26     public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
27 
28         //將accountChangeEvent轉成json
29         JSONObject jsonObject =new JSONObject();
30         jsonObject.put("accountChange",accountChangeEvent);
31         String jsonString = jsonObject.toJSONString();
32         //生成message類型
33         Message<String> message = MessageBuilder.withPayload(jsonString).build();
34         //發送一條事務消息
35         /**
36          * String txProducerGroup 生產組
37          * String destination topic,
38          * Message<?> message, 消息內容
39          * Object arg 參數
40          */
41         rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);
42 
43     }
44 
45     //更新賬戶,扣減金額
46     @Override
47     @Transactional
48     public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
49         //冪等判斷,txNo是在Ctroller中生成的 UUID,全局唯一
50         if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
51             return ;
52         }
53         //扣減金額
54         accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);
55         //添加事務日志
56         accountInfoDao.addTx(accountChangeEvent.getTxNo());
57         if(accountChangeEvent.getAmount() == 3){
58             throw new RuntimeException("人為制造異常");
59         }
60     }
61 }

張三RocketMQLocalTransactionListener編寫 RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。

 1 import com.alibaba.fastjson.JSONObject;
 2 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 3 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 4 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
 5 import org.springframework.messaging.Message;
 6 import org.springframework.transaction.annotation.Transactional;
 7 
 8 /**
 9  * @author Administrator
10  * @version 1.0
11  **/
12 @Component
13 @Slf4j
14 //生產者組與發送消息時定義組相同
15 @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
16 public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
17 
18     @Autowired
19     AccountInfoService accountInfoService;
20 
21     @Autowired
22     AccountInfoDao accountInfoDao;
23 
24     //事務消息發送后的回調方法,當消息發送給mq成功,此方法被回調
25     @Override
26     @Transactional
27     public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
28 
29         try {
30             //解析message,轉成AccountChangeEvent
31             String messageString = new String((byte[]) message.getPayload());
32             JSONObject jsonObject = JSONObject.parseObject(messageString);
33             String accountChangeString = jsonObject.getString("accountChange");
34             //將accountChange(json)轉成AccountChangeEvent
35             AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
36             //執行本地事務,扣減金額
37             accountInfoService.doUpdateAccountBalance(accountChangeEvent);
38             //當返回RocketMQLocalTransactionState.COMMIT,自動向mq發送commit消息,mq將消息的狀態改為可消費
39             return RocketMQLocalTransactionState.COMMIT;
40         } catch (Exception e) {
41             e.printStackTrace();
42             return RocketMQLocalTransactionState.ROLLBACK;
43         }
46     }
47 
48     //事務狀態回查,查詢是否扣減金額
49     @Override
50     public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
51         //解析message,轉成AccountChangeEvent
52         String messageString = new String((byte[]) message.getPayload());
53         JSONObject jsonObject = JSONObject.parseObject(messageString);
54         String accountChangeString = jsonObject.getString("accountChange");
55         //將accountChange(json)轉成AccountChangeEvent
56         AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
57         //事務id
58         String txNo = accountChangeEvent.getTxNo();
59         int existTx = accountInfoDao.isExistTx(txNo);
60         if(existTx>0){
61             return RocketMQLocalTransactionState.COMMIT;
62         }else{
63             return RocketMQLocalTransactionState.UNKNOWN;
64         }
65     }
66 }

李四服務層代碼

 1 import org.springframework.stereotype.Service;
 2 import org.springframework.transaction.annotation.Transactional;
 3 
 4 /**
 5  * @author Administrator
 6  * @version 1.0
 7  **/
 8 @Service
 9 @Slf4j
10 public class AccountInfoServiceImpl implements AccountInfoService {
11 
12     @Autowired
13     AccountInfoDao accountInfoDao;
14 
15     //更新賬戶,增加金額
16     @Override
17     @Transactional
18     public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
19         log.info("bank2更新本地賬號,賬號:{},金額:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
20         if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
21             return ;
22         }
23         //增加金額
24         accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
25         //添加事務記錄,用於冪等
26         accountInfoDao.addTx(accountChangeEvent.getTxNo());
27         if(accountChangeEvent.getAmount() == 4){
28             throw new RuntimeException("人為制造異常");
29         }
30     }
31 }

【MQ監聽類】:通過實現 RocketMQListener接口監聽目標 Topic

 1 import com.alibaba.fastjson.JSONObject;
 2 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 3 import org.apache.rocketmq.spring.core.RocketMQListener;
 4 
 5 /**
 6  * @author Administrator
 7  * @version 1.0
 8  **/
 9 @Component
10 @Slf4j
11 @RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg")
12 public class TxmsgConsumer implements RocketMQListener<String> {
13 
14     @Autowired
15     AccountInfoService accountInfoService;
16 
17     //接收消息
18     @Override
19     public void onMessage(String message) {
20         log.info("開始消費消息:{}",message);
21         //解析消息
22         JSONObject jsonObject = JSONObject.parseObject(message);
23         String accountChangeString = jsonObject.getString("accountChange");
24         //轉成AccountChangeEvent
25         AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
26         //設置賬號為李四的
27         accountChangeEvent.setAccountNo("2");
28         //更新本地賬戶,增加金額
29         accountInfoService.addAccountInfoBalance(accountChangeEvent);
31     }
32 }

五、總結


可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,本案例使用了 RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能:
【1】本地事務與消息發送的原子性問題;
【2】事務參與方接收消息的可靠性;
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變為基於消息執行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM