1.什么是可靠消息最終一致性事務
可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
此方案是利用消息中間件完成,如下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是通過網絡通信,由於網絡通信的不確定性會導致分布式事務問題。
因此可靠消息最終一致性方案要解決以下幾個問題:
1.本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即:事務發起方在本地事務執行成功后消息必須發出去,否則就丟棄消息。即實現本地事務和消息發送的原子性,要么都成功,要么都失敗。本地事務與消息發送的原子性問題是實現可靠消息最終一致性方案的關鍵問題。
先來嘗試下這種操作,先發送消息,再操作數據庫:
begin transaction;
//1.發送MQ
//2.數據庫操作
commit transation;
這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,數據庫操作失敗。
你立馬想到第二種方案,先進行數據庫操作,再發送消息:
begin transaction;
//1.數據庫操作
//2.發送MQ
commit transation;
這種情況下貌似沒有問題,如果發送MQ消息失敗,就會拋出異常,導致數據庫事務回滾。但如果是超時異常,數據庫回滾,但MQ其實已經正常發送了,同樣會導致不一致。
2、事務參與方接收消息的可靠性
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。
3、消息重復消費的問題
由於網絡2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重復消費。
要解決消息重復消費的問題就要實現事務參與方的方法冪等性。
2.解決方案
上節討論了可靠消息最終一致性事務方案需要解決的問題,本節討論具體的解決方案。
2.1.本地消息表方案
本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然后通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下面以注冊送積分為例來說明:
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。
交互流程如下:
1、用戶注冊
用戶服務在本地事務新增用戶和增加 ”積分消息日志“。(用戶表和消息表通過本地事務保證一致)
下邊是偽代碼
begin transaction;
//1.新增用戶
//2.存儲積分消息日志
commit transation;
這種情況下,本地數據庫操作與存儲積分消息日志處於同一個事務中,本地數據庫操作與記錄消息日志操作具備原子性。
2、定時任務掃描日志
如何保證將消息發送給消息隊列呢?
經過第一步消息已經寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描並發送至消息中間件,在消息中間件反饋發送成功后刪除該消息日志,否則等待定時任務下一周期重試。
3、消費消息
如何保證消費者一定能消費到消息呢?
這里可以使用MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成后向MQ發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發送消息。
積分服務接收到”增加積分“消息,開始增加積分,積分增加成功后向消息中間件回應ack,否則消息中間件將重復投遞此消息。
由於消息會重復投遞,積分服務的”增加積分“功能需要實現冪等性。
2.2.RocketMQ事務消息方案
RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之后的版本正式支持事務消息,為分布式事務實現提供了便利性支持。
RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3后實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決 Producer 端的消息發送與本地事務執行的原子性問題。
執行流程如下:
為方便理解我們還以注冊送積分的例子來描述 整個流程。
Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
1、Producer 發送事務消息
Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。
本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
2、MQ Server回應消息發送成功
MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer 執行本地事務
Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。
本例中,Producer 執行添加用戶操作。
4、消息投遞
若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息后將”增加積分消息“ 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息后 將刪除”增加積分消息“ 。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即程序執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener {
/**
‐ 發送prepare消息成功此方法被回調,該方法用於執行本地事務
‐ @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
‐ @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到
‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
‐ @param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態
‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
- 發送事務消息:
以下是RocketMQ提供用於發送事務消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//設置TransactionListener實現
producer.setTransactionListener(transactionListener);
//發送事務消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
3.RocketMQ實現可靠消息最終一致性事務
3.1.業務說明
本實例通過RocketMQ中間件實現可靠消息最終一致性分布式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬指定金額。
上述交易步驟,張三扣減金額與給bank2發轉賬消息,兩個操作必須是一個整體性的事務。
3.2.程序組成部分
本示例程序組成部分如下:
數據庫:MySQL-5.7.25
包括bank1和bank2兩個數據庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE
微服務框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
微服務及數據庫的關系 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操作張三賬戶,連接數據庫bank1
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操作李四賬戶,連接數據庫bank2
本示例程序技術架構如下:
交互流程如下:
- 1、Bank1向MQ Server發送轉賬消息
- 2、Bank1執行本地事務,扣減金額
- 3、Bank2接收消息,執行本地事務,添加金額
3.3.創建數據庫
導入數據庫腳本:資料\sql\bank1.sql、資料\sql\bank2.sql,已經導過不用重復導入。
創建bank1庫,並導入以下表結構和數據(包含張三賬戶)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
創建bank2庫,並導入以下表結構和數據(包含李四賬戶)
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶', '2', NULL, 0);
在bank1、bank2數據庫中新增de_duplication,交易記錄表(去重表),用於交易冪等控制。
DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
3.4.啟動RocketMQ
(1)下載RocketMQ服務器
下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin-release.zip
(2)解壓並啟動
啟動nameserver:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑]
start [rocketmq服務端解壓路徑]/bin/mqnamesrv.cmd
啟動broker:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑]
start [rocketmq服務端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
3.5 導入dtx-txmsg-demo
dtx-txmsg-demo是本方案的測試工程,根據業務需求需要創建兩個dtx-txmsg-demo工程。
(1)導入dtx-txmsg-demo
導入:資料\基礎代碼\dtx-txmsg-demo到父工程dtx下。
兩個測試工程如下:
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 ,操作張三賬戶,連接數據庫bank1
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 ,操作李四賬戶,連接數據庫bank2
(2)父工程maven依賴說明
在dtx父工程中指定了SpringBoot和SpringCloud版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐dependencies</artifactId>
<version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring‐cloud‐dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
在dtx-txmsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq‐spring‐boot‐starter</artifactId>
<version>2.0.2</version>
</dependency>
(3)配置rocketMQ
在application-local.propertis中配置rocketMQ nameServer地址及生產組:
rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876
3.6 dtx-txmsg-demo-bank1
dtx-txmsg-demo-bank1實現如下功能:
- 1、張三扣減金額,提交本地事務。
- 2、向MQ發送轉賬消息。
1)Dao
@Mapper
@Component
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance+#{amount} where account_no=#
{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double
amount);
@Select("select count(1) from de_duplication where tx_no = #{txNo}")
int isExistTx(String txNo);
@Insert("insert into de_duplication values(#{txNo},now());")
int addTx(String txNo);
}
2)AccountInfoService
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Autowired
private AccountInfoDao accountInfoDao;
/**
* 更新帳號余額‐發送消息
* producer向MQ Server發送消息
*
* @param accountChangeEvent
*/
@Override
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
//構建消息體
JSONObject jsonObject = new JSONObject();
jsonObject.put("accountChange",accountChangeEvent);
Message<String> message =
MessageBuilder.withPayload(jsonObject.toJSONString()).build();
TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message,
null);
log.info("send transcation message body={},result=
{}",message.getPayload(),sendResult.getSendStatus());
}
/**
* 更新帳號余額‐本地事務
* producer發送消息完成后接收到MQ Server的回應即開始執行本地事務
*
* @param accountChangeEvent
*/
@Transactional
@Override
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
log.info("開始更新本地事務,事務號:{}",accountChangeEvent.getTxNo());
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmoun
t() * ‐1);
//為冪等作准備
accountInfoDao.addTx(accountChangeEvent.getTxNo());
if(accountChangeEvent.getAmount() == 2){
throw new RuntimeException("bank1更新本地事務時拋出異常");
}
log.info("結束更新本地事務,事務號:{}",accountChangeEvent.getTxNo());
}
}
3)RocketMQLocalTransactionListener
編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
@Autowired
AccountInfoService accountInfoService;
@Autowired
AccountInfoDao accountInfoDao;
//消息發送成功回調此方法,此方法執行本地事務
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
//解析消息內容
try {
String jsonString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(jsonString);
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class);
//扣除金額
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("executeLocalTransaction 事務執行失敗",e);
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//此方法檢查事務執行狀態
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
RocketMQLocalTransactionState state;
final JSONObject jsonObject = JSON.parseObject(new String((byte[])
message.getPayload()));
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
//事務id
String txNo = accountChangeEvent.getTxNo();
int isexistTx = accountInfoDao.isExistTx(txNo);
log.info("回查事務,事務號: {} 結果: {}", accountChangeEvent.getTxNo(),isexistTx);
if(isexistTx>0){
state= RocketMQLocalTransactionState.COMMIT;
}else{
state= RocketMQLocalTransactionState.UNKNOWN;
}
return state;
}
}
4)Controller
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(@RequestParam("accountNo")String accountNo,@RequestParam("amount") Double amount){
String tx_no = UUID.randomUUID().toString();
AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
return "轉賬成功";
}
}
3.7 dtx-txmsg-demo-bank2
dtx-txmsg-demo-bank2需要實現如下功能:
- 1、監聽MQ,接收消息。
- 2、接收到消息增加賬戶金額。
1) Service
注意為避免消息重復發送,這里需要實現冪等。
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
/**
* 消費消息,更新本地事務,添加金額
* @param accountChangeEvent
*/
@Override
@Transactional
public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
log.info("bank2更新本地賬號,賬號:{},金額:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//冪等校驗
int existTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());
if(existTx<=0){
//執行更新
accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//添加事務記錄
accountInfoDao.addTx(accountChangeEvent.getTxNo());
log.info("更新本地事務執行成功,本次事務號: {}", accountChangeEvent.getTxNo());
}else{
log.info("更新本地事務執行失敗,本次事務號: {}", accountChangeEvent.getTxNo());
}
}
}
2)MQ監聽類
@Component
@RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2")
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> {
@Autowired
AccountInfoService accountInfoService;
@Override
public void onMessage(String s) {
log.info("開始消費消息:{}",s);
//解析消息為對象
final JSONObject jsonObject = JSON.parseObject(s);
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
//調用service增加賬號金額
accountChangeEvent.setAccountNo("2");
accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}
3.8 測試場景
- bank1本地事務失敗,則bank1不發送轉賬消息。
- bank2接收轉賬消息失敗,會進行重試發送消息。
- bank2多次消費同一個消息,實現冪等。
4.小結
可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能:
- 1、本地事務與消息發送的原子性問題。
- 2、事務參與方接收消息的可靠性。
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變為基於消息執行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。