三、TransactionalTemplate處理全局事務


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

上一篇文章中,我們看了一下GlobalTransactionalInterceptor這個攔截器,知道了@GlobalTransactional注解的主體邏輯被委托給了TransactionalTemplate來實現。

本文,將看一下TransationalTemplate這個模板方法處理全局事務的主體邏輯。

 

execute方法

首先,我們跟進TransactionalTemplate的execute方法

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 獲取或者創建一個全局事務
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.1 獲取事務信息
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {
        // 2. 開始全局事務
 beginTransaction(txInfo, tx);
        Object rs = null;
        try {
            // 執行業務邏輯
            rs = business.execute();
        } catch (Throwable ex) {
            // 3.rollback全局事務
 completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }
        // 4. commit全局事務
 commitTransaction(tx);
        return rs;
    } finally {
        //5. 清理
        triggerAfterCompletion();
        cleanUp();
    }
}

execute方法的邏輯我們應該非常的熟悉,這和JDBC的API非常的相似。同樣是經歷:begin -> commit || rollback,這樣一個邏輯。

步驟主要分為如下幾個:

1)獲取或者創建一個全局事務;

2)begin全局事務;

3)異常rollback事務;

4)正常commit事務;

下面,我們將逐步閱讀對應步驟的代碼

 

getCurrentOrCreate

GlobalTransactionContext作為一個上下文,提供了獲取或者創建全局事務的方法,跟進它的getCurrentOrCreate方法

public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    // 如果獲取不到現有的全局事務,那么創建一個
    if (tx == null) {
        return createNew();
    }
    return tx;
}

我們,先看看它是如何獲取到已經存在的全局事務的,跟進getCurrent方法

private static GlobalTransaction getCurrent() {
    // 獲取全局事務的XID
    String xid = RootContext.getXID();
    // XID不存在,表示不存在全局事務 
    if (xid == null) {
        return null;
    }
    // 否則以參與者的身份加入全局事務中
    return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}

顯然,判斷是否存在於全局事務中是根據傳遞而來的XID是否存在而決定的。我們跟進getXID看看從哪里獲取XID

private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

public static String getXID() {
    String xid = CONTEXT_HOLDER.get(KEY_XID);
    if (StringUtils.isNotBlank(xid)) {
        return xid;
    }

    String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
    if (StringUtils.isNotBlank(xidType) && xidType.indexOf("_") > -1) {
        return xidType.split("_")[0];
    }

    return null;
}

可以看到,CONTEXT_HOLDER是一個靜態對象,通過load方法加載的。跟進load,我們看一下加載邏輯

private static class ContextCoreHolder {
    private static ContextCore instance;

    static {
        // SPI機制加載自定義配置
        ContextCore contextCore = EnhancedServiceLoader.load(ContextCore.class);
        if (contextCore == null) {
            // 默認采用ThreadLocal實現
            contextCore = new ThreadLocalContextCore();
        }
        instance = contextCore;
    }
}


public static ContextCore load() {
    return ContextCoreHolder.instance;
}

seata沒有直接采用JAVA提供的SPI機制,而是自己寫了一套。通過load方法加載自定義實現,不過默認是選擇ThreadLocal來實現的。也就是說,在同一個線程中可以直接獲取XID,不同線程中通過set到threadLocal中實現傳遞。

總得來說,只要有XID,那么就表示已經存在全局事務。

我們再回到getCurrentOrCreate方法中

public static GlobalTransaction getCurrentOrCreate() {
    GlobalTransaction tx = getCurrent();
    // 如果獲取不到現有的全局事務,那么創建一個
    if (tx == null) {
        return createNew();
    }
    return tx;
}

如果XID不存在,那么tx就會為null,這時候將會創建初始的GlobalTransaction。跟進createNew方法看看創建過程

private static GlobalTransaction createNew() {
    GlobalTransaction tx = new DefaultGlobalTransaction();
    return tx;
}

創建過程非常簡單,直接new了一個默認實現類。那么,DefaultGlobalTransaction的構造方法有沒有做什么?

DefaultGlobalTransaction() {
    // status是unknow表示初始狀態,role是launcher表示發起者
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}

可以看到,和加入一個事務不同,創建一個新的事務狀態是unknow而不是begin,角色是創建人而不是參與者。

到這里我們應該明白,分布式事務中XID的傳遞顯得非常重要,這樣你才能把不同機器中的本地事務關聯到全局事務當中,而如果存在XID丟失的問題就比較可怕了。

 

beginTransaction

經過getCurrentOrCreate以后,我們已經獲得了一個新的或者既有的全局事務。在業務邏輯執行之前,我們需要先做一次begin。跟進beginTransaction

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        // 調用的是全局事務的begin方法
 tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
    }
}

繼續跟進DefaultGlobalTransaction的begin方法

public void begin(int timeout, String name) throws TransactionException {
    // 參與者不做后續處理
    if (role != GlobalTransactionRole.Launcher) {
        return;
    }
    // 省略
    // 調用TransactionManager,begin全局事務
    xid = transactionManager.begin(null, null, name, timeout);
    // status從UNKNOW變成BEGIN
    status = GlobalStatus.Begin;
    // 綁定XID到threadLocal
    RootContext.bind(xid);
}

可以看到,參與者這里被return了,所以全局事務只會begin一次。調用TransactionManager以后會生成一個XID標識這個全局事務,並綁定到context當中,狀態變成BEGIN。

我們跟進TransactionManager的begin方法,看看做了啥

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    // 構建一個begin請求
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);

    // 同步請求seata的服務端
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }

    // 獲取服務端的XID
    return response.getXid();
}

TransactionManager將發起一個請求到服務端,由服務端來生成一個XID。可見,開啟一個全局事務會在服務端生成一份全局事務的信息。這時候全局事務處於begin狀態了

 

commitTransaction

業務邏輯處理完畢,將由commitTransaction來提交全局事務。跟進commitTransaction方法看看

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        // 提交全局事務
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
    }
}

和begin一樣,將調用GlobalTransaction的commit方法,跟進commit方法

@Override
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    // 省略
    int retry = COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                // 提交該XID的全局事務
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (RootContext.getXID() != null) {
            // 解綁XID
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
    }
}

GlobalTransaction也是調用了TransactionManager的commit方法,finally塊將解綁XID,可見如果到finally部分基本上算是結束了。而while部分做了一些重試操作。

再看看TransactionManager的commit方法

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 全局事務提交的請求
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    // 同步提交全局事務提交請求
    GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
    return response.getGlobalStatus();
}

請求Server端,響應回來的status將作為本地全局事務的status。如果一切順利,到GlobalTransaction的commit這里就已經結束了。

 

completeTransactionAfterThrowing

標准的事務邏輯除了commit還存在rollback的邏輯,當業務邏輯執行異常的時候要進行rollback回滾操作。

我們跟進completeTransactionAfterThrowing方法看看

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
    // 判斷是否回滾
    if (txInfo != null && txInfo.rollbackOn(ex)) {
        try {
            // 回滾事務
 rollbackTransaction(tx, ex);
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex);
        }
    } else {
        // 如果不需要回滾,直接commit
        commitTransaction(tx);
    }
}

可以看到,並不是所有異常都需要回滾的,txInfo包含的回滾規則會判斷該異常是否回滾,如果不需要回滾直接commit全局事務。

跟進rollbackTransaction看看回滾操作

private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    // 調用GlobalTransaction的rollback操作
    tx.rollback();
    triggerAfterRollback();
    // 拋出一個回滾完成的執行異常
    throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
}

首先,和begin、commit一樣,rollback也是調用了GlobalTransaction的方法。但是要注意,這里rollback完成以后會拋出一個RollbackDone的ExecutionException異常。

ExecutionException將會被GlobalTransactionalInterceptor的handleGlobalTransaction方法給捕捉到,我們看看捕捉的位置

try {
    // 省略
} catch (TransactionalExecutor.ExecutionException e) {
    TransactionalExecutor.Code code = e.getCode();
    switch (code) {
        case RollbackDone: throw e.getOriginalException(); case BeginFailure:
            failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
            throw e.getCause();
        case CommitFailure:
            failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
            throw e.getCause();
        case RollbackFailure:
            failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
            throw e.getCause();
        default:
            throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

    }
}

可以看到,匹配到done以后,就會把原始的異常拋出。除了done以外,還有其它對應的狀態處理。

回到rollbackTransaction方法

private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    // 調用GlobalTransaction的rollback操作
    tx.rollback();
    triggerAfterRollback();
    // 拋出一個回滾完成的執行異常
    throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
}

跟進rollback

@Override
public void rollback() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    int retry = ROLLBACK_RETRY_COUNT;
    try {
        // 重試
        while (retry > 0) {
            try {
                // 向Server端發起rollback操作
                status = transactionManager.rollback(xid);
                break;
            } catch (Throwable ex) {
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        if (RootContext.getXID() != null) {
            // 解綁XID
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
    }
}

可以看到,和commit的邏輯幾乎一致。向Server端發起全局事務的rollback操作,finally塊最終解綁XID來結束這個全局事務。

 

總結

到這里,TransactionalTemplate的主體邏輯就結束了。其實邏輯並不復雜,主要就是遵循了begin->commit || rollback這個規范。與Server端的交互交給了TransactionManager來負責。

不過,靠這樣一個簡單的邏輯並不能夠實現分布式事務,還有很多保證事務一致性等機制需要后續去了解清楚的。

本文就先到這里~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM