4. 分布式事務解決方案之TCC
TCC是Try、Confirm、Cancel三個詞語的縮寫,TCC要求每個分支事務實現三個操作 :預處理Try、確認Confirm、撤銷Cancel。Try操作做業務檢查及資源預留,Confirm做業務確認操作,Cancel實現一個與Try相反的操作既回滾操作。TM首先發起所有的分支事務的try操作,任何一個分支事務的try操作執行失敗,TM將會發起所有分支事務的Cancel操作,若try操作全部成功,TM將會發起所有分支事務的Confirm操作,其中Confirm/Cancel操作若執行失敗,TM會進行重試。
分支事務失敗的情況 :
TCC分為三個階段 :
- Try階段是做業務檢查(一致性)及資源預留(隔離),此階段僅是一個初步操作,它和后續的Confirm一起才能真正構成一個完整的業務邏輯。
- Confirm階段是做確認提交,Try階段所有分支事務執行成功后開始執行Confirm。通常情況下,采用TCC則認為Confirm階段是不會出錯的。即 :只要Try成功,Confirm一定成功。若Confirm階段真的出錯了,需引入重試機制或人工處理。
- Cancel階段是在業務執行錯誤需要回滾的狀態下執行分支事務的業務取消,預留資源釋放。通常情況下,采用TCC則認為Cancel階段也是一定成功的。若Cancel階段真的出錯了,需引入重試機制或人工處理。
- TM事務管理器
TM事務管理器可以實現為獨立的服務,也可以讓全局事務發起方充當TM的角色,TM獨立出來是為了成為公用組件,是為了考慮結構和軟件復用。
TM在發起全局事務時生成全局事務記錄,全局事務ID貫穿整個分布式事務調用鏈條,用來記錄事務上下文,追蹤和記錄狀態,由於Confirm和Cancel失敗需進行重試,因此需要實現為冪等性是指同一個操作無論請求多少次,其結果都相同。
目前市面上的TCC框架眾多比如下面這幾種 :
Seata也支持TCC,但Seata的TCC模式對Spring Cloud並沒有提供支持。我們的目標是理解TCC原理以及事務協調運作的過程,因此更傾向於輕量級易於理解的框架。
Hmily是一個高性能分布式事務TCC開源框架。基於Java語言來開發(JDK1.8),支持Dubbo,Spring Cloud等RPC框架進行分布式事務。它目前支持以下特性 :
- 支持嵌套事務(Nested transaction support)。
- 采用disruptor框架進行事務日志的異步讀寫,與RPC框架的性能毫無差別。
- 支持SpringBoot-starter項目啟動,使用簡單。
- RPC框架支持 :dubbo、motan、springcloud。
- 本地事務存儲支持 :redis、mongodb、zookeeper、file、mysql。
- 事務日志序列化支持 :java、hessian、kryo、protostuff。
- 采用Aspect AOP切面思想與Spring無縫集成,天然支持集群。
- RPC事務恢復,超時異常恢復等。
Hmily利用AOP對參與分布式事務的本地方法與遠程方法進行攔截處理,通過多方攔截,事務參與者能透明的調用到另一方的Try、Confirm、Cancel方法;傳遞事務上下文;並記錄事務日志,酌情進行補償,重試等。
Hmily不需要事務協調服務,但需要提供一個數據庫(mysql/mongodb/zookeeper/redis/file)來進行日志存儲。
Hmily實現的TCC服務與普通的服務一樣,只需要暴露一個接口,也就是它的Try業務。Confirm/Cancel業務邏輯,只是因為全局事務提交/回滾的需要才提供的,因此Confirm/Cancel業務只需要被Hmily TCC事務框架發現即可,不需要被調用它的其他業務服務所感知。
官網介紹 :https://dromara.org/website/zh-cn/docs/hmily/index.html
TCC需要注意三種異常處理分別是空回滾、冪等、懸掛 :
空回滾 :
在沒有調用TCC資源Try方法的情況下,調用來二階段的Cancel方法,Cancel方法需要識別出這是一個空回滾,然后直接返回成功。
出現原因是當一個分支事務所在服務宕機或網絡異常,分支事務調用記錄為失敗,這個時候其實是沒有執行Try階段,當故障恢復后,分布式事務進行回滾則會調用二階段的Cancel方法,從而形成空回滾。
解決思路是關鍵就是要識別出這個空回滾。思路很簡單就是需要知道一階段是否執行,如果執行來,那就是正常回滾;如果沒執行,那就是空回滾。前面已經說過TM在發起全局事務時生成全局事務記錄,全局事務ID貫穿整個分布式事務調用鏈條。再額外增加一張分支事務記錄表,其中有全局事務ID和分支事務ID,第一階段Try方法里會插入一條記錄,表示一階段執行來。Cancel接口里讀取該記錄,如果該記錄存在,則正常回滾;如果該記錄不存在,則是空回滾。
冪等 :
通過前面介紹已經了解到,為了保證TCC二階段提交重試機制不會引發數據不一致,要求TCC的二階段Try、Confirm和Cancel接口保證冪等,這樣不會重復使用或者釋放資源。如果冪等控制沒有做好,很有可能導致數據不一致等嚴重問題。
解決思路在上述 “分支事務記錄”中增加執行狀態,每次執行前都查詢該狀態。
懸掛 :
懸掛就是對於一個分布式事務,其二階段Cancel接口比Try接口先執行。
出現原因是在RPC調用分支事務try時,先注冊分支事務,再執行RPC調用,如果此時RPC調用的網絡發生擁堵,通常RPC調用是有超時時間的,RPC超時以后,TM就會通知RM回滾該分布式事務,可能回滾完成后,RPC請求才到達參與者真正執行,而一個Try方法預留的業務資源,只有該分布式事務才能使用,該分布式事務第一階段預留的業務資源就再也沒有人能夠處理了,對於這種情況,我們就稱為懸掛,即業務資源預留后無法繼續處理。
解決思路是如果二階段執行完成,那一階段就不能再繼續執行。在執行一階段事務時判斷在該全局事務下,“分支事務記錄”表中是否已經有二階段事務記錄,如果有則不執行Try。
舉例,場景為A轉賬30元給B,A和B賬戶在不同的服務。
方案 1 :
賬戶A
try: 檢查余額是否夠30元
扣減30元 confirm:
空
cancel: 增加30元
賬戶B
try: 增加30元
confirm: 空
cancel: 減少30元
方案1說明:
1)賬戶A,這里的余額就是所謂的業務資源,按照前面提到的原則,在第一階段需要檢查並預留業務資源,因此, 我們在扣錢 TCC 資源的 Try 接口里先檢查 A 賬戶余額是否足夠,如果足夠則扣除 30 元。 Confirm 接口表示正式 提交,由於業務資源已經在 Try 接口里扣除掉了,那么在第二階段的 Confirm 接口里可以什么都不用做。Cancel 接口的執行表示整個事務回滾,賬戶A回滾則需要把 Try 接口里扣除掉的 30 元還給賬戶。
2)賬號B,在第一階段 Try 接口里實現給賬戶B加錢,Cancel 接口的執行表示整個事務回滾,賬戶B回滾則需要把 Try 接口里加的 30 元再減去。
方案1的問題分析:
1)如果賬戶A的try沒有執行在cancel則就多加了30元。
2)由於try,cancel、confirm都是由單獨的線程去調用,且會出現重復調用,所以都需要實現冪等。
3)賬號B在try中增加30元,當try執行完成后可能會其它線程給消費了。
4)如果賬戶B的try沒有執行在cancel則就多減了30元。
問題解決:
1)賬戶A的cancel方法需要判斷try方法是否執行,正常執行try后方可執行cancel。
2)try、cancel、confirm方法實現冪等。
3)賬戶B在try方法中不允許更新賬戶金額,在confirm中更新賬戶金額。
4)賬戶B的cancel方法需要判斷try方法是否執行,正常執行try后方可執行cancel。
優化方案:
賬戶A :
try:
try冪等校驗
try懸掛處理
檢查余額是否夠30元
扣減30元
confirm:
空
cancel:
cancel冪等校驗
cancel空回滾處理
增加可用余額30元
賬戶B :
try:
空
confirm:
confirm冪等校驗
正式增加30元
cancel:
空
通過Hmily實現TCC分布式事務,模擬兩個賬戶的轉賬交易過程。
兩個賬戶分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務。交易過程是,張三給李四轉賬制定金額。
上述交易步驟,要么一起成功,要么一起失敗,必須是一個整體性事務。
數據庫:MySQL-5.7.25
JDK:64位 jdk1.8.0_201 微服務:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE Hmily:hmily-springcloud.2.0.4-RELEASE
微服務及數據庫的關系 :
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 銀行1,操作張三賬戶, 連接數據庫bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 銀行2,操作李四賬戶,連接數據庫bank2
服務注冊中心:dtx/discover-server
創建hmily數據庫,用於存儲hmily框架記錄的數據。
CREATE DATABASE hmily
CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
創建bank1庫,並導入以下表結構和數據(包含張三賬戶)
CREATE DATABASE bank1
CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
創建bank2庫,並導入以下表結構和數據(包含李四賬戶)
CREATE DATABASE bank2
CHARACTER SET ‘utf8’ COLLATE ‘utf8_general_ci’;
DROP TABLE IF EXISTS account_info
; CREATE TABLE account_info
(id
bigint(20) NOT NULL AUTO_INCREMENT,account_name
varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘戶 主姓名’,account_no
varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘銀行 卡號’,account_password
varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帳戶密碼’,account_balance
double NULL DEFAULT NULL COMMENT ‘帳戶余額’,
PRIMARY KEY (id
) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO account_info
VALUES (2, ‘張三的賬戶’, ‘1’, ‘’, 10000);
每個數據庫都創建try、confirm、cancel三張日志表:
CREATE TABLE `local_try_log` (
`tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_confirm_log` (
`tx_no` varchar(64) NOT NULL COMMENT
`create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `local_cancel_log` (
`tx_no` varchar(64) NOT NULL COMMENT `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
(1)引入maven依賴
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily‐springcloud</artifactId> <version>2.0.4‐RELEASE</version>
</dependency>
(2)配置hmily
application.yml :
org: dromara:
hmily :
serializer : kryo
recoverDelayTime : 128
retryMax : 30
scheduledDelay : 128
scheduledThreadMax : 10
repositorySupport : db
started: true
hmilyDbConfig :
driverClassName : com.mysql.jdbc.Driver
url : jdbc:mysql://localhost:3306/bank?useUnicode=true
username : root
password : root
新增配置類接收application.yml中的Hmily配置信息,並創建HmilyTransactionBootstrap Bean:
@Bean
public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){
HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime")));
hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax")));
hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay")));
hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax")));
hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport"));
hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started")));
HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();
hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName"));
hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));
hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));
hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
return hmilyTransactionBootstrap;
}
啟動類增加@EnableAspectJAutoProxy並增加org.dromara.hmily的掃描項:
@SpringBootApplication
@EnableDiscoveryClient
@EnableHystrix
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"}) @ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"}) public class Bank1HmilyServer {
public static void main(String[] args) { SpringApplication.run(Bank1HmilyServer.class, args);
} }
dtx-tcc-demo-bank1實現try和cancel方法,如下 :
try:
try冪等校驗
try懸掛處理
檢查余額是夠扣減金額
扣減金額
confirm:
空
cancel:
cancel冪等校驗
cancel空回滾處理
增加可用余額
- Dao
@Mapper
@Component
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ")
int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/** * 增加某分支事務try執行記錄 * @param localTradeNo 本地事務編號 * @return */
@Insert("insert into local_try_log values(#{txNo},now());")
int addTry(String localTradeNo);
@Insert("insert into local_confirm_log values(#{txNo},now());")
int addConfirm(String localTradeNo);
@Insert("insert into local_cancel_log values(#{txNo},now());")
int addCancel(String localTradeNo);
/** * 查詢分支事務try是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
int isExistTry(String localTradeNo);
/** * 查詢分支事務confirm是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
int isExistConfirm(String localTradeNo);
/** * 查詢分支事務cancel是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
int isExistCancel(String localTradeNo);
}
2)try和cancel方法
@Slf4j
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private AccountInfoDao accountInfoDao;
@Autowired
private Bank2Client bank2Client;
/** * 只要標記@Hmily就是try方法,在注解中指定confirm、cancel兩個方法的名字 * * @param accountNo * @param amount */
@Hmily(confirmMethod = "commit", cancelMethod = "rollback")
@Transactional(rollbackFor = Exception.class)
@Override
public void updateAccountBalance(String accountNo, Double amount) {
// 事務id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank1 Service begin try ..." + transId);
int existTry = accountInfoDao.isExistTry(transId);
// 冪等判斷 判斷local_try_log表中是否有try日志記錄,如果有不再執行
// try冪等校驗
if (existTry > 0) {
log.info("Bank1 Service 已經執行try,無需重復執行,事務id :{}", transId);
return;
}
// try懸掛處理,如果cancel、confirm有一個已經執行了,try不再執行
if (accountInfoDao.isExistCancel(transId) > 0 || accountInfoDao.isExistConfirm(transId) > 0) {
log.info("Bank1 Service 已經執行confirm或cancel,懸掛處理,事務id :{}", transId);
return;
}
// 從賬戶扣減
if (accountInfoDao.subtractAccountBalance(accountNo, amount) <= 0) {
// 扣減失敗
throw new HmilyRuntimeException("bank1 exception, 扣減失敗,事務id :{}" + transId);
}
// 增加本地事務try成功記錄,用於冪等性控制標識
accountInfoDao.addTry(transId);
// 遠程調用bank2
if (bank2Client.transfer(amount)) {
throw new HmilyRuntimeException("bank2Client exception,事務id:{}"+transId);
}
// 異常一定要拋在Hmily里面
if (amount == 10) {
throw new RuntimeException("Bank2 make exception 10");
}
log.info("Bank2 Service end try .." + transId);
}
@Transactional(rollbackFor = Exception.class)
public void commit(String accountNo, double amount) {
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank1 Service begin commit .." + transId);
}
@Transactional(rollbackFor = Exception.class)
public void rollback(String accountNo, double amount) {
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank1 Service begin rollback..." + transId);
// 空回滾處理,try階段沒有執行什么也不用做。
if (accountInfoDao.isExistTry(transId) == 0) {
log.info("Bank1 try 階段失敗 。。無需rollback " + transId);
return;
}
// 冪等性校驗,已經執行過了,什么也不用做
if (accountInfoDao.isExistCancel(transId) > 0) {
log.info("Bank1 已經執行過rollback 。。無需再次rollback " + transId);
return;
}
// 再將金額加回賬戶
accountInfoDao.addAccountBalance(accountNo, amount);
// 添加cancel日志,用於冪等性控制標識
accountInfoDao.addCancel(transId);
log.info("Bank1 Service end rollback ... " + transId);
}
}
3)feignClient
@FeignClient(value = "seata-demo-bank2", fallback = Bank2Client.class)
public interface Bank2Client {
@GetMapping("/bank2/transfer")
@Hmily
Boolean transfer(@RequestParam("amount") Double amount);
}
- Controller
@RestController
public class Bank1Controller {
@Autowired
private AccountInfoService accountInfoService;
@RequestMapping("/transfer")
public String test(@RequestParam("amount") Double amount) {
accountInfoService.updateAccountBalance("1", amount);
return "bank1" + amount;
}
}
dtx-tcc-demo-bank2實現如下功能 :
try:
空
confirm:
confirm冪等校驗
正式增加金額
cancel:
空
1)Dao
@Component
@Mapper
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/** * 增加某分支事務try執行記錄 * @param localTradeNo 本地事務編號 * @return */
@Insert("insert into local_try_log values(#{txNo},now());")
int addTry(String localTradeNo);
@Insert("insert into local_confirm_log values(#{txNo},now());")
int addConfirm(String localTradeNo);
@Insert("insert into local_cancel_log values(#{txNo},now());")
int addCancel(String localTradeNo);
/** * 查詢分支事務try是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
int isExistTry(String localTradeNo);
/** * 查詢分支事務confirm是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
int isExistConfirm(String localTradeNo);
/** * 查詢分支事務cancel是否已執行 * @param localTradeNo 本地事務編號 * @return */
@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
int isExistCancel(String localTradeNo);
}
2)實現confirm方法
@Slf4j
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private AccountInfoDao accountInfoDao;
@Transactional(rollbackFor = Exception.class)
@Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
@Override
public void updateAccountBalance(String accountNo, Double amount) {
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank2 Service Begin try ... " + transId);
}
@Transactional(rollbackFor = Exception.class)
public void confirmMethod(String accountNo, Double amount) {
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank2 Service commit ..." + transId);
// 冪等性校驗,已經執行過了,什么也不用做
if (accountInfoDao.isExistConfirm(transId) > 0) {
log.info("Bank2 已經執行過confirm 。。無需再次confirm " + transId);
return;
}
// 正式增加金額
accountInfoDao.addAccountBalance(accountNo, amount);
// 添加confirm日志
accountInfoDao.addConfirm(transId);
}
@Transactional(rollbackFor = Exception.class)
public void cancelMethod(String accountNo, Double amount) {
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("Bank2 Service begin cancel ... " + transId);
}
}
3)Controller
@RestController
public class Bank2Controller {
@Autowired
private AccountInfoService accountInfoService;
@RequestMapping("/transfer")
public Boolean test2(@RequestParam("amount") Double amount) {
accountInfoService.updateAccountBalance("2", amount);
return true;
}
}
- 張三向李四轉賬成功。
- 李四事務失敗,張三事務回滾成功。
- 張三事務失敗,李四分支事務回滾成功。
- 分支事務超時測試。
如果拿TCC事務的處理流程與2PC兩階段提交做比較,2PC通常都是在跨庫的DB層面,而TCC則在應用層面的處 理,需要通過業務邏輯來實現。這種分布式事務的實現方式的優勢在於,可以讓應用自己定義數據操作的粒度,使得降低鎖沖突、提高吞吐量成為可能。
而不足之處則在於對應用的侵入性非常強,業務邏輯的每個分支都需要實現try、confirm、cancel三個操作。此外,其實現難度也比較大,需要按照網絡狀態、系統故障等不同的失敗原因實現不同的回滾策略。