一、分布式事務解決方案之可靠消息最終一致性
1.什么是可靠消息最終一致性事務 可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能 夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。 此方案是利用消息中間件完成,如下圖: 事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件 之間,事務參與方(消息消費方)和消息中間件之間都是通過網絡通信,由於網絡通信的不確定性會導致分布式事 務問題。
因此可靠消息最終一致性方案要解決以下幾個問題:
2.本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即:事務發起方在本地事務執行成功后消息必須發出去,否則就丟棄消息。即實 現本地事務和消息發送的原子性,要么都成功,要么都失敗。本地事務與消息發送的原子性問題是實現可靠消息最 終一致性方案的關鍵問題。 先來嘗試下這種操作,先發送消息,再操作數據庫:
begin transaction; //1.發送MQ //2.數據庫操作 commit transation;
這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,數據庫操作失敗。 你立馬想到第二種方案,先進行數據庫操作,再發送消息:
begin transaction; //1.數據庫操作 //2.發送MQ commit transation;
這種情況下貌似沒有問題,如果發送MQ消息失敗,就會拋出異常,導致數據庫事務回滾。但如果是超時異常,數 據庫回滾,但MQ其實已經正常發送了,同樣會導致不一致。
3、事務參與方接收消息的可靠性
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。
4、消息重復消費的問題
由於網絡2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重 復消費。 要解決消息重復消費的問題就要實現事務參與方的方法冪等性。
二、RocketMQ事務消息方案
1、RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項 目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之 上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。
Apache RocketMQ 4.3之后的版本正式支持事務消 息,為分布式事務實現提供了便利性支持。
RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的 設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;
而 RocketMQ 本身提供的存儲機制為事務消息提供了持久化能力;
RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系 統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3后實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ 內部,解決 Producer 端的消息發送與本地事務執行的原子性問題。
2、執行流程如下:
為方便理解我們還以注冊送積分的例子來描述 整個流程。
Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。
MQ訂閱方即消息消費方,本例中是積分服務,負責 新增積分。
2.1、Producer 發送事務消息 Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注 意此時這條消息消費者(MQ訂閱方)是無法消費到的。 本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
2.2、MQ Server回應消息發送成功 MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
2.3、Producer 執行本地事務 Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。 本例中,Producer 執行添加用戶操作。
2.4、消息投遞 若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息后將”增加積 分消息“ 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息后 將刪 除”增加積分消息“ 。 MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即 程序執行正常則自動回應ack。
2.5、事務回查 如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer 來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。 以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此 只需關注本地事務的執行狀態即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener { /** ‐ 發送prepare消息成功此方法被回調,該方法用於執行本地事務 ‐ @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id ‐ @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** ‐ @param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
發送事務消息: 以下是RocketMQ提供用於發送事務消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //設置TransactionListener實現 producer.setTransactionListener(transactionListener); //發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
RocketMQ實現可靠消息最終一致性事務
3、業務說明 本實例通過RocketMQ中間件實現可靠消息最終一致性分布式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。
交易過程是,張三 給李四轉賬指定金額。
交互流程如下:
a、Bank1向MQ Server發送轉賬消息
b、Bank1執行本地事務,扣減金額
c、Bank2接收消息,執行本地事務,添加金額
創建數據庫跟表結構
創建數據庫 bank1 ,新建一個表
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
再新建一個庫bank2,建表
CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名', `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號', `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES (3, '李四的賬戶', '2', NULL, 0);
在bank1、bank2數據庫中新增de_duplication,交易記錄表(去重表),用於交易冪等控制。
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
啟動RocketMQ
(1)下載RocketMQ服務器
下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-binrelease.zip
(2)解壓並啟動
啟動nameserver:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑] start [rocketmq服務端解壓路徑]/bin/mqnamesrv.cmd
啟動broker:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑] start [rocketmq服務端解壓路徑]/bin/mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
這個一段時間后會自動關閉,需要再重啟,要不然項目鏈接MQ時會一直發送不了消息到mq。// todo:需要找下怎么設置不要老是自動關閉。
4、新建一個bank1項目
實現如下功能: a、張三扣減金額,提交本地事務。 b、向MQ發送轉賬消息。
完整pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ljtao</groupId> <artifactId>dtx-mqtx-bank1</artifactId> <version>0.0.1-SNAPSHOT</version> <name>dtx-mqtx-bank1</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR2</spring-cloud.version> <mybatis.boot.version>2.1.0</mybatis.boot.version> <druid.version>1.1.14</druid.version> <bitwalker.version>1.19</bitwalker.version> <kaptcha.version>2.3.2</kaptcha.version> <swagger.version>2.9.2</swagger.version> <pagehelper.boot.version>1.2.5</pagehelper.boot.version> <fastjson.version>1.2.60</fastjson.version> <oshi.version>3.9.1</oshi.version> <commons.io.version>2.5</commons.io.version> <commons.fileupload.version>1.3.3</commons.fileupload.version> <poi.version>3.17</poi.version> <velocity.version>1.7</velocity.version> <!--<mysql-connector>5.1.39</mysql-connector>--> <mysql-connector>8.0.11</mysql-connector> </properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Mysql驅動包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector}</version>
</dependency>
<!-- Spring Boot Mybatis 依賴 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.boot.version}</version>
</dependency>
<!--阿里數據庫連接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<!--常用工具類 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<!-- pagehelper 分頁插件 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.5</version>
</dependency>
<!-- JWT -->
<dependency>
<groupId>com.auth0</groupId>
<artifactId>java-jwt</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置文件
server.port=9101 master.datasource.url=jdbc:mysql://localhost:3306/springboot-dtx-1?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true master.datasource.password=root master.datasource.username=root master.datasource.driverClassName=com.mysql.cj.jdbc.Driver rocketmq.producer.group = producer_bank1 rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.com.ljtao.dtxmqtxbank1 = debug
springboot 結合 mybatis 的配置跟編寫,這里就貼代碼了。
數據操作
@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance+#{amount} where account_no=# {accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo); @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo); }
service類
@Slf4j @Service public class AccountInfoService { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private AccountInfoDao accountInfoDao; /** * 更新帳號余額‐發送消息 * producer向MQ Server發送消息 * */ public void sendUpdateAccountBalance(AccountChangeEvent ace){ //構件消息體 JSONObject jsonObject=new JSONObject(); jsonObject.put("accountChange",ace); Message<String> message= MessageBuilder.withPayload(jsonObject.toJSONString()).build(); //發送一條事務消息 /** * String txProducerGroup 生產組 * String destination topic, * Message<?> message, 消息內容 * Object arg 參數 */ TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null); System.out.println("send transcation message body="+message.getPayload()+",result="+sendResult.getSendStatus()); } /** * 更新帳號余額‐本地事務 * producer發送消息完成后接收到MQ Server的回應即開始執行本地事務 */ @Transactional public void doUpdateAccountBalance(AccountChangeEvent ace) { System.out.println("開始更新本地事務,事務號:"+ace.getTxNo()); accountInfoDao.updateAccountBalance(ace.getAccountNo(),ace.getAmount() * -1); //為冪等性做准備 accountInfoDao.addTx(ace.getTxNo()); //測試 if(ace.getAmount()==2){ throw new RuntimeException("bank1更新本地事務時拋出異常"); } log.info("結束更新本地事務,事務號:{}",ace.getTxNo()); } public int transfer(String accountNo,Double amount){ return accountInfoDao.updateAccountBalance(accountNo,amount); } }
MQ消息監聽處理類
@Component @Slf4j @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener{ @Autowired AccountInfoService accountInfoService; @Autowired AccountInfoDao accountInfoDao; /** ‐ 發送prepare消息成功此方法被回調,該方法用於執行本地事務 ‐ @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id ‐ @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try{ //解析消息內容 String jsonString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(jsonString); AccountChangeEvent ace = JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //扣除金額 accountInfoService.doUpdateAccountBalance(ace); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e){ System.out.println("事務執行失敗"); e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } } /** ‐ @param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ //檢查事務執行狀態 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { RocketMQLocalTransactionState state; final JSONObject jsonObject = JSON.parseObject(new String((byte[]) message.getPayload())); AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class); //事務id String txNo = accountChangeEvent.getTxNo(); int isexistTx = accountInfoDao.isExistTx(txNo); log.info("回查事務,事務號: {} 結果: {}", accountChangeEvent.getTxNo(),isexistTx); if(isexistTx>0){ state= RocketMQLocalTransactionState.COMMIT; }else{ state= RocketMQLocalTransactionState.UNKNOWN; } return state; } }
前端調用接口
@RestController @RequestMapping("ai") public class AccountInfoController { @Autowired private AccountInfoDao accountInfoDao; @Autowired private AccountInfoService accountInfoService; @GetMapping("/transfer") public JsonData transfer(String accountNo,Double amount){ String tx_no= UUID.randomUUID().toString(); AccountChangeEvent ace = new AccountChangeEvent(accountNo, amount, tx_no); accountInfoService.sendUpdateAccountBalance(ace); return JsonData.success(); } /* 測試能不能正常連接數據庫 */ @GetMapping("/fun1/{id}") public JsonData fun1(@PathVariable("id") int id){ return JsonData.success(accountInfoDao.getById(id)); } }
實體類 AccountChangeEvent
@Data @AllArgsConstructor @NoArgsConstructor public class AccountChangeEvent implements Serializable { /** * 賬號 */ private String accountNo; /** * 變動金額 */ private double amount; /** * 事務號 */ private String txNo; }
5、再新建一個bank2項目 ,
需要實現如下功能: a、監聽MQ,接收消息。 b、接收到消息增加賬戶金額。
配置文件
server.port=9102 master.datasource.url=jdbc:mysql://localhost:3306/springboot-dtx-2?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true master.datasource.password=root master.datasource.username=root master.datasource.driverClassName=com.mysql.cj.jdbc.Driver rocketmq.producer.group = producer_bank2 rocketmq.name-server = 127.0.0.1:9876
pom.xml文件跟bank1一樣。
Service 類代碼
注意為避免消息重復發送,這里需要實現冪等。
@Slf4j @Service public class AccountInfoService { @Autowired AccountInfoDao accountInfoDao; /** * 消費消息,更新本地事務,添加金額 */ @Transactional public void addAccountInfoBalance(AccountChangeEvent ace) { log.info("bank2更新本地賬號,賬號:{},金額: {}",ace.getAccountNo(),ace.getAmount()); int tx_no = accountInfoDao.isExistTx(ace.getTxNo()); if(tx_no<=0){ accountInfoDao.updateAccountBalance(ace.getAccountNo(),ace.getAmount()); accountInfoDao.addTx(ace.getTxNo()); log.info("更新本地事務執行成功,本次事務號: {}", ace.getTxNo()); }else{ log.info("更新本地事務執行失敗,本次事務號: {}", ace.getTxNo()); } } }
MQ監聽類
@Component @RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2") @Slf4j public class TxmsgConsumer implements RocketMQListener<String> { @Autowired AccountInfoService accountInfoService; @Override public void onMessage(String s) { log.info("開始消費消息:{}",s); JSONObject jsonObject = JSONObject.parseObject(s); AccountChangeEvent ace = JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //為某個賬戶轉賬 ace.setAccountNo("2"); accountInfoService.addAccountInfoBalance(ace); } }
6、接下來測試幾個場景
設置 bank1本地事務失敗,則bank1不發送轉賬消息。
設置 bank2接收轉賬消息存入到賬戶時,拋出異常(失敗),會進行重試發送消息。
設置 bank2多次消費同一個消息,實現冪等。
7、小結:
可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,
本案例使用了RocketMQ作為 消息中間件,RocketMQ主要解決了兩個功能:
a、本地事務與消息發送的原子性問題。 b、事務參與方接收消息的可靠性。
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變為基於消 息執行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。