TCC Demo 代碼實現
簡介
設計實現一個 TCC 分布式事務框架的簡單 Demo,實現事務管理器,不需要實現全局事務的持久化和恢復、高可用等
工程運行
- 工程地址:TCCDemo
需要MySQL數據庫,保存全局事務信息,相關TCC步驟都會打印在控制台上
- 1:啟動MySQL,創建一個數據庫 test
- 2.運行當前工程的:TccDemoApplication,啟動以后自動創建數據庫的表
- 3.訪問:http://localhost:8080/transaction/commit,confirm示例
- 4.訪問:http://localhost:8080/transaction/cancel,cancel示例
大致實現思路
- 1.初始化:想事務管理器注冊新事務,生成全局事務唯一ID
- 2.try階段執行:try相關的代碼執行,期間注冊相應的調用記錄,發送try執行結果到事務管理器,執行成功由事務管理器執行confirm或者cancel步驟
- 3.confirm階段:事務管理器收到try執行成功信息,根據事務ID,進入事務confirm階段執行,confirm失敗進入cancel,成功則結束
- 4.cancel階段:事務管理器收到try執行失敗或者confirm執行失敗,根據事務ID,進入cancel階段執行后結束,如果失敗了,打印日志或者告警,讓人工參與處理
前置知識
TCC 原理
TCC分布式事務主要的三個階段:
- 1.Try:主要是對業務系統做檢測及資源預留
- 2.Confirm:確認執行業務操作
- 3.Cancel:取消執行業務操作
下面以一個例子來說明三個階段需要做的事:比如現在有兩個數據庫,一個用戶賬戶數據庫、一個商品庫存數據庫,現在提供一個買貨的接口,當買賣成功時,扣除用戶賬戶和商品庫存,大致偽代碼如下:
public void buy() {
// 用戶賬戶操作
userAccount();
// 商品賬戶操作
StoreAccount();
}
在上面這個操作做,兩個函數的操作必須同時成功,不然就會出現數據不一致問題,也就是需要保證事務原子性。
因為設定的場景是數據在兩個不同的數據庫,所有沒有辦法利用單個數據庫的事務機制,它是跨數據庫的,所以需要分布式事務的機制。
下面簡單模擬下,在不使用TCC事務管理器,按照TCC的思想,在代碼中如何保證事務原子性
TCC 無事務管理器 Demo 偽代碼
使用上面的場景,代碼大致如下:
class Demo {
public void buy() {
// try 階段:比如去判斷用戶和商品的余額和存款是否充足,進行預扣款和預減庫存
if (!userServer.tryDeductAccount()) {
// 用戶預扣款失敗,相關數據沒有改變,返回錯誤即可
}
if (!storeService.tryDeductAccount()) {
// cancel 階段: 商品預減庫存失敗,因為前面進行了用戶預扣款,所以需要進入cancel階段,恢復用戶賬戶
userService.cancelDeductAccount();
}
// Confirm 階段:try 成功就進行confirm階段,這部分操作比如是將扣款成功狀態和減庫存狀態設置為完成
if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount()) {
// cancel 階段:confirm的任意階段失敗了,需要進行數據恢復(回滾)
userService.cancelDeductAccount();
storeService.cancelDeductAccount();
}
}
}
上面就是一個TCC事務大致代碼,可以看到:之前的每個函數操作都需要分為三個子函數,try、confirm、cancel。將其細化,在代碼中判斷執行,保證其事務原子性。
上面是兩個服務,用戶賬戶和商品存儲操作,看着寫起來不是太多,但如果是多個服務呢?try階段就會多很多的if,還有相應的cancel的動態增加,confirm也是,大致如下:
class Demo {
public void buy() {
// try 階段:比如去判斷用戶和商品的余額和存款是否充足,進行預扣款和預減庫存
if (!userServer.tryDeductAccount()) {
// 用戶預扣款失敗,相關數據沒有改變,返回錯誤即可
}
if (!storeService.tryDeductAccount()) {
// cancel 階段: 商品預減庫存失敗,因為前面進行了用戶預扣款,所以需要進入cancel階段,恢復用戶賬戶
userService.cancelDeductAccount();
}
// try增加、cancel也動態增加
if (!xxxService.tryDeductAccount()) {
xxxService.cancelDeductAccount();
xxxService.cancelDeductAccount();
}
if (!xxxService.tryDeductAccount()) {
xxxService.cancelDeductAccount();
xxxService.cancelDeductAccount();
xxxService.cancelDeductAccount();
}
........
// Confirm 階段:try 成功就進行confirm階段,這部分操作比如是將扣款成功狀態和減庫存狀態設置為完成
if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount() || ......) {
// cancel 階段:confirm的任意階段失敗了,需要進行數據恢復(回滾)
userService.cancelDeductAccount();
storeService.cancelDeductAccount();
.......
}
}
}
可以看出代碼相似性很多,工程中相似的需要分布式調用的有很多,這樣的話,大量這樣的類似代碼就會充斥在工程中,為了偷懶,引入TCC事務管理器就能簡化很多
TCC 事務管理器
為了偷懶,用事務管理器,那偷的是哪部分懶呢?在之前的代碼中,try階段還是交給本地程序去做,而confirm和cancel委托給了事務管理器。下面看下Seata和Hmily的TCC偽代碼:
interface UserService {
@TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel")
public void try();
public void confirm();
public void cancel();
}
interface StoreService {
@TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel")
public void try();
public void confirm();
public void cancel();
}
class Demo {
@TCCGlobalTransaction
public String buy() {
if (!userService.buy()) {
throw error;
}
if (!storeService.try()) {
throw error;
}
return Tcc.xid();
}
}
調試參考了Seata和Hmily的TCC,理出了大概的步驟,其中的細節方面有很多很多的東西,暫時忽略,主要看大體的實現
這里進行大量的簡化,使用連個注解即可,能大體完成TCC整個流程,下面說一下整個TCC Demo的運行流程:
- 1.初始化:想事務管理器注冊新事務,生成全局事務唯一ID
- 2.try階段執行:try相關的代碼執行,期間注冊相應的調用記錄,發送try執行結果到事務管理器,執行成功由事務管理器執行confirm或者cancel步驟
- 3.confirm階段:事務管理器收到try執行成功信息,根據事務ID,進入事務confirm階段執行,confirm失敗進入cancel,成功則結束
- 4.cancel階段:事務管理器收到try執行失敗或者confirm執行失敗,根據事務ID,進入cancel階段執行后結束,如果失敗了,打印日志或者告警,讓人工參與處理
1.初始化
此步驟主要是,注冊生成新的全局事務,獲取新事務的唯一標識ID。
@TCCGlobalTransaction,這個注解就是用於標識一個事務的開始,注冊新的事務,將新事務的ID放入當前的threadLocal中,后面的函數執行也能獲取到當前的事務ID,進行自己的操作
2.try階段
此階段主要是各個被調用服務的try操作的執行,比如:userService.try()/storeService.try()
在其上加了 @TCCAction 注解,用於注冊改事務的函數調用記錄:因為事務的數量是不確定的,當加這個注解的時候,調用進行攔截后,會根據從threadLocal中獲取的事務ID,想事務管理器注冊改事務的子事務的confirm和cancel方法,用於后面事務管理能根據事務ID,推動相關confirm和cancel的執行
3.confirm階段:
當上面的buy()函數執行完成以后,並成功以后,發送消息給事務管理器,事務管理器就通過事務ID,來推動接下來confirm階段的執行
4.cancel階段:
當buy函數執行失敗,或者confirm執行失敗后,根據當前的事務ID,事務管理器推動進入cancel階段的執行
代碼實現
代碼實現部分只列出關鍵代碼了,代碼還是稍微有點多的,代碼實現的邏輯線如下:
- 1.對 @TCCGlobalTransaction 進行攔截處理:生成全局事務ID
- 2.在事務函數執行的過程中,對 @TCCAction 進行攔截:將分支事務調用信息注冊到全局事務管理數據庫中
- 3.當 try 階段執行成功或者失敗的時候,向全局事務管理器發送消息
- 4.全局事務管理器說道 try 節點的執行結束觸發點,發送信息推動各個分支事務的 confirm 或者 cancel 階段的執行
1.對 @TCCGlobalTransaction 進行攔截處理:生成全局事務ID
定義相關的 @TCCGlobalTransaction 注解,大致代碼如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TccTransaction {
}
示例的全局事務使用如下:
@Slf4j
@Component
public class TransactionService {
@Autowired
private UserAccountServiceImpl user;
@Autowired
private StoreAccountServiceImpl store;
@TccTransaction
public void buySuccess() {
log.info("global transaction id:: " + RootContext.get());
if (!user.prepare(true)) {
log.info("user try failed");
throw new RuntimeException("user prepare failed!");
}
log.info("user try success");
if (!store.prepare(true)) {
log.info("store try failed");
throw new RuntimeException("store prepare failed");
}
log.info("store try success");
}
}
對注解進行攔截,生成全局事務ID,放入threadLocal中,后面的函數執行就能拿到這個ID,大致代碼如下:
/**
* 全局事務{@TccTransacton} 攔截處理
* 用於生成 全局事務 唯一標識ID,想事務管理器進行注冊生成
* @author lw
*/
@Aspect
@Component
@Slf4j
public class GlobalTransactionHandler {
private final TransactionInfoMapper transactionInfoMapper;
public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) {
this.transactionInfoMapper = transactionInfoMapper;
}
@Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)")
public void globalTransaction() {}
/**
* 對全局事務進行攔截處理
* @param point
* @return
* @throws UnknownHostException
*/
@Around("globalTransaction()")
public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException {
log.info("Global transaction handler");
// 生成全局事務ID,放入threadLocal中
String transactionId = createTransactionId();
RootContext.set(transactionId);
......
return null;
}
/**
* 生成全局事務ID:本機IP地址+本地分支事務管理器監聽端口+時間戳
* @return xid
* @throws UnknownHostException UnknownHostException
*/
private String createTransactionId() throws UnknownHostException {
String localAddress = InetAddress.getLocalHost().getHostAddress();
String timeStamp = String.valueOf(System.currentTimeMillis());
return localAddress + ":8080:" + timeStamp;
}
}
2.在事務函數執行的過程中,對 @TCCAction 進行攔截:將分支事務調用信息注冊到全局事務管理數據庫中
注解的定義大致如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TccAction {
String name();
String confirmMethod();
String cancelMethod();
}
使用示例大致如下:
@Component
@Slf4j
public class StoreAccountServiceImpl implements Service {
@Override
@TccAction(name = "prepare", confirmMethod = "commit", cancelMethod = "cancel")
public boolean prepare(boolean success) {
......
}
@Override
public boolean commit() {
......
}
@Override
public boolean cancel() {
......
}
}
在分支事務執行 try(prepare函數),需要進行攔截,將其調用信息注冊到全局事務管理中,大致代碼如下:
@Aspect
@Component
@Slf4j
public class BranchTransactionHandler {
private final TccClientService tccClientService;
public BranchTransactionHandler(TccClientService tccClientService) {
this.tccClientService = tccClientService;
}
@Pointcut(value = "@annotation(com.tcc.demo.demo.annotation.TccAction)")
public void branchTransaction() {}
@Before("branchTransaction()")
public void branchTransactionHandler(JoinPoint point) throws Throwable {
log.info("Branch transaction handler :: " + RootContext.get());
// 獲取分支事務服務類名,用於后面反射類加載
Object target = point.getTarget().getClass();
String className = ((Class) target).getName();
MethodSignature methodSignature = (MethodSignature) point.getSignature();
Method method = methodSignature.getMethod();
TccAction tccActionAnnotation = method.getAnnotation(TccAction.class);
// 獲取 confirm 和 cancel 的對應方法名稱
String commitMethodName = tccActionAnnotation.confirmMethod();
String cancelMethodName = tccActionAnnotation.cancelMethod();
// 寫入全局事務管理的數據中
tccClientService.register(RootContext.get(), className, commitMethodName, cancelMethodName);
}
}
3.當 try 階段執行成功或者失敗的時候,向全局事務管理器發送消息
在 @TccTransaction 中會調用整個函數的執行,其過程就會觸發各個分支事務 @TCCAction的執行,也就是 try 階段的執行。當 try 執行失敗或者成功后,全局事務管理 推動進入 confirm 或者 cancel 階段。大致代碼如下:
/**
* 全局事務{@TccTransacton} 攔截處理
* 用於生成 全局事務 唯一標識ID,想事務管理器進行注冊生成
* @author lw
*/
@Aspect
@Component
@Slf4j
public class GlobalTransactionHandler {
private final TransactionInfoMapper transactionInfoMapper;
public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) {
this.transactionInfoMapper = transactionInfoMapper;
}
@Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)")
public void globalTransaction() {}
/**
* 對全局事務進行攔截處理
* @param point
* @return
* @throws UnknownHostException
*/
@Around("globalTransaction()")
public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException {
log.info("Global transaction handler");
// 生成全局事務ID,放入threadLocal中
String transactionId = createTransactionId();
RootContext.set(transactionId);
try {
// try 階段的執行
point.proceed();
} catch (Throwable throwable) {
// try 失敗以后,在數據庫中更新所有分支事務的狀態
log.info("global update transaction status to try failed");
updateTransactionStatus(transactionId, TransactionStatus.TRY_FAILED);
log.info("global update transaction status to try failed end");
// 發送消息推動進入 cancel 階段
log.info(transactionId + " global transaction try failed, will rollback");
sendTryMessage(transactionId);
return null;
}
// try 成功,在數據庫中更新所有分支事務的狀態
log.info("global update transaction status to try success");
updateTransactionStatus(transactionId, TransactionStatus.TRY_SUCCESS);
log.info("global update transaction status to try success end");
// 發送消息推動進入 confirm 階段,如果 confirm 失敗,則再次發送消息推動進入 cancel 階段
log.info(transactionId + " global transaction try success, will confirm");
if (!sendTryMessage(transactionId)) {
log.info(transactionId + " global transaction confirm failed, will cancel");
sendTryMessage(transactionId);
}
return null;
}
/**
* 發送消息到 分支事務管理器(TM)
* TM 收到消息后,查詢事務數據庫,根據事務狀態,判斷執行 confirm 或者 cancel
* 這里使用HTTP作為通信方式(為了簡便,當然也可以使用其他的,如dubbo之類的)
* @param transactionId xid
* @return execute result
*/
private boolean sendTryMessage(String transactionId) {
log.info("send message to local TM to execute next step");
String[] slice = transactionId.split(":");
String targetHost = slice[0];
String targetPort = slice[1];
RestTemplate restTemplate = new RestTemplate();
String url = "http://" + targetHost + ":" + targetPort + "/tm/tryNext?xid=" + transactionId;
Boolean response = restTemplate.getForObject(url, boolean.class, new HashMap<>(0));
if (response == null || !response) {
log.info("try next step execute failed, please manual check");
return false;
} else {
log.info("try next step execute success");
return true;
}
}
/**
* 生成全局事務ID:本機IP地址+本地分支事務管理器監聽端口+時間戳
* @return xid
* @throws UnknownHostException UnknownHostException
*/
private String createTransactionId() throws UnknownHostException {
String localAddress = InetAddress.getLocalHost().getHostAddress();
String timeStamp = String.valueOf(System.currentTimeMillis());
return localAddress + ":8080:" + timeStamp;
}
/**
* 根據 xid 更新 所有分支事務的執行狀態
* @param xid xid
* @param status status
*/
private void updateTransactionStatus(String xid, int status) {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setXid(xid);
transactionInfo.setStatus(status);
try {
transactionInfoMapper.updateOne(transactionInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.全局事務管理器說道 try 節點的執行結束觸發點,發送信息推動各個分支事務的 confirm 或者 cancel 階段的執行
分支事務管理器(TM)收到了消息,就根據 xid 撈出所有的分支事務,根據狀態,判斷執行 confirm 或者 cancel
只需要處理注冊上來的分支事務即可,try 執行完的必然注冊上來了,后面執行 confirm即可。try 沒有執行的,肯定是前面的分支事務出錯了,只要恢復前面的數據即可。
大致代碼如下:
@Service
@Slf4j
public class TccClientService {
private final TransactionInfoMapper transactionInfoMapper;
public TccClientService(TransactionInfoMapper transactionInfoMapper) {
this.transactionInfoMapper = transactionInfoMapper;
}
/**
* 收到 全局事務管理器(TC)的信息后執行
* 查詢數據庫,存在一個分支事務失敗狀態則進入 cancel,全成功則進入 confirm 階段
* @param xid xid
* @return 返回 confirm 或者 cancel 的執行結果
*/
public boolean transactionHandle(String xid) {
// 根據 xid 查詢出所有的分支事務信息
Map<String, Object> condition = new HashMap<>(1);
condition.put("xid", xid);
List<Map<String, Object>> branchTransactions = transactionInfoMapper.query(condition);
// 判斷是否所有事務的 try 都執行成功,如果成功則 confirm,反之 cancel
boolean executeConfirm = true;
for (Map<String, Object> item: branchTransactions) {
if (item.get("status").equals(TransactionStatus.TRY_FAILED) || item.get("status").equals(TransactionStatus.CONFIRM_FAILED)) {
executeConfirm = false;
break;
}
}
// 執行 confirm 或者 cancel
if (executeConfirm) {
return executeMethod(branchTransactions, TransactionMethod.CONFIRM);
} else {
return executeMethod(branchTransactions, TransactionMethod.CANCEL);
}
}
/**
* 通過分支事務注冊的 類名和方法名,反射調用相應的 confirm 或者 cancel 方法
* 這里是串行的,也可以使用線程池進行並行操作
* @param branchTransactions 分支事務信息
* @param methodName confirm 或者 cancel
* @return bool
*/
private boolean executeMethod(List<Map<String, Object>> branchTransactions, String methodName) {
for (Map<String, Object> item: branchTransactions) {
log.info("service info:: " + item.toString());
log.info("service method :: " + item.get(methodName).toString());
try {
Class<?> clazz = Class.forName(item.get("class_name").toString());
log.info("Service Class::" + clazz.getName());
Method method = clazz.getDeclaredMethod(item.get(methodName).toString());
log.info("Service Method::" + method.toString());
Object service = clazz.newInstance();
Object ret = method.invoke(service);
log.info("execute method return: " + ret.toString());
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
e.printStackTrace();
return false;
}
}
return true;
}
}
總結
到此就實現了一個非常簡陋的 TCC Demo了。其中 TC 和 TM 的角色不是特別清晰,因為他們基本嵌入到一個應用里面去了,但還是體現了大致的思路。當然,TC也完全是可以分離的,像Seata就是一個獨立的Server。
TC 和 TM 的通信方法也是可以用其他的,這里為了方便使用的HTTP,也可以使用 RPC之類的。
完整的工程如下:TCCDemo