TCC Demo 代碼實現


TCC Demo 代碼實現


簡介

    設計實現一個 TCC 分布式事務框架的簡單 Demo,實現事務管理器,不需要實現全局事務的持久化和恢復、高可用等

工程運行

    需要MySQL數據庫,保存全局事務信息,相關TCC步驟都會打印在控制台上

大致實現思路

  • 1.初始化:想事務管理器注冊新事務,生成全局事務唯一ID
  • 2.try階段執行:try相關的代碼執行,期間注冊相應的調用記錄,發送try執行結果到事務管理器,執行成功由事務管理器執行confirm或者cancel步驟
  • 3.confirm階段:事務管理器收到try執行成功信息,根據事務ID,進入事務confirm階段執行,confirm失敗進入cancel,成功則結束
  • 4.cancel階段:事務管理器收到try執行失敗或者confirm執行失敗,根據事務ID,進入cancel階段執行后結束,如果失敗了,打印日志或者告警,讓人工參與處理

前置知識

TCC 原理

    TCC分布式事務主要的三個階段:

  • 1.Try:主要是對業務系統做檢測及資源預留
  • 2.Confirm:確認執行業務操作
  • 3.Cancel:取消執行業務操作

    下面以一個例子來說明三個階段需要做的事:比如現在有兩個數據庫,一個用戶賬戶數據庫、一個商品庫存數據庫,現在提供一個買貨的接口,當買賣成功時,扣除用戶賬戶和商品庫存,大致偽代碼如下:

public void buy() {
    // 用戶賬戶操作
    userAccount();
    // 商品賬戶操作
    StoreAccount();
}

    在上面這個操作做,兩個函數的操作必須同時成功,不然就會出現數據不一致問題,也就是需要保證事務原子性。

    因為設定的場景是數據在兩個不同的數據庫,所有沒有辦法利用單個數據庫的事務機制,它是跨數據庫的,所以需要分布式事務的機制。

    下面簡單模擬下,在不使用TCC事務管理器,按照TCC的思想,在代碼中如何保證事務原子性

TCC 無事務管理器 Demo 偽代碼

    使用上面的場景,代碼大致如下:

class Demo {
    
    public void buy() {
        // try 階段:比如去判斷用戶和商品的余額和存款是否充足,進行預扣款和預減庫存
        if (!userServer.tryDeductAccount()) {
            // 用戶預扣款失敗,相關數據沒有改變,返回錯誤即可
        }
        if (!storeService.tryDeductAccount()) {
            // cancel 階段: 商品預減庫存失敗,因為前面進行了用戶預扣款,所以需要進入cancel階段,恢復用戶賬戶
            userService.cancelDeductAccount();
        }

        // Confirm 階段:try 成功就進行confirm階段,這部分操作比如是將扣款成功狀態和減庫存狀態設置為完成
        if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount()) {
            // cancel 階段:confirm的任意階段失敗了,需要進行數據恢復(回滾)
            userService.cancelDeductAccount();
            storeService.cancelDeductAccount();
        }
    }
}

    上面就是一個TCC事務大致代碼,可以看到:之前的每個函數操作都需要分為三個子函數,try、confirm、cancel。將其細化,在代碼中判斷執行,保證其事務原子性。

    上面是兩個服務,用戶賬戶和商品存儲操作,看着寫起來不是太多,但如果是多個服務呢?try階段就會多很多的if,還有相應的cancel的動態增加,confirm也是,大致如下:

class Demo {
    
    public void buy() {
        // try 階段:比如去判斷用戶和商品的余額和存款是否充足,進行預扣款和預減庫存
        if (!userServer.tryDeductAccount()) {
            // 用戶預扣款失敗,相關數據沒有改變,返回錯誤即可
        }
        if (!storeService.tryDeductAccount()) {
            // cancel 階段: 商品預減庫存失敗,因為前面進行了用戶預扣款,所以需要進入cancel階段,恢復用戶賬戶
            userService.cancelDeductAccount();
        }
        // try增加、cancel也動態增加
        if (!xxxService.tryDeductAccount()) {
            xxxService.cancelDeductAccount();
            xxxService.cancelDeductAccount();
        }
        if (!xxxService.tryDeductAccount()) {
            xxxService.cancelDeductAccount();
            xxxService.cancelDeductAccount();
            xxxService.cancelDeductAccount();
        }
        ........

        // Confirm 階段:try 成功就進行confirm階段,這部分操作比如是將扣款成功狀態和減庫存狀態設置為完成
        if (!userService.confirmDeductAccount() || !storeService.confirmDeductAccount() || ......) {
            // cancel 階段:confirm的任意階段失敗了,需要進行數據恢復(回滾)
            userService.cancelDeductAccount();
            storeService.cancelDeductAccount();
            .......
        }
    }
}

    可以看出代碼相似性很多,工程中相似的需要分布式調用的有很多,這樣的話,大量這樣的類似代碼就會充斥在工程中,為了偷懶,引入TCC事務管理器就能簡化很多

TCC 事務管理器

    為了偷懶,用事務管理器,那偷的是哪部分懶呢?在之前的代碼中,try階段還是交給本地程序去做,而confirm和cancel委托給了事務管理器。下面看下Seata和Hmily的TCC偽代碼:

interface UserService {

    @TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel")
    public void try();

    public void confirm();

    public void cancel();
}

interface StoreService {

    @TCCAction(name = "userAccount", confirmMethod = "confirm", cancelMethod = "cancel")
    public void try();

    public void confirm();

    public void cancel();
}

class Demo {

    @TCCGlobalTransaction
    public String buy() {
        if (!userService.buy()) {
            throw error;
        }
        if (!storeService.try()) {
            throw error;
        }
        return Tcc.xid();
    }
}

    調試參考了Seata和Hmily的TCC,理出了大概的步驟,其中的細節方面有很多很多的東西,暫時忽略,主要看大體的實現

    這里進行大量的簡化,使用連個注解即可,能大體完成TCC整個流程,下面說一下整個TCC Demo的運行流程:

  • 1.初始化:想事務管理器注冊新事務,生成全局事務唯一ID
  • 2.try階段執行:try相關的代碼執行,期間注冊相應的調用記錄,發送try執行結果到事務管理器,執行成功由事務管理器執行confirm或者cancel步驟
  • 3.confirm階段:事務管理器收到try執行成功信息,根據事務ID,進入事務confirm階段執行,confirm失敗進入cancel,成功則結束
  • 4.cancel階段:事務管理器收到try執行失敗或者confirm執行失敗,根據事務ID,進入cancel階段執行后結束,如果失敗了,打印日志或者告警,讓人工參與處理
1.初始化

    此步驟主要是,注冊生成新的全局事務,獲取新事務的唯一標識ID。

    @TCCGlobalTransaction,這個注解就是用於標識一個事務的開始,注冊新的事務,將新事務的ID放入當前的threadLocal中,后面的函數執行也能獲取到當前的事務ID,進行自己的操作

2.try階段

    此階段主要是各個被調用服務的try操作的執行,比如:userService.try()/storeService.try()

    在其上加了 @TCCAction 注解,用於注冊改事務的函數調用記錄:因為事務的數量是不確定的,當加這個注解的時候,調用進行攔截后,會根據從threadLocal中獲取的事務ID,想事務管理器注冊改事務的子事務的confirm和cancel方法,用於后面事務管理能根據事務ID,推動相關confirm和cancel的執行

3.confirm階段:

    當上面的buy()函數執行完成以后,並成功以后,發送消息給事務管理器,事務管理器就通過事務ID,來推動接下來confirm階段的執行

4.cancel階段:

    當buy函數執行失敗,或者confirm執行失敗后,根據當前的事務ID,事務管理器推動進入cancel階段的執行

代碼實現

    代碼實現部分只列出關鍵代碼了,代碼還是稍微有點多的,代碼實現的邏輯線如下:

  • 1.對 @TCCGlobalTransaction 進行攔截處理:生成全局事務ID
  • 2.在事務函數執行的過程中,對 @TCCAction 進行攔截:將分支事務調用信息注冊到全局事務管理數據庫中
  • 3.當 try 階段執行成功或者失敗的時候,向全局事務管理器發送消息
  • 4.全局事務管理器說道 try 節點的執行結束觸發點,發送信息推動各個分支事務的 confirm 或者 cancel 階段的執行

1.對 @TCCGlobalTransaction 進行攔截處理:生成全局事務ID

    定義相關的 @TCCGlobalTransaction 注解,大致代碼如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TccTransaction {
}

    示例的全局事務使用如下:

@Slf4j
@Component
public class TransactionService {

    @Autowired
    private UserAccountServiceImpl user;

    @Autowired
    private StoreAccountServiceImpl store;

    @TccTransaction
    public void buySuccess() {
        log.info("global transaction id:: " + RootContext.get());
        if (!user.prepare(true)) {
            log.info("user try failed");
            throw new RuntimeException("user prepare failed!");
        }
        log.info("user try success");
        if (!store.prepare(true)) {
            log.info("store try failed");
            throw new RuntimeException("store prepare failed");
        }
        log.info("store try success");
    }
}

    對注解進行攔截,生成全局事務ID,放入threadLocal中,后面的函數執行就能拿到這個ID,大致代碼如下:

/**
 * 全局事務{@TccTransacton} 攔截處理
 * 用於生成 全局事務 唯一標識ID,想事務管理器進行注冊生成
 * @author lw
 */
@Aspect
@Component
@Slf4j
public class GlobalTransactionHandler {

    private final TransactionInfoMapper transactionInfoMapper;

    public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) {
        this.transactionInfoMapper = transactionInfoMapper;
    }

    @Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)")
    public void globalTransaction() {}

    /**
     * 對全局事務進行攔截處理
     * @param point
     * @return
     * @throws UnknownHostException
     */
    @Around("globalTransaction()")
    public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException {
        log.info("Global transaction handler");

        // 生成全局事務ID,放入threadLocal中
        String transactionId = createTransactionId();
        RootContext.set(transactionId);

        ......

        return null;
    }

    /**
     * 生成全局事務ID:本機IP地址+本地分支事務管理器監聽端口+時間戳
     * @return xid
     * @throws UnknownHostException UnknownHostException
     */
    private String createTransactionId() throws UnknownHostException {
        String localAddress = InetAddress.getLocalHost().getHostAddress();
        String timeStamp = String.valueOf(System.currentTimeMillis());
        return localAddress + ":8080:" + timeStamp;
    }
}

2.在事務函數執行的過程中,對 @TCCAction 進行攔截:將分支事務調用信息注冊到全局事務管理數據庫中

    注解的定義大致如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TccAction {

    String name();

    String confirmMethod();

    String cancelMethod();
}

    使用示例大致如下:

@Component
@Slf4j
public class StoreAccountServiceImpl implements Service {

    @Override
    @TccAction(name = "prepare", confirmMethod = "commit", cancelMethod = "cancel")
    public boolean prepare(boolean success) {
        ......
    }

    @Override
    public boolean commit() {
        ......
    }

    @Override
    public boolean cancel() {
        ......
    }
}

    在分支事務執行 try(prepare函數),需要進行攔截,將其調用信息注冊到全局事務管理中,大致代碼如下:

@Aspect
@Component
@Slf4j
public class BranchTransactionHandler {

    private final TccClientService tccClientService;

    public BranchTransactionHandler(TccClientService tccClientService) {
        this.tccClientService = tccClientService;
    }

    @Pointcut(value = "@annotation(com.tcc.demo.demo.annotation.TccAction)")
    public void branchTransaction() {}

    @Before("branchTransaction()")
    public void branchTransactionHandler(JoinPoint point) throws Throwable {
        log.info("Branch transaction handler :: " + RootContext.get());

        // 獲取分支事務服務類名,用於后面反射類加載
        Object target = point.getTarget().getClass();
        String className = ((Class) target).getName();

        MethodSignature methodSignature = (MethodSignature) point.getSignature();
        Method method = methodSignature.getMethod();
        TccAction tccActionAnnotation = method.getAnnotation(TccAction.class);

        // 獲取 confirm 和 cancel 的對應方法名稱
        String commitMethodName = tccActionAnnotation.confirmMethod();
        String cancelMethodName = tccActionAnnotation.cancelMethod();

        // 寫入全局事務管理的數據中
        tccClientService.register(RootContext.get(), className, commitMethodName, cancelMethodName);
    }
}

3.當 try 階段執行成功或者失敗的時候,向全局事務管理器發送消息

    在 @TccTransaction 中會調用整個函數的執行,其過程就會觸發各個分支事務 @TCCAction的執行,也就是 try 階段的執行。當 try 執行失敗或者成功后,全局事務管理 推動進入 confirm 或者 cancel 階段。大致代碼如下:

/**
 * 全局事務{@TccTransacton} 攔截處理
 * 用於生成 全局事務 唯一標識ID,想事務管理器進行注冊生成
 * @author lw
 */
@Aspect
@Component
@Slf4j
public class GlobalTransactionHandler {

    private final TransactionInfoMapper transactionInfoMapper;

    public GlobalTransactionHandler(TransactionInfoMapper transactionInfoMapper) {
        this.transactionInfoMapper = transactionInfoMapper;
    }

    @Pointcut("@annotation(com.tcc.demo.demo.annotation.TccTransaction)")
    public void globalTransaction() {}

    /**
     * 對全局事務進行攔截處理
     * @param point
     * @return
     * @throws UnknownHostException
     */
    @Around("globalTransaction()")
    public Object globalTransactionHandler(ProceedingJoinPoint point) throws UnknownHostException {
        log.info("Global transaction handler");

        // 生成全局事務ID,放入threadLocal中
        String transactionId = createTransactionId();
        RootContext.set(transactionId);

        try {
            // try 階段的執行
            point.proceed();
        } catch (Throwable throwable) {
            // try 失敗以后,在數據庫中更新所有分支事務的狀態
            log.info("global update transaction status to try failed");
            updateTransactionStatus(transactionId, TransactionStatus.TRY_FAILED);
            log.info("global update transaction status to try failed end");

            // 發送消息推動進入 cancel 階段
            log.info(transactionId + " global transaction try failed, will rollback");
            sendTryMessage(transactionId);
            return null;
        }

        // try 成功,在數據庫中更新所有分支事務的狀態
        log.info("global update transaction status to try success");
        updateTransactionStatus(transactionId, TransactionStatus.TRY_SUCCESS);
        log.info("global update transaction status to try success end");

        // 發送消息推動進入 confirm 階段,如果 confirm 失敗,則再次發送消息推動進入 cancel 階段
        log.info(transactionId + " global transaction try success, will confirm");
        if (!sendTryMessage(transactionId)) {
            log.info(transactionId + " global transaction confirm failed, will cancel");
            sendTryMessage(transactionId);
        }

        return null;
    }

    /**
     * 發送消息到 分支事務管理器(TM)
     * TM 收到消息后,查詢事務數據庫,根據事務狀態,判斷執行 confirm 或者 cancel
     * 這里使用HTTP作為通信方式(為了簡便,當然也可以使用其他的,如dubbo之類的)
     * @param transactionId xid
     * @return execute result
     */
    private boolean sendTryMessage(String transactionId) {
        log.info("send message to local TM to execute next step");
        String[] slice = transactionId.split(":");
        String targetHost = slice[0];
        String targetPort = slice[1];

        RestTemplate restTemplate = new RestTemplate();
        String url = "http://" + targetHost + ":" + targetPort + "/tm/tryNext?xid=" + transactionId;
        Boolean response = restTemplate.getForObject(url, boolean.class, new HashMap<>(0));

        if (response == null || !response) {
            log.info("try next step execute failed, please manual check");
            return false;
        } else {
            log.info("try next step execute success");
            return true;
        }
    }

    /**
     * 生成全局事務ID:本機IP地址+本地分支事務管理器監聽端口+時間戳
     * @return xid
     * @throws UnknownHostException UnknownHostException
     */
    private String createTransactionId() throws UnknownHostException {
        String localAddress = InetAddress.getLocalHost().getHostAddress();
        String timeStamp = String.valueOf(System.currentTimeMillis());
        return localAddress + ":8080:" + timeStamp;
    }

    /**
     * 根據 xid 更新 所有分支事務的執行狀態
     * @param xid xid
     * @param status status
     */
    private void updateTransactionStatus(String xid, int status) {
        TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.setXid(xid);
        transactionInfo.setStatus(status);
        try {
            transactionInfoMapper.updateOne(transactionInfo);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.全局事務管理器說道 try 節點的執行結束觸發點,發送信息推動各個分支事務的 confirm 或者 cancel 階段的執行

    分支事務管理器(TM)收到了消息,就根據 xid 撈出所有的分支事務,根據狀態,判斷執行 confirm 或者 cancel

    只需要處理注冊上來的分支事務即可,try 執行完的必然注冊上來了,后面執行 confirm即可。try 沒有執行的,肯定是前面的分支事務出錯了,只要恢復前面的數據即可。

    大致代碼如下:

@Service
@Slf4j
public class TccClientService {

    private final TransactionInfoMapper transactionInfoMapper;

    public TccClientService(TransactionInfoMapper transactionInfoMapper) {
        this.transactionInfoMapper = transactionInfoMapper;
    }

    /**
     * 收到 全局事務管理器(TC)的信息后執行
     * 查詢數據庫,存在一個分支事務失敗狀態則進入 cancel,全成功則進入 confirm 階段
     * @param xid xid
     * @return 返回 confirm 或者 cancel 的執行結果
     */
    public boolean transactionHandle(String xid) {
        // 根據 xid 查詢出所有的分支事務信息
        Map<String, Object> condition = new HashMap<>(1);
        condition.put("xid", xid);
        List<Map<String, Object>> branchTransactions = transactionInfoMapper.query(condition);

        // 判斷是否所有事務的 try 都執行成功,如果成功則 confirm,反之 cancel
        boolean executeConfirm = true;
        for (Map<String, Object> item: branchTransactions) {
            if (item.get("status").equals(TransactionStatus.TRY_FAILED) || item.get("status").equals(TransactionStatus.CONFIRM_FAILED)) {
                executeConfirm = false;
                break;
            }
        }

        // 執行 confirm 或者 cancel
        if (executeConfirm) {
            return executeMethod(branchTransactions, TransactionMethod.CONFIRM);
        } else {
            return executeMethod(branchTransactions, TransactionMethod.CANCEL);
        }
    }

    /**
     * 通過分支事務注冊的 類名和方法名,反射調用相應的 confirm 或者 cancel 方法
     * 這里是串行的,也可以使用線程池進行並行操作
     * @param branchTransactions 分支事務信息
     * @param methodName confirm 或者 cancel
     * @return bool
     */
    private boolean executeMethod(List<Map<String, Object>> branchTransactions, String methodName) {
        for (Map<String, Object> item: branchTransactions) {
            log.info("service info:: " + item.toString());
            log.info("service method :: " + item.get(methodName).toString());

            try {
                Class<?> clazz = Class.forName(item.get("class_name").toString());
                log.info("Service Class::" + clazz.getName());

                Method method = clazz.getDeclaredMethod(item.get(methodName).toString());
                log.info("Service Method::" + method.toString());

                Object service = clazz.newInstance();
                Object ret = method.invoke(service);
                log.info("execute method return: " + ret.toString());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                e.printStackTrace();
                return false;
            }
        }
        return true;
    }
}

總結

    到此就實現了一個非常簡陋的 TCC Demo了。其中 TC 和 TM 的角色不是特別清晰,因為他們基本嵌入到一個應用里面去了,但還是體現了大致的思路。當然,TC也完全是可以分離的,像Seata就是一個獨立的Server。

    TC 和 TM 的通信方法也是可以用其他的,這里為了方便使用的HTTP,也可以使用 RPC之類的。

    完整的工程如下:TCCDemo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM