上一篇文章我們講解了事務的Advisor是如何注冊進Spring容器的,也講解了Spring是如何將有配置事務的類配置上事務的,實際上也就是用了AOP那一套,也講解了Advisor,pointcut驗證流程,至此,事務的初始化工作都已經完成了,在之后的調用過程,如果代理類的方法被調用,都會調用BeanFactoryTransactionAttributeSourceAdvisor這個Advisor的增強方法,也就是我們還未提到的那個Advisor里面的advise,還記得嗎,在自定義標簽的時候我們將TransactionInterceptor這個Advice作為bean注冊進IOC容器,並且將其注入進Advisor中,這個Advice在代理類的invoke方法中會被封裝到攔截器鏈中,最終事務的功能都在advise中體現,所以我們先來關注一下TransactionInterceptor這個類吧。
TransactionInterceptor類繼承自MethodInterceptor,所以調用該類是從其invoke方法開始的,首先預覽下這個方法:
@Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
重點來了,進入invokeWithinTransaction方法:
@Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. TransactionAttributeSource tas = getTransactionAttributeSource(); // 獲取對應事務屬性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 獲取beanFactory中的transactionManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 構造方法唯一標識(類.方法,如:service.UserServiceImpl.save) final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 聲明式事務處理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 創建TransactionInfo TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 執行原方法 // 繼續調用方法攔截器鏈,這里一般將會調用目標類的方法,如:AccountServiceImpl.save方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 異常回滾 completeTransactionAfterThrowing(txInfo, ex); // 手動向上拋出異常,則下面的提交事務不會執行 // 如果子事務出異常,則外層事務代碼需catch住子事務代碼,不然外層事務也會回滾 throw ex; } finally { // 消除信息 cleanupTransactionInfo(txInfo); } // 提交事務 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); try { // 編程式事務處理 Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } }
創建事務Info對象
我們先分析事務創建的過程。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. // 如果沒有名稱指定則使用方法唯一標識,並使用DelegatingTransactionAttribute封裝txAttr if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 獲取TransactionStatus status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 根據指定的屬性與status准備一個TransactionInfo return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
對於createTransactionlfNecessary函數主要做了這樣幾件事情。
(1)使用 DelegatingTransactionAttribute 封裝傳入的 TransactionAttribute 實例。
對於傳入的TransactionAttribute類型的參數txAttr,當前的實際類型是RuleBasedTransactionAttribute,是由獲取事務屬性時生成,主要用於數據承載,而這里之所以使用DelegatingTransactionAttribute進行封裝,當然是提供了更多的功能。
(2)獲取事務。
事務處理當然是以事務為核心,那么獲取事務就是最重要的事情。
(3)構建事務信息。
根據之前幾個步驟獲取的信息構建Transactionlnfo並返回。
獲取事務
其中核心是在getTransaction方法中:
@Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 獲取一個transaction Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { definition = new DefaultTransactionDefinition(); } // 如果在這之前已經存在事務了,就進入存在事務的方法中 if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } // 事務超時設置驗證 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 走到這里說明此時沒有存在事務,如果傳播特性是MANDATORY時拋出異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 如果此時不存在事務,當傳播特性是REQUIRED或NEW或NESTED都會進入if語句塊 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事務 // 因為此時不存在事務,將null掛起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // new一個status,存放剛剛創建的transaction,然后將其標記為新事務! // 這里transaction后面一個參數決定是否是新事務! DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 新開一個連接的地方,非常重要 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } // 其他的傳播特性一律返回一個空事務,transaction = null //當前不存在事務,且傳播機制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,這三種情況,創建“空”事務 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
先來看看transaction是如何被創建出來的:
@Override protected Object doGetTransaction() { // 這里DataSourceTransactionObject是事務管理器的一個內部類 // DataSourceTransactionObject就是一個transaction,這里new了一個出來 DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 解綁與綁定的作用在此時體現,如果當前線程有綁定的話,將會取出holder // 第一次conHolder肯定是null ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); // 此時的holder被標記成一個舊holder txObject.setConnectionHolder(conHolder, false); return txObject; }
創建transaction過程很簡單,接着就會判斷當前是否存在事務:
@Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); } public boolean hasConnectionHolder() { return (this.connectionHolder != null); }
這里判斷是否存在事務的依據主要是獲取holder中的transactionActive變量是否為true,如果是第一次進入事務,holder直接為null判斷不存在了,如果是第二次進入事務transactionActive變量是為true的(后面會提到是在哪里把它變成true的),由此來判斷當前是否已經存在事務了。
至此,源碼分成了2條處理線
1.當前已存在事務:isExistingTransaction()判斷是否存在事務,存在事務handleExistingTransaction()根據不同傳播機制不同處理
2.當前不存在事務: 不同傳播機制不同處理
當前不存在事務
如果不存在事務,傳播特性又是REQUIRED或NEW或NESTED,將會先掛起null,這個掛起方法我們后面再講,然后創建一個DefaultTransactionStatus ,並將其標記為新事務,然后執行doBegin(transaction, definition);這個方法也是一個關鍵方法
神秘又關鍵的status對象
TransactionStatus接口
public interface TransactionStatus extends SavepointManager, Flushable { // 返回當前事務是否為新事務(否則將參與到現有事務中,或者可能一開始就不在實際事務中運行) boolean isNewTransaction(); // 返回該事務是否在內部攜帶保存點,也就是說,已經創建為基於保存點的嵌套事務。 boolean hasSavepoint(); // 設置事務僅回滾。 void setRollbackOnly(); // 返回事務是否已標記為僅回滾 boolean isRollbackOnly(); // 將會話刷新到數據存儲區 @Override void flush(); // 返回事物是否已經完成,無論提交或者回滾。 boolean isCompleted(); }
再來看看實現類DefaultTransactionStatus
DefaultTransactionStatus
public class DefaultTransactionStatus extends AbstractTransactionStatus { //事務對象 @Nullable private final Object transaction; //事務對象 private final boolean newTransaction; private final boolean newSynchronization; private final boolean readOnly; private final boolean debug; //事務對象 @Nullable private final Object suspendedResources; public DefaultTransactionStatus( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) { this.transaction = transaction; this.newTransaction = newTransaction; this.newSynchronization = newSynchronization; this.readOnly = readOnly; this.debug = debug; this.suspendedResources = suspendedResources; } //略... }
我們看看這行代碼 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 這里是構造一個status對象的方法 protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
實際上就是封裝了事務屬性definition,新創建的transaction,並且將事務狀態屬性設置為新事務,最后一個參數為被掛起的事務。
簡單了解一下關鍵參數即可:
第二個參數transaction:事務對象,在一開頭就有創建,其就是事務管理器的一個內部類。
第三個參數newTransaction:布爾值,一個標識,用於判斷是否是新的事務,用於提交或者回滾方法中,是新的才會提交或者回滾。
最后一個參數suspendedResources:被掛起的對象資源,掛起操作會返回舊的holder,將其與一些事務屬性一起封裝成一個對象,就是這個suspendedResources這個對象了,它會放在status中,在最后的清理工作方法中判斷status中是否有這個掛起對象,如果有會恢復它
接着我們來看看關鍵代碼 doBegin(transaction, definition);
1 @Override 2 protected void doBegin(Object transaction, TransactionDefinition definition) { 3 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; 4 Connection con = null; 5 6 try { 7 // 判斷如果transaction沒有holder的話,才去從dataSource中獲取一個新連接 8 if (!txObject.hasConnectionHolder() || 9 txObject.getConnectionHolder().isSynchronizedWithTransaction()) { 10 //通過dataSource獲取連接 11 Connection newCon = this.dataSource.getConnection(); 12 if (logger.isDebugEnabled()) { 13 logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); 14 } 15 // 所以,只有transaction中的holder為空時,才會設置為新holder 16 // 將獲取的連接封裝進ConnectionHolder,然后封裝進transaction的connectionHolder屬性 17 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); 18 } 19 //設置新的連接為事務同步中 20 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); 21 con = txObject.getConnectionHolder().getConnection(); 22 //conn設置事務隔離級別,只讀 23 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); 24 txObject.setPreviousIsolationLevel(previousIsolationLevel);//DataSourceTransactionObject設置事務隔離級別 25 26 // 如果是自動提交切換到手動提交 27 if (con.getAutoCommit()) { 28 txObject.setMustRestoreAutoCommit(true); 29 if (logger.isDebugEnabled()) { 30 logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); 31 } 32 con.setAutoCommit(false); 33 } 34 // 如果只讀,執行sql設置事務只讀 35 prepareTransactionalConnection(con, definition); 36 // 設置connection持有者的事務開啟狀態 37 txObject.getConnectionHolder().setTransactionActive(true); 38 39 int timeout = determineTimeout(definition); 40 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { 41 // 設置超時秒數 42 txObject.getConnectionHolder().setTimeoutInSeconds(timeout); 43 } 44 45 // 將當前獲取到的連接綁定到當前線程 46 if (txObject.isNewConnectionHolder()) { 47 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); 48 } 49 }catch (Throwable ex) { 50 if (txObject.isNewConnectionHolder()) { 51 DataSourceUtils.releaseConnection(con, this.dataSource); 52 txObject.setConnectionHolder(null, false); 53 } 54 throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); 55 } 56 }
conn設置事務隔離級別,只讀
@Nullable public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException { Assert.notNull(con, "No Connection specified"); // Set read-only flag. // 設置數據連接的只讀標識 if (definition != null && definition.isReadOnly()) { try { if (logger.isDebugEnabled()) { logger.debug("Setting JDBC Connection [" + con + "] read-only"); } con.setReadOnly(true); } catch (SQLException | RuntimeException ex) { Throwable exToCheck = ex; while (exToCheck != null) { if (exToCheck.getClass().getSimpleName().contains("Timeout")) { // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0 throw ex; } exToCheck = exToCheck.getCause(); } // "read-only not supported" SQLException -> ignore, it's just a hint anyway logger.debug("Could not set JDBC Connection read-only", ex); } } // Apply specific isolation level, if any. // 設置數據庫連接的隔離級別 Integer previousIsolationLevel = null; if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { if (logger.isDebugEnabled()) { logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel()); } int currentIsolation = con.getTransactionIsolation(); if (currentIsolation != definition.getIsolationLevel()) { previousIsolationLevel = currentIsolation; con.setTransactionIsolation(definition.getIsolationLevel()); } } return previousIsolationLevel; }
我們看到都是通過 Connection 去設置
線程變量的綁定
我們看 doBegin 方法的47行,將當前獲取到的連接綁定到當前線程,綁定與解綁圍繞一個線程變量,此變量在TransactionSynchronizationManager
類中:
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
這是一個 static final 修飾的 線程變量,存儲的是一個Map,我們來看看47行的靜態方法,bindResource
public static void bindResource(Object key, Object value) throws IllegalStateException { // 從上面可知,線程變量是一個Map,而這個Key就是dataSource // 這個value就是holder Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); // 獲取這個線程變量Map Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == null) { map = new HashMap<>(); resources.set(map); } // 將新的holder作為value,dataSource作為key放入當前線程Map中 Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } Thread.currentThread().getName() + "]"); } // 略... }
擴充知識點
這里再擴充一點,mybatis中獲取的數據庫連接,就是根據 dataSource 從ThreadLocal中獲取的
以查詢舉例,會調用Executor#doQuery方法:
最終會調用DataSourceUtils#doGetConnection獲取,真正的數據庫連接,其中TransactionSynchronizationManager中保存的就是方法調用前,spring增強方法中綁定到線程的connection,從而保證整個事務過程中connection的一致性
我們看看TransactionSynchronizationManager.getResource(Object key)這個方法
@Nullable public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } @Nullable private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
就是從線程變量的Map中根據 DataSource獲取 ConnectionHolder
已經存在的事務
前面已經提到,第一次事務開始時必會新創一個holder然后做綁定操作,此時線程變量是有holder的且avtive為true,如果第二個事務進來,去new一個transaction之后去線程變量中取holder,holder是不為空的且active是為true的,所以會進入handleExistingTransaction方法:
1 private TransactionStatus handleExistingTransaction( 2 TransactionDefinition definition, Object transaction, boolean debugEnabled) 3 throws TransactionException { 4 // 1.NERVER(不支持當前事務;如果當前事務存在,拋出異常)報錯 5 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { 6 throw new IllegalTransactionStateException( 7 "Existing transaction found for transaction marked with propagation 'never'"); 8 } 9 // 2.NOT_SUPPORTED(不支持當前事務,現有同步將被掛起)掛起當前事務,返回一個空事務 10 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { 11 if (debugEnabled) { 12 logger.debug("Suspending current transaction"); 13 } 14 // 這里會將原來的事務掛起,並返回被掛起的對象 15 Object suspendedResources = suspend(transaction); 16 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); 17 // 這里可以看到,第二個參數transaction傳了一個空事務,第三個參數false為舊標記 18 // 最后一個參數就是將前面掛起的對象封裝進新的Status中,當前事務執行完后,就恢復suspendedResources 19 return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources); 20 } 21 // 3.REQUIRES_NEW掛起當前事務,創建新事務 22 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { 23 if (debugEnabled) { 24 logger.debug("Suspending current transaction, creating new transaction with name [" + 25 definition.getName() + "]"); 26 } 27 // 將原事務掛起,此時新建事務,不與原事務有關系 28 // 會將transaction中的holder設置為null,然后解綁! 29 SuspendedResourcesHolder suspendedResources = suspend(transaction); 30 try { 31 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 32 // new一個status出來,傳入transaction,並且為新事務標記,然后傳入掛起事務 33 DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); 34 // 這里也做了一次doBegin,此時的transaction中holer是為空的,因為之前的事務被掛起了 35 // 所以這里會取一次新的連接,並且綁定! 36 doBegin(transaction, definition); 37 prepareSynchronization(status, definition); 38 return status; 39 } 40 catch (RuntimeException beginEx) { 41 resumeAfterBeginException(transaction, suspendedResources, beginEx); 42 throw beginEx; 43 } 44 catch (Error beginErr) { 45 resumeAfterBeginException(transaction, suspendedResources, beginErr); 46 throw beginErr; 47 } 48 } 49 // 如果此時的傳播特性是NESTED,不會掛起事務 50 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { 51 if (!isNestedTransactionAllowed()) { 52 throw new NestedTransactionNotSupportedException( 53 "Transaction manager does not allow nested transactions by default - " + 54 "specify 'nestedTransactionAllowed' property with value 'true'"); 55 } 56 if (debugEnabled) { 57 logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); 58 } 59 // 這里如果是JTA事務管理器,就不可以用savePoint了,將不會進入此方法 60 if (useSavepointForNestedTransaction()) { 61 // 這里不會掛起事務,說明NESTED的特性是原事務的子事務而已 62 // new一個status,傳入transaction,傳入舊事務標記,傳入掛起對象=null 63 DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); 64 // 這里是NESTED特性特殊的地方,在先前存在事務的情況下會建立一個savePoint 65 status.createAndHoldSavepoint(); 66 return status; 67 } 68 else { 69 // JTA事務走這個分支,創建新事務 70 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 71 DefaultTransactionStatus status = newTransactionStatus( 72 definition, transaction, true, newSynchronization, debugEnabled, null); 73 doBegin(transaction, definition); 74 prepareSynchronization(status, definition); 75 return status; 76 } 77 } 78 79 // 到這里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,存在事務加入事務即可,標記為舊事務,空掛起 80 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); 81 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); 82 }
對於已經存在事務的處理過程中,我們看到了很多熟悉的操作,但是,也有些不同的地方,函數中對已經存在的事務處理考慮兩種情況。
(1)PROPAGATION_REQUIRES_NEW表示當前方法必須在它自己的事務里運行,一個新的事務將被啟動,而如果有一個事務正在運行的話,則在這個方法運行期間被掛起。而Spring中對於此種傳播方式的處理與新事務建立最大的不同點在於使用suspend方法將原事務掛起。 將信息掛起的目的當然是為了在當前事務執行完畢后在將原事務還原。
(2)PROPAGATION_NESTED表示如果當前正有一個事務在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存在,行為就像PROPAGATION_REQUIRES_NEW。對於嵌入式事務的處理,Spring中主要考慮了兩種方式的處理。
- Spring中允許嵌入事務的時候,則首選設置保存點的方式作為異常處理的回滾。
- 對於其他方式,比如JTA無法使用保存點的方式,那么處理方式與PROPAGATION_ REQUIRES_NEW相同,而一旦出現異常,則由Spring的事務異常處理機制去完成后續操作。
對於掛起操作的主要目的是記錄原有事務的狀態,以便於后續操作對事務的恢復
小結
到這里我們可以知道,在當前存在事務的情況下,根據傳播特性去決定是否為新事務,是否掛起當前事務。
NOT_SUPPORTED :會掛起事務,不運行doBegin方法傳空transaction
,標記為舊事務。封裝status
對象:
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources)
REQUIRES_NEW :將會掛起事務且運行doBegin方法,標記為新事務。封裝status
對象:
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
NESTED :不會掛起事務且不會運行doBegin方法,標記為舊事務,但會創建savePoint
。封裝status
對象:
DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
其他事務例如REQUIRED :不會掛起事務,封裝原有的transaction不會運行doBegin方法,標記舊事務,封裝status
對象:
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
掛起
對於掛起操作的主要目的是記錄原有事務的狀態,以便於后續操作對事務的恢復:
@Nullable protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 這里是真正做掛起的方法,這里返回的是一個holder suspendedResources = doSuspend(transaction); } // 這里將名稱、隔離級別等信息從線程變量中取出並設置對應屬性為null到線程變量 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); // 將事務各個屬性與掛起的holder一並封裝進SuspendedResourcesHolder對象中 return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
@Override protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 將transaction中的holder屬性設置為空 txObject.setConnectionHolder(null); // ConnnectionHolder從線程變量中解綁! return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
我們來看看 unbindResource
private static Object doUnbindResource(Object actualKey) { // 取得當前線程的線程變量Map Map<Object, Object> map = resources.get(); if (map == null) { return null; } // 將key為dataSourece的value移除出Map,然后將舊的Holder返回 Object value = map.remove(actualKey); // Remove entire ThreadLocal if empty... // 如果此時map為空,直接清除線程變量 if (map.isEmpty()) { resources.remove(); } // Transparently suppress a ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { value = null; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]"); } // 將舊Holder返回 return value; }
可以回頭看一下解綁操作的介紹。這里掛起主要干了三件事:
- 將transaction中的holder屬性設置為空
- 解綁(會返回線程中的那個舊的holder出來,從而封裝到SuspendedResourcesHolder對象中)
- 將SuspendedResourcesHolder放入status中,方便后期子事務完成后,恢復外層事務