所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
上一篇文章中,我們看了一下GlobalTransactionalInterceptor這個攔截器,知道了@GlobalTransactional注解的主體邏輯被委托給了TransactionalTemplate來實現。
本文,將看一下TransationalTemplate這個模板方法處理全局事務的主體邏輯。
execute方法
首先,我們跟進TransactionalTemplate的execute方法
public Object execute(TransactionalExecutor business) throws Throwable { // 1. 獲取或者創建一個全局事務 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 獲取事務信息 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. 開始全局事務 beginTransaction(txInfo, tx); Object rs = null; try { // 執行業務邏輯 rs = business.execute(); } catch (Throwable ex) { // 3.rollback全局事務 completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 4. commit全局事務 commitTransaction(tx); return rs; } finally { //5. 清理 triggerAfterCompletion(); cleanUp(); } }
execute方法的邏輯我們應該非常的熟悉,這和JDBC的API非常的相似。同樣是經歷:begin -> commit || rollback,這樣一個邏輯。
步驟主要分為如下幾個:
1)獲取或者創建一個全局事務;
2)begin全局事務;
3)異常rollback事務;
4)正常commit事務;
下面,我們將逐步閱讀對應步驟的代碼
getCurrentOrCreate
GlobalTransactionContext作為一個上下文,提供了獲取或者創建全局事務的方法,跟進它的getCurrentOrCreate方法
public static GlobalTransaction getCurrentOrCreate() { GlobalTransaction tx = getCurrent(); // 如果獲取不到現有的全局事務,那么創建一個 if (tx == null) { return createNew(); } return tx; }
我們,先看看它是如何獲取到已經存在的全局事務的,跟進getCurrent方法
private static GlobalTransaction getCurrent() { // 獲取全局事務的XID String xid = RootContext.getXID(); // XID不存在,表示不存在全局事務 if (xid == null) { return null; } // 否則以參與者的身份加入全局事務中 return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant); }
顯然,判斷是否存在於全局事務中是根據傳遞而來的XID是否存在而決定的。我們跟進getXID看看從哪里獲取XID
private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load(); public static String getXID() { String xid = CONTEXT_HOLDER.get(KEY_XID); if (StringUtils.isNotBlank(xid)) { return xid; } String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE); if (StringUtils.isNotBlank(xidType) && xidType.indexOf("_") > -1) { return xidType.split("_")[0]; } return null; }
可以看到,CONTEXT_HOLDER是一個靜態對象,通過load方法加載的。跟進load,我們看一下加載邏輯
private static class ContextCoreHolder { private static ContextCore instance; static { // SPI機制加載自定義配置 ContextCore contextCore = EnhancedServiceLoader.load(ContextCore.class); if (contextCore == null) { // 默認采用ThreadLocal實現 contextCore = new ThreadLocalContextCore(); } instance = contextCore; } } public static ContextCore load() { return ContextCoreHolder.instance; }
seata沒有直接采用JAVA提供的SPI機制,而是自己寫了一套。通過load方法加載自定義實現,不過默認是選擇ThreadLocal來實現的。也就是說,在同一個線程中可以直接獲取XID,不同線程中通過set到threadLocal中實現傳遞。
總得來說,只要有XID,那么就表示已經存在全局事務。
我們再回到getCurrentOrCreate方法中
public static GlobalTransaction getCurrentOrCreate() { GlobalTransaction tx = getCurrent(); // 如果獲取不到現有的全局事務,那么創建一個 if (tx == null) { return createNew(); } return tx; }
如果XID不存在,那么tx就會為null,這時候將會創建初始的GlobalTransaction。跟進createNew方法看看創建過程
private static GlobalTransaction createNew() { GlobalTransaction tx = new DefaultGlobalTransaction(); return tx; }
創建過程非常簡單,直接new了一個默認實現類。那么,DefaultGlobalTransaction的構造方法有沒有做什么?
DefaultGlobalTransaction() { // status是unknow表示初始狀態,role是launcher表示發起者 this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher); }
可以看到,和加入一個事務不同,創建一個新的事務狀態是unknow而不是begin,角色是創建人而不是參與者。
到這里我們應該明白,分布式事務中XID的傳遞顯得非常重要,這樣你才能把不同機器中的本地事務關聯到全局事務當中,而如果存在XID丟失的問題就比較可怕了。
beginTransaction
經過getCurrentOrCreate以后,我們已經獲得了一個新的或者既有的全局事務。在業務邏輯執行之前,我們需要先做一次begin。跟進beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); // 調用的是全局事務的begin方法 tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } }
繼續跟進DefaultGlobalTransaction的begin方法
public void begin(int timeout, String name) throws TransactionException { // 參與者不做后續處理 if (role != GlobalTransactionRole.Launcher) { return; } // 省略 // 調用TransactionManager,begin全局事務 xid = transactionManager.begin(null, null, name, timeout); // status從UNKNOW變成BEGIN status = GlobalStatus.Begin; // 綁定XID到threadLocal RootContext.bind(xid); }
可以看到,參與者這里被return了,所以全局事務只會begin一次。調用TransactionManager以后會生成一個XID標識這個全局事務,並綁定到context當中,狀態變成BEGIN。
我們跟進TransactionManager的begin方法,看看做了啥
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 構建一個begin請求 GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); // 同步請求seata的服務端 GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } // 獲取服務端的XID return response.getXid(); }
TransactionManager將發起一個請求到服務端,由服務端來生成一個XID。可見,開啟一個全局事務會在服務端生成一份全局事務的信息。這時候全局事務處於begin狀態了
commitTransaction
業務邏輯處理完畢,將由commitTransaction來提交全局事務。跟進commitTransaction方法看看
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeCommit(); // 提交全局事務 tx.commit(); triggerAfterCommit(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } }
和begin一樣,將調用GlobalTransaction的commit方法,跟進commit方法
@Override public void commit() throws TransactionException { if (role == GlobalTransactionRole.Participant) { return; } // 省略 int retry = COMMIT_RETRY_COUNT; try { while (retry > 0) { try { // 提交該XID的全局事務 status = transactionManager.commit(xid); break; } catch (Throwable ex) { retry--; if (retry == 0) { throw new TransactionException("Failed to report global commit", ex); } } } } finally { if (RootContext.getXID() != null) { // 解綁XID if (xid.equals(RootContext.getXID())) { RootContext.unbind(); } } } }
GlobalTransaction也是調用了TransactionManager的commit方法,finally塊將解綁XID,可見如果到finally部分基本上算是結束了。而while部分做了一些重試操作。
再看看TransactionManager的commit方法
@Override public GlobalStatus commit(String xid) throws TransactionException { // 全局事務提交的請求 GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setXid(xid); // 同步提交全局事務提交請求 GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit); return response.getGlobalStatus(); }
請求Server端,響應回來的status將作為本地全局事務的status。如果一切順利,到GlobalTransaction的commit這里就已經結束了。
completeTransactionAfterThrowing
標准的事務邏輯除了commit還存在rollback的邏輯,當業務邏輯執行異常的時候要進行rollback回滾操作。
我們跟進completeTransactionAfterThrowing方法看看
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException { // 判斷是否回滾 if (txInfo != null && txInfo.rollbackOn(ex)) { try { // 回滾事務 rollbackTransaction(tx, ex); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex); } } else { // 如果不需要回滾,直接commit commitTransaction(tx); } }
可以看到,並不是所有異常都需要回滾的,txInfo包含的回滾規則會判斷該異常是否回滾,如果不需要回滾直接commit全局事務。
跟進rollbackTransaction看看回滾操作
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException { triggerBeforeRollback(); // 調用GlobalTransaction的rollback操作 tx.rollback(); triggerAfterRollback(); // 拋出一個回滾完成的執行異常 throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); }
首先,和begin、commit一樣,rollback也是調用了GlobalTransaction的方法。但是要注意,這里rollback完成以后會拋出一個RollbackDone的ExecutionException異常。
ExecutionException將會被GlobalTransactionalInterceptor的handleGlobalTransaction方法給捕捉到,我們看看捕捉的位置
try { // 省略 } catch (TransactionalExecutor.ExecutionException e) { TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getCause()); throw e.getCause(); default: throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code); } }
可以看到,匹配到done以后,就會把原始的異常拋出。除了done以外,還有其它對應的狀態處理。
回到rollbackTransaction方法
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException { triggerBeforeRollback(); // 調用GlobalTransaction的rollback操作 tx.rollback(); triggerAfterRollback(); // 拋出一個回滾完成的執行異常 throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); }
跟進rollback
@Override public void rollback() throws TransactionException { if (role == GlobalTransactionRole.Participant) { return; } int retry = ROLLBACK_RETRY_COUNT; try { // 重試 while (retry > 0) { try { // 向Server端發起rollback操作 status = transactionManager.rollback(xid); break; } catch (Throwable ex) { retry--; if (retry == 0) { throw new TransactionException("Failed to report global rollback", ex); } } } } finally { if (RootContext.getXID() != null) { // 解綁XID if (xid.equals(RootContext.getXID())) { RootContext.unbind(); } } } }
可以看到,和commit的邏輯幾乎一致。向Server端發起全局事務的rollback操作,finally塊最終解綁XID來結束這個全局事務。
總結
到這里,TransactionalTemplate的主體邏輯就結束了。其實邏輯並不復雜,主要就是遵循了begin->commit || rollback這個規范。與Server端的交互交給了TransactionManager來負責。
不過,靠這樣一個簡單的邏輯並不能夠實現分布式事務,還有很多保證事務一致性等機制需要后續去了解清楚的。
本文就先到這里~