什么是可靠消息最終一致性事務
可靠消息最終一致性方案是指當事務發起方執行完成本地事務后並發出一條消息,事務參與方(消息消費者)一定能
夠接收消息並處理事務成功,此方案強調的是只要消息發給事務參與方最終事務要達到一致。
此方案是利用消息中間件完成,如下圖:
事務發起方(消息生產方)將消息發給消息中間件,事務參與方從消息中間件接收消息,事務發起方和消息中間件
之間,事務參與方(消息消費方)和消息中間件之間都是通過網絡通信,由於網絡通信的不確定性會導致分布式事
務問題。
因此可靠消息最終一致性方案要解決以下幾個問題:
1.本地事務與消息發送的原子性問題
本地事務與消息發送的原子性問題即:事務發起方在本地事務執行成功后消息必須發出去,否則就丟棄消息。即實
現本地事務和消息發送的原子性,要么都成功,要么都失敗。本地事務與消息發送的原子性問題是實現可靠消息最
終一致性方案的關鍵問題。
先來嘗試下這種操作,先發送消息,再操作數據庫:
begin transaction; //1.發送MQ //2.數據庫操作 commit transation;
這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,數據庫操作失敗。
你立馬想到第二種方案,先進行數據庫操作,再發送消息:
begin transaction; //1.數據庫操作 //2.發送MQ commit transation;
這種情況下貌似沒有問題,如果發送MQ消息失敗,就會拋出異常,導致數據庫事務回滾。但如果是超時異常,數
據庫回滾,但MQ其實已經正常發送了,同樣會導致不一致。
2、事務參與方接收消息的可靠性
事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。
3、消息重復消費的問題
由於網絡2的存在,若某一個消費節點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重
復消費。
要解決消息重復消費的問題就要實現事務參與方的方法冪等性。
解決方案
上節討論了可靠消息最終一致性事務方案需要解決的問題,本節討論具體的解決方案。
本地消息表方案
本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數據業務操作和消息的一致性,然后
通過定時任務將消息發送至消息中間件,待確認消息發送給消費方成功再將消息刪除。
下面以注冊送積分為例來說明:
下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。
交互流程如下:
1、用戶注冊
用戶服務在本地事務新增用戶和增加 ”積分消息日志“。(用戶表和消息表通過本地事務保證一致)
下邊是偽代碼
begin transaction; //1.新增用戶 //2.存儲積分消息日志 commit transation;
這種情況下,本地數據庫操作與存儲積分消息日志處於同一個事務中,本地數據庫操作與記錄消息日志操作具備原
子性。
2、定時任務掃描日志
如何保證將消息發送給消息隊列呢?
經過第一步消息已經寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描並發送至消息
中間件,在消息中間件反饋發送成功后刪除該消息日志,否則等待定時任務下一周期重試。
3、消費消息
如何保證消費者一定能消費到消息呢?
這里可以使用MQ的ack(即消息確認)機制,消費者監聽MQ,如果消費者接收到消息並且業務處理完成后向MQ
發送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重
試向消費者來發送消息。
積分服務接收到”增加積分“消息,開始增加積分,積分增加成功后向消息中間件回應ack,否則消息中間件將重復
投遞此消息。
由於消息會重復投遞,積分服務的”增加積分“功能需要實現冪等性。
RocketMQ事務消息方案
RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項
目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之
上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之后的版本正式支持事務消
息,為分布式事務實現提供了便利性支持。
RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的
設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ
本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系
統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3后實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ
內部,解決 Producer 端的消息發送與本地事務執行的原子性問題。
執行流程如下:
為方便理解我們還以注冊送積分的例子來描述 整個流程。
Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責
新增積分。
1、Producer 發送事務消息
Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注
意此時這條消息消費者(MQ訂閱方)是無法消費到的。
本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
2、MQ Server回應消息發送成功
MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer 執行本地事務
Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。
本例中,Producer 執行添加用戶操作。
4、消息投遞
若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息后將”增加積
分消息“ 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息后 將刪
除”增加積分消息“ 。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即
程序執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer
來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此
只需關注本地事務的執行狀態即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener { /** ‐ 發送prepare消息成功此方法被回調,該方法用於執行本地事務 ‐ @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id ‐ @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** ‐ @param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態 ‐ @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
發送事務消息:
以下是RocketMQ提供用於發送事務消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //設置TransactionListener實現 producer.setTransactionListener(transactionListener); //發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
RocketMQ實現可靠消息最終一致性事務
業務說明
本實例通過RocketMQ中間件實現可靠消息最終一致性分布式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三
給李四轉賬指定金額。
上述交易步驟,張三扣減金額與給bank2發轉賬消息,兩個操作必須是一個整體性的事務。
啟動RocketMQ
(1)下載RocketMQ服務器
下載地址:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin-
release.zip
(2)解壓並啟動
啟動nameserver:和broke
bank1實現如下功能:
1、張三扣減金額,提交本地事務。
2、向MQ發送轉賬消息。
2)Dao
@Mapper @Component public interface AccountInfoDao { @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Select("select * from account_info where where account_no=#{accountNo}") AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo); @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo); @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo); }
3)AccountInfoService
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Autowired(required = false) RocketMQTemplate rocketMQTemplate; //向mq發送轉賬消息 @Override public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { //將accountChangeEvent轉成json JSONObject jsonObject =new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); String jsonString = jsonObject.toJSONString(); log.info(jsonString); //生成message類型 Message<String> message = MessageBuilder.withPayload(jsonString).build(); //發送一條事務消息 /** * String txProducerGroup 生產組 * String destination topic, * Message<?> message, 消息內容 * Object arg 參數 */ rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null); } //更新賬戶,扣減金額 @Override @Transactional public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { //冪等判斷 if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){ return ; } //扣減金額 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1); //添加事務日志 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 3){ throw new RuntimeException("人為制造異常"); } } }
4)RocketMQLocalTransactionListener
編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。
/** * @author Administrator * @version 1.0 **/ @Component @Slf4j @RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener { @Autowired AccountInfoService accountInfoService; @Autowired AccountInfoDao accountInfoDao; //事務消息發送后的回調方法,當消息發送給mq成功,此方法被回調 @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { //解析message,轉成AccountChangeEvent String messageString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String accountChangeString = jsonObject.getString("accountChange"); //將accountChange(json)轉成AccountChangeEvent AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //執行本地事務,扣減金額 accountInfoService.doUpdateAccountBalance(accountChangeEvent); //當返回RocketMQLocalTransactionState.COMMIT,自動向mq發送commit消息,mq將消息的狀態改為可消費 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); return RocketMQLocalTransactionState.ROLLBACK; } } //事務狀態回查,查詢是否扣減金額 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { //解析message,轉成AccountChangeEvent String messageString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String accountChangeString = jsonObject.getString("accountChange"); //將accountChange(json)轉成AccountChangeEvent AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //事務id String txNo = accountChangeEvent.getTxNo(); log.info("事務狀態回查"); int existTx = accountInfoDao.isExistTx(txNo); if(existTx>0){ return RocketMQLocalTransactionState.COMMIT; }else{ return RocketMQLocalTransactionState.UNKNOWN; } } }
5)Controller
@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; @GetMapping(value = "/transfer") public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){ //創建一個事務id,作為消息內容發到mq String tx_no = UUID.randomUUID().toString(); AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no); //發送消息 accountInfoService.sendUpdateAccountBalance(accountChangeEvent); return "轉賬成功"; } }
配置信息
server.port=8080 swagger.enable = true spring.datasource.driver-class-name = com.mysql.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/bank1?useUnicode=true spring.datasource.username = root spring.datasource.password = 123456 rocketmq.producer.group = producer_bank1 rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.com.topcheer.mq = debug
bank2需要實現如下功能:
1、監聽MQ,接收消息。
2、接收到消息增加賬戶金額。
1) Service
注意為避免消息重復發送,這里需要實現冪等。
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; //更新賬戶,增加金額 @Override @Transactional public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) { log.info("bank2更新本地賬號,賬號:{},金額:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){ return ; } //增加金額 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); //添加事務記錄,用於冪等 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 4){ throw new RuntimeException("人為制造異常"); } } }
監聽類
@Component @Slf4j @RocketMQMessageListener(consumerGroup = "consumer_group_txmsg_bank2",topic = "topic_txmsg") public class TxmsgConsumer implements RocketMQListener<String> { @Autowired AccountInfoService accountInfoService; //接收消息 @Override public void onMessage(String message) { log.info("開始消費消息:{}",message); //解析消息 JSONObject jsonObject = JSONObject.parseObject(message); String accountChangeString = jsonObject.getString("accountChange"); //轉成AccountChangeEvent AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //設置賬號為李四的 accountChangeEvent.setAccountNo("2"); //更新本地賬戶,增加金額 accountInfoService.addAccountInfoBalance(accountChangeEvent); } }
測試:
一:能轉賬成功
2020-03-11 16:47:39.276 INFO 23240 --- [nio-8080-exec-1] c.t.m.s.impl.AccountInfoServiceImpl : {"accountChange":{"accountNo":"1","amount":10.0,"txNo":"a611b185-d3b7-45b2-ace3-b50c3b2d2d54"}} 2020-03-11 16:47:40.513 DEBUG 23240 --- [nio-8080-exec-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:47:40.526 DEBUG 23240 --- [nio-8080-exec-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: a611b185-d3b7-45b2-ace3-b50c3b2d2d54(String) 2020-03-11 16:47:40.574 DEBUG 23240 --- [nio-8080-exec-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:47:40.576 DEBUG 23240 --- [nio-8080-exec-1] c.t.m.d.A.updateAccountBalance : ==> Preparing: update account_info set account_balance=account_balance+? where account_no=? 2020-03-11 16:47:40.577 DEBUG 23240 --- [nio-8080-exec-1] c.t.m.d.A.updateAccountBalance : ==> Parameters: -10.0(Double), 1(String) 2020-03-11 16:47:40.579 DEBUG 23240 --- [nio-8080-exec-1] c.t.m.d.A.updateAccountBalance : <== Updates: 1 2020-03-11 16:47:40.580 DEBUG 23240 --- [nio-8080-exec-1] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Preparing: insert into de_duplication values(?,now()); 2020-03-11 16:47:40.580 DEBUG 23240 --- [nio-8080-exec-1] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Parameters: a611b185-d3b7-45b2-ace3-b50c3b2d2d54(String) 2020-03-11 16:47:40.582 DEBUG 23240 --- [nio-8080-exec-1] c.topcheer.mq.dao.AccountInfoDao.addTx : <== Updates: 1
2020-03-11 16:47:48.636 INFO 22956 --- [MessageThread_1] com.topcheer.mq.message.TxmsgConsumer : 開始消費消息:{"accountChange":{"accountNo":"1","amount":10.0,"txNo":"a611b185-d3b7-45b2-ace3-b50c3b2d2d54"}} 2020-03-11 16:47:48.648 INFO 22956 --- [MessageThread_1] c.t.m.s.impl.AccountInfoServiceImpl : bank2更新本地賬號,賬號:2,金額:10.0 2020-03-11 16:47:48.671 DEBUG 22956 --- [MessageThread_1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:47:48.686 DEBUG 22956 --- [MessageThread_1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: a611b185-d3b7-45b2-ace3-b50c3b2d2d54(String) 2020-03-11 16:47:48.701 DEBUG 22956 --- [MessageThread_1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:47:48.704 DEBUG 22956 --- [MessageThread_1] c.t.m.d.A.updateAccountBalance : ==> Preparing: update account_info set account_balance=account_balance+? where account_no=? 2020-03-11 16:47:48.704 DEBUG 22956 --- [MessageThread_1] c.t.m.d.A.updateAccountBalance : ==> Parameters: 10.0(Double), 2(String) 2020-03-11 16:47:48.707 DEBUG 22956 --- [MessageThread_1] c.t.m.d.A.updateAccountBalance : <== Updates: 1 2020-03-11 16:47:48.707 DEBUG 22956 --- [MessageThread_1] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Preparing: insert into de_duplication values(?,now()); 2020-03-11 16:47:48.708 DEBUG 22956 --- [MessageThread_1] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Parameters: a611b185-d3b7-45b2-ace3-b50c3b2d2d54(String) 2020-03-11 16:47:48.710 DEBUG 22956 --- [MessageThread_1] c.topcheer.mq.dao.AccountInfoDao.addTx : <== Updates: 1
二:張三轉賬失敗的時候
三:李四轉賬失敗的時候
2020-03-11 16:50:56.320 INFO 23240 --- [nio-8080-exec-8] c.t.m.s.impl.AccountInfoServiceImpl : {"accountChange":{"accountNo":"1","amount":4.0,"txNo":"3088dda5-c5a0-4a40-8f9e-715ab50563ff"}} 2020-03-11 16:50:56.325 DEBUG 23240 --- [nio-8080-exec-8] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:50:56.325 DEBUG 23240 --- [nio-8080-exec-8] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: 3088dda5-c5a0-4a40-8f9e-715ab50563ff(String) 2020-03-11 16:50:56.326 DEBUG 23240 --- [nio-8080-exec-8] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:50:56.326 DEBUG 23240 --- [nio-8080-exec-8] c.t.m.d.A.updateAccountBalance : ==> Preparing: update account_info set account_balance=account_balance+? where account_no=? 2020-03-11 16:50:56.327 DEBUG 23240 --- [nio-8080-exec-8] c.t.m.d.A.updateAccountBalance : ==> Parameters: -4.0(Double), 1(String) 2020-03-11 16:50:56.329 DEBUG 23240 --- [nio-8080-exec-8] c.t.m.d.A.updateAccountBalance : <== Updates: 1 2020-03-11 16:50:56.329 DEBUG 23240 --- [nio-8080-exec-8] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Preparing: insert into de_duplication values(?,now()); 2020-03-11 16:50:56.330 DEBUG 23240 --- [nio-8080-exec-8] c.topcheer.mq.dao.AccountInfoDao.addTx : ==> Parameters: 3088dda5-c5a0-4a40-8f9e-715ab50563ff(String) 2020-03-11 16:50:56.333 DEBUG 23240 --- [nio-8080-exec-8] c.topcheer.mq.dao.AccountInfoDao.addTx : <== Updates: 1 2020-03-11 16:51:14.092 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:51:14.092 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:51:14.093 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:52:14.095 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:52:14.096 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:52:14.099 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:52:56.326 INFO 23240 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[10.9.9.139:10909] result: true 2020-03-11 16:53:14.093 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:53:14.094 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:53:14.095 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:54:14.095 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:54:14.096 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:54:14.098 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:55:14.103 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:55:14.104 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:55:14.106 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-11 16:56:14.105 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-11 16:56:14.107 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : ==> Parameters: ef4bc8df-5afd-4dde-b2ee-a04c931d4211(String) 2020-03-11 16:56:14.109 DEBUG 23240 --- [pool-1-thread-1] c.t.mq.dao.AccountInfoDao.isExistTx : <== Total: 1
會發現,張三的錢已經扣了,但是李四的錢一直沒有,但是mq會一直循環的去消費這個消息
小結
可靠消息最終一致性就是保證消息從生產方經過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ作為
消息中間件,RocketMQ主要解決了兩個功能:
1、本地事務與消息發送的原子性問題。
2、事務參與方接收消息的可靠性。
可靠消息最終一致性事務適合執行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變為基於消
息執行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,並實現了兩個服務的解耦。