什么。你還沒有搞懂Spring事務增強器 ,一篇文章讓你徹底搞懂Spring事務,雖然很長但是干貨滿滿


上一篇文章主要講解了事務的Advisor是如何注冊進Spring容器的,也講解了Spring是如何將有配置事務的類配置上事務的,也講解了Advisor,pointcut驗證流程;但是還未提到的那個Advisor里面的advice,想要知道這個我們就先來看一下TransactionInterceptor這個類吧:

TransactionInterceptor這個類繼承自TransactionAspectSupport並且實現了MethodInterceptor接口。所以調用該類是從invoke方法開始;接下來我們就看一下:

  • 看源碼(TransactionInterceptor.java)
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
        @Override
        @Nullable
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }
        @Override
        public Object getTarget() {
            return invocation.getThis();
        }
        @Override
        public Object[] getArguments() {
            return invocation.getArguments();
        }
    }
    );
}

注意invoke方法里面的invokeWithinTransaction這個方法,我們繼續來追蹤一下

  • 看源碼(TransactionAspectSupport.java)
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 獲取對應事務屬性
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 獲取beanFactory中的transactionManager屬性
    final TransactionManager tm = determineTransactionManager(txAttr);
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
        Boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
        Boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
                            COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
        if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
            throw new IllegalStateException("Coroutines invocation not supported: " + method);
        }
        CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);
        ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                        Class<?> reactiveType =
                                (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                                        method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    }
    );
    InvocationCallback callback = invocation;
    if (corInv != null) {
        callback = () -> CoroutinesUtils.invokeSuspendingFunction(method, corInv.getTarget(), corInv.getArguments());
    }
    Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, (ReactiveTransactionManager) tm);
    if (corInv != null) {
        Publisher<?> pr = (Publisher<?>) result;
        return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
                                KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
    }
    return result;
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 構造方法唯一標識(類.方法,如:service.UserServiceImpl.save)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 聲明式事務處理
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    // Standard transaction demarcation with getTransaction and commit/rollback calls.
    // 創建 TransactionInfo
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    Object retVal;
    try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // 執行原方法
        // 繼續調用方法攔截器鏈,這里一般會調用目標類方法;如:AccountByXMLServiceImpl.save方法
        retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
        // target invocation exception
        // 異常回滾
        completeTransactionAfterThrowing(txInfo, ex);
        // 手動向上拋出異常,則下面的提交事務不會執行
        // 如果自事務出現異常,則外層事務代碼需catch住子事務的代碼,不然外層事務也會回滾
        throw ex;
    }
    finally {
        // 消除信息
        cleanupTransactionInfo(txInfo);
    }
    if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
        // Set rollback-only in case of Vavr failure matching our rollback rules...
        TransactionStatus status = txInfo.getTransactionStatus();
        if (status != null && txAttr != null) {
            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
        }
    }
    // 提交事務
    commitTransactionAfterReturning(txInfo);
    return retVal;
} else {
    Object result;
    final ThrowableHolder throwableHolder = new ThrowableHolder();
    // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
    try {
        // 編程式事務處理
        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
                Object retVal = invocation.proceedWithInvocation();
                if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                    // Set rollback-only in case of Vavr failure matching our rollback rules...
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
                return retVal;
            }
            catch (Throwable ex) {
                if (txAttr.rollbackOn(ex)) {
                    // A RuntimeException: will lead to a rollback.
                    if (ex instanceof RuntimeException) {
                        throw (RuntimeException) ex;
                    } else {
                        throw new ThrowableHolderException(ex);
                    }
                } else {
                    // A normal return value: will lead to a commit.
                    throwableHolder.throwable = ex;
                    return null;
                }
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
        }
        );
    }
    catch (ThrowableHolderException ex) {
        throw ex.getCause();
    }
    catch (TransactionSystemException ex2) {
        if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
        }
        throw ex2;
    }
    catch (Throwable ex2) {
        if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        }
        throw ex2;
    }
    // Check result state: It might indicate a Throwable to rethrow.
    if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
    }
    return result;
  }
}

創建事務Info對象(TransactionInfo)

然后我們繼續分析上面代碼中的創建事務Info的函數,也就是:TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

  • 看源碼(TransactionAspectSupport.java)
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // If no name specified, apply method identification as transaction name.
    // 如果沒有指定名稱,則使用方法唯一標識,並使用 DelegatingTransactionAttribute 封裝 txAttr
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        }
        ;
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 獲取Transaction
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                                            "] because no transaction manager has been configured");
            }
        }
    }
    // 根據指定的屬性 與 status准備一個TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
  • 源碼分析

針對createTransactionIfNecessary這個函數,主要做了以下幾個事情:

  • 使用DelegatingTransactionAttribute封裝傳入TransactionAttribute實例

    對於TransactionAttribute類型的參數txAttr,當前的實際類型是RuleBasedTransactionAttribute,是由獲取事務屬性時生成,主要用於數據承載,而這里之所以使用DelegatingTransactionAttribute進行封裝,當然是提供了更多功能。

  • 獲取事務

    事務處理當然是以事務為核心,那么獲取事務就是最重要的事

  • 構建事務信息

    根據之前幾個步驟獲取的信息構建TransactionInfo並返回

獲取事務

其主要核心就是在createTransactionIfNecessary函數中的getTransaction方法中:

  • 看源碼(AbstractPlatformTransactionManager.java)
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    //  獲取一個transaction
    Object transaction = doGetTransaction();
    Boolean debugEnabled = logger.isDebugEnabled();
    // 如果在這之前已經存在事務,就進入存在事務的方法中
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 事務超時驗證
    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }
    // 走到這里說明此時沒有存在事務,如果事務的傳播特性是 MANDATORY 則拋出異常
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                            "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 如果此時不存在事務,當傳播特性是 REQUIRED  REQUIRES_NEW  NESTED 都會進入if語句塊 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // PROPAGATION_REQUIRED PROPAGATION_REQUIRES_NEW  PROPAGATION_NESTED 都需要新建事務,、
        // 因為此時不存在事務,將null 掛起
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // 注意這個方法
            // new 一個status,存放剛剛創建的transaction,然后將其標記為新事務
            // 新開一個連接的地方,非常重要
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                                    "isolation level will effectively be ignored: " + def);
        }
        // 其它的事務傳播特性一律返回一個空事務,transaction=null
        // 當前不存在事務,且傳播機制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,這三種情況,創建“空”事務
        Boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

簡單說一下上面函數中的startTransaction方法:

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
            Boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    Boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // new 一個status,存放剛剛創建的transaction,然后將其標記為新事務
    // 這里的 transaction 后面的一個參數決定是否是新事務
    DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 新開一個連接的地方,非常重要
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

接下來繼續查看getTransaction這個函數,看看是如何將transaction創建出來的;看方法doGetTransaction:(這里這里看的是實現類DataSourceTransactionManager的)

  • 看源碼(DataSourceTransactionManager.java)
@Override
protected Object doGetTransaction() {
    // 這里的 DataSourceTransactionObject 是一個事務管理器的一個內部類
    // DataSourceTransactionObject 就是一個transaction 這里直接new 了一個
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // 解綁與綁定的作用在此時體現,如果當前線程有綁定的話,將會取出holder
    // 第一次 conHolder 指定是null
    ConnectionHolder conHolder =
                    (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // 此時的 holder被標記成一個舊holder
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
  • 源碼解析

看到上面的源碼后我們不難發現其實創建事務的過程其實很簡單,接下來我們繼續分析創建完事務它又做了什么?回到getTransaction這個方法,發現它接着就會判斷當前是否存在事務(也就是isExistingTransaction(transaction)):

  • 看源碼(DataSourceTransactionManager.java)
@Override
protected Boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

接着看一下這個函數中的hasConnectionHolder方法

public boolean hasConnectionHolder() {
		return (this.connectionHolder != null);
}
  • 源碼解析

這里判斷是否存在事務的依據主要是獲取holder中的transactionActive變量是否為true,holder直接為null判斷不存在了,如果是第二此進入事務transactionActive變量是為true的(后面會提到在哪里把它變為true的),由此來判斷當前是否已經存在事務了。

到這里源碼分成了2條處理線

  1. 當前已經存在事務isExistingTransaction()判斷是否存在事務,存在事務handlerExistingTransaction()根據不同傳播機制不同處理;
  2. 當前不存在事務:不同傳播機制不同處理
當前不存在事務

如果當前不存在事務,傳播特性又是REQUIRED 、 REQUIRES_NEW 、 NESTED,將會先掛起null,這個掛起方法后面再說;然后創建一個DefaultTransactionStatus,並將其標記為新事務,然后執行doBegin(transaction, definition);這個方法也是一個比較關鍵的方法;

這里我們提到了DefaultTransactionStatus 一個status對象,這是一個十分重要的對象我們記下來來簡單看一下TransactionStatus接口

  • 看源碼(TransactionStatus.java)
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
    /**
     * 返回該事務是否在內部攜帶保存點,也就是說,已經創建為基於保存點的嵌套事務。
     * @return
     */
    Boolean hasSavepoint();
    /**
     *  將會話刷新到數據存儲區
     */
    @Override
        void flush();
}

我們再來看一下TransactionStatus層次結構發現這個類繼承了TransactionExecution接口,進去查看一下

  • 看源碼(TransactionExecution.java)
package org.springframework.transaction;
public interface TransactionExecution {
    /**
     * 返回當前事務是否為新事務(否則將參與到現有事務中,或者可能一開始就不在實際事務中運行)
     * @return
     */
    Boolean isNewTransaction();
    /**
     * 設置事務僅回滾。
     */
    void setRollbackOnly();
    /**
     * 返回事務是否已標記為僅回滾
     * @return
     */
    Boolean isRollbackOnly();
    /**
     * 返回事物是否已經完成,無論提交或者回滾。
     * @return
     */
    Boolean isCompleted();
}

接下來我們再查看一下實現類DefaultTransactionStatus

  • 看源碼(DefaultTransactionStatus.java)
package org.springframework.transaction.support;
import org.springframework.lang.Nullable;
import org.springframework.transaction.NestedTransactionNotSupportedException;
import org.springframework.transaction.SavepointManager;
import org.springframework.util.Assert;
public class DefaultTransactionStatus extends AbstractTransactionStatus {
    // 事務對象
    @Nullable
    private final Object transaction;
    // 事務對象
    private final Boolean newTransaction;
    private final Boolean newSynchronization;
    private final Boolean readOnly;
    private final Boolean debug;
    // 事務對象
    @Nullable
    private final Object suspendedResources;
    public DefaultTransactionStatus(
                @Nullable Object transaction, Boolean newTransaction, Boolean newSynchronization,
                Boolean readOnly, Boolean debug, @Nullable Object suspendedResources) {
        this.transaction = transaction;
        this.newTransaction = newTransaction;
        this.newSynchronization = newSynchronization;
        this.readOnly = readOnly;
        this.debug = debug;
        this.suspendedResources = suspendedResources;
    }
    /**
     * Return the underlying transaction object.
     * @throws IllegalStateException if no transaction is active
     */
    public Object getTransaction() {
        Assert.state(this.transaction != null, "No transaction active");
        return this.transaction;
    }
    /**
     * Return whether there is an actual transaction active.
     */
    public Boolean hasTransaction() {
        return (this.transaction != null);
    }
    @Override
    public Boolean isNewTransaction() {
        return (hasTransaction() && this.newTransaction);
    }
    /**
     * Return if a new transaction synchronization has been opened
     * for this transaction.
     */
    public Boolean isNewSynchronization() {
        return this.newSynchronization;
    }
    /**
     * Return if this transaction is defined as read-only transaction.
     */
    public Boolean isReadOnly() {
        return this.readOnly;
    }
    /**
     * Return whether the progress of this transaction is debugged. This is used by
     * {@link AbstractPlatformTransactionManager} as an optimization, to prevent repeated
     * calls to {@code logger.isDebugEnabled()}. Not really intended for client code.
     */
    public Boolean isDebug() {
        return this.debug;
    }
    /**
     * Return the holder for resources that have been suspended for this transaction,
     * if any.
     */
    @Nullable
    public Object getSuspendedResources() {
        return this.suspendedResources;
    }
    @Override
    public Boolean isGlobalRollbackOnly() {
        return ((this.transaction instanceof SmartTransactionObject) &&
                        ((SmartTransactionObject) this.transaction).isRollbackOnly());
    }
    @Override
    protected SavepointManager getSavepointManager() {
        Object transaction = this.transaction;
        if (!(transaction instanceof SavepointManager)) {
            throw new NestedTransactionNotSupportedException(
                                "Transaction object [" + this.transaction + "] does not support savepoints");
        }
        return (SavepointManager) transaction;
    }
    public Boolean isTransactionSavepointManager() {
        return (this.transaction instanceof SavepointManager);
    }
    @Override
    public void flush() {
        if (this.transaction instanceof SmartTransactionObject) {
            ((SmartTransactionObject) this.transaction).flush();
        }
    }
}

看完這里我們接下來繼續回到AbstractPlatformTransactionManager中的getTransaction函數里面的
startTransaction函數。這里有這里有這么一句話。

DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

我們來查看一下newTransactionStatus函數:

  • 看源碼(AbstractPlatformTransactionManager.java)
// 這里是構造一個status對象的方法
protected DefaultTransactionStatus newTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, Boolean newTransaction,
            Boolean newSynchronization, Boolean debug, @Nullable Object suspendedResources) {
    Boolean actualNewSynchronization = newSynchronization &&
                    !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(
                    transaction, newTransaction, actualNewSynchronization,
                    definition.isReadOnly(), debug, suspendedResources);
}
  • 源碼解析

實際上就是封裝了事務屬性definition新建的transaction,並且將事務狀態屬性設置為新事物,最后一個參數為被掛起的事務。

簡單了解一下關鍵參數:

第二個參數transaction:事務對象,在一開頭就有創建,其就是事務管理器的一個內部類

第三個參數newTransaction:布爾值,一個標識,用於判斷是否是新的事務,用於提交或者回滾方法用。是新的才會提交或者回滾

最后一個參數suspendedResources:被掛起的對象資源,掛起操作會返回舊的holder,將其與一些事務屬性一起封裝成一個對象,就是這個suspendedResources對象了,它會放再status中,在最后的清理工作方法中判斷status中是否有這個掛起對象,如果有會恢復它

接下來我們再回到startTransaction方法中的doBegin(transaction, definition);具體實現還是看DataSourceTransactionManager。

  • 看源碼(DataSourceTransactionManager.java)
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
        // 判斷如果transaction 沒有holder的話,才去dataSource中獲取一個新的連接
        if (!txObject.hasConnectionHolder() ||
                            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 通過 dataSource獲取
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 所以只有transaction的holder為空時,才會設置新的holder
            // 將獲取的連接封裝進 ConnectionHolder 然后封裝進 transaction 的 connectionholder 屬性
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        // 設置新的連接為事務同步中
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // con設置事務隔離級別為 只讀
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        // /DataSourceTransactionObject設置事務隔離級別
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());
        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        // 如果是自動提交切換到手動提交
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        // 如果只讀,執行sql設置事務只讀
        prepareTransactionalConnection(con, definition);
        //設置connection 持有者的事務開啟狀態
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            // 設置超時秒數
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            // 將當前獲取到的連接綁定到當前線程
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

接下來我們看一下doBegin方法中的con設置事務隔離級別的方法(Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition)😉

  • 看源碼(DataSourceUtils.java)
@Nullable
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
            throws SQLException {
    Assert.notNull(con, "No Connection specified");
    Boolean debugEnabled = logger.isDebugEnabled();
    // Set read-only flag.
    // 設置數據連接的只讀標識
    if (definition != null && definition.isReadOnly()) {
        try {
            if (debugEnabled) {
                logger.debug("Setting JDBC Connection [" + con + "] read-only");
            }
            con.setReadOnly(true);
        }
        catch (SQLException | RuntimeException ex) {
            Throwable exToCheck = ex;
            while (exToCheck != null) {
                if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
                    // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
                    throw ex;
                }
                exToCheck = exToCheck.getCause();
            }
            // "read-only not supported" SQLException -> ignore, it's just a hint anyway
            logger.debug("Could not set JDBC Connection read-only", ex);
        }
    }
    // Apply specific isolation level, if any.
    // 設置數據庫連接的隔離級別
    Integer previousIsolationLevel = null;
    if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        if (debugEnabled) {
            logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
                                    definition.getIsolationLevel());
        }
        int currentIsolation = con.getTransactionIsolation();
        if (currentIsolation != definition.getIsolationLevel()) {
            previousIsolationLevel = currentIsolation;
            con.setTransactionIsolation(definition.getIsolationLevel());
        }
    }
    return previousIsolationLevel;
}

從上面我們可以看到都是通過Connection去設置的。

接下來我們再回到doBegin方法看這一行

TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());

這行代碼是將當前獲取到的連接綁定到當前線程,綁定解綁圍繞一個線程變量,這個變量就在TransactionSynchronizationManager中,如下:

private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");

這是一個static final修飾的線程變量,存儲的是一個Map,我們再看一下doBegin的靜態方法,也就是上面提到的:bindResource方法:

  • 看源碼(TransactionSynchronizationManager.java)
public static void bindResource(Object key, Object value) throws IllegalStateException {
    // 從上面可以知道,線程變量是一個 map ,而這個key就是dataSource,這個value 就是 holder
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    // 獲取這個線程變量的 map
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }
    // 將新的 holder 作為 value ,dataSource作為 key 放入到當前線程 map 中
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException(
                            "Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
    }
}
擴充知識點

這里提一下,mybatis中獲取的數據庫連接,就是dataSource從ThreadLocal中獲取的,以查詢舉例,會調用Executor#doQuery方法

image

最終會調用DataSourceUtils#doGetConnection方法獲取,真正的數據庫連接,其中的TransactionSynchronization中保存的就是方法調用前,spring增強方法中綁定到線程的connection,從而保證整個事務過程中connection的一致性

image

image

我們繼續來看看TransactionSynchronizationManagergetResource(Object key)這個方法

  • 看源碼(TransactionSynchronizationManager.java)
@Nullable
public static Object getResource(Object key) {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    return doGetResource(actualKey);
}

繼續追蹤一下里面的doGetResource方法

  • 看源碼(TransactionSynchronizationManager.java)
@Nullable
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

到這里我們就知道了getResource這個方法就說明了就是從線程變量的Map中根據DataSource獲取的ConnectionHolder

已經存在的事務

前面我們提到過,第一次事務開始時必會新創建一個holder然后做綁定操作,此時線程變量是有holder的且active為true,如果第二個事務進來,去new一個transaction之后去線程變量中去holder,holder是不為空的且active是為true的;所以會進入handleExistingTransaction方法;回到AbstractPlatformTransactionManagergetTransaction函數:查看handleExistingTransaction方法:

  • 看源碼(AbstractPlatformTransactionManager.java)
private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, Boolean debugEnabled)
            throws TransactionException {
    // 1. PROPAGATION_NEVER(不支持當前事務,如果當前事務存在,則拋出異常) 報錯
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                            "Existing transaction found for transaction marked with propagation 'never'");
    }
    // PROPAGATION_NOT_SUPPORTED (不支持當前事務,現有同步將掛起),掛起當前事務,返回一個空事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 這里會將原來的事務掛起,並返回被掛起的對象。
        Object suspendedResources = suspend(transaction);
        Boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 這里可以看到,第二個參數transaction傳了一個空事務,第三個參數false為舊標記
        // 最后一個參數就是將前面的掛起的對象封裝進了一個新的Status中,當前事務執行完成后,就恢復suspendedResources
        return prepareTransactionStatus(
                            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // 掛起當前事務,創建新事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                                    definition.getName() + "]");
        }
        // 將原事務掛起,此時新建事務,不與原事務有關系。
        // 會將transaction 中的holder 設置為 null ,然后解綁
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 注意這個函數。
            // new一個status出來,傳入transaction,並且為新事務標記,然后傳入掛起事務
            // 這里也做了一次doBegin,此時的transaction中holer是為空的,因為之前的事務被掛起了
            // 所以這里會取一次新的連接,並且綁定!
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // 如果此時的傳播特性是 PROPAGATION_NESTED,不會掛起事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                                    "Transaction manager does not allow nested transactions by default - " +
                                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        // 這里如果是JTA事務管理器,就不可以用savePoint了,將不會進入此方法
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // 這里不會掛起事務,說明NESTED的特性是原事務的子事務而已
            // new一個status,傳入transaction,傳入舊事務標記,傳入掛起對象=null
            DefaultTransactionStatus status =
                                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            // 這里是NESTED特性特殊的地方,在先前存在事務的情況下會建立一個savePoint
            status.createAndHoldSavepoint();
            return status;
        } else {
            // JTA事務走這個分支,創建新事務
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            // JTA事務走這個分支,創建新事務
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }
    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                            (currentIsolationLevel != null ?
                                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                    "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                            definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    // 到這里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,存在事務加入事務即可,標記為舊事務,空掛起
    Boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
  • 源碼解析

從上述源碼中,對於已經存在事務的處理過程中,我們看到很多熟悉的操作,但是也有些不同的地方,函數中對已經存在的事務處理考慮分兩種情況:

  1. PROPAGATION_REQUIRES_NEW表示當前方法必須在它自己的事務里運行,一個新的事務將被啟動,而如果有一個事務正在運行的話,則在這個方法運行期間被掛起。而Spring中對於此種傳播方式的處理和新建事務建立的最大的不同點就是使用suspend方法將原事務掛起。將信息掛起的目的當然是為了在當前事務執行完畢后再將原事務還原。

  2. PROPAGATION_NESTED表示如果當前有一個事務正在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存在,行為就像PROPAGATION_REQUIRES_NEW。對於嵌入式事務的處理,Spring中主要考慮了兩種處理方式:

    • Spring中允許嵌入事務的時候,則首選設置保存點的方式作為異常處理的回滾
    • 對於其他方式,比如JTA無法使用保存點的方式,那么處理方式PROPAGATION_REQUIRES_NEW相同,而一旦出現異常,則由Spring的事務異常處理機制去完成后續操作

    對於掛起操作的主要目的是記錄原有事務的狀態,以便於后續操作對事務的恢復。

總結:

到這里我們可以知道,在當前存在事務的情況下,根據傳播特性去決定是否為新事務,是否掛起當前事務。

PROPAGATION_NOT_SUPPORTED:會掛起事務,不運行doBegin方法傳空transaction,標記為舊事務。封裝status對象。

return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);

PROPAGATION_REQUIRES_NEW將會掛起事務且運行doBegin方法,標記為新事務。封裝status對象

// 注意這個函數。
// new一個status出來,傳入transaction,並且為新事務標記,然后傳入掛起事務
// 這里也做了一次doBegin,此時的transaction中holer是為空的,因為之前的事務被掛起了
// 所以這里會取一次新的連接,並且綁定!
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
// startTransaction函數
...
DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
...

PROPAGATION_NESTED:不會掛起事務且不會運行doBegin方法,標記為舊事務,但會創建savePoint。封裝status對象:

DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);

其他事務例如:PROPAGATION_REQUIRED:不會掛起事務,封裝原有的transaction不會運行doBegin方法,標記舊事務,封裝status對象:

return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
掛起:

對於掛起操作的主要目的是記錄原有事務的狀態,以便后續操作對事務的恢復:

  • 看源碼(AbstractPlatformTransactionManager.java)
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // 這里是真正做掛起的方法,這里返回的是一個holder
                suspendedResources = doSuspend(transaction);
            }
            // 這里將名稱、隔離級別等信息從線程變量中取出並設置對應屬性為null到線程變量
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            Boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            Boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            // 將事務各個屬性與掛起的holder一並封裝進SuspendedResourcesHolder對象中
            return new SuspendedResourcesHolder(
                                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    } else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    } else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

我們看一下doSuspend方法,在實現類DataSourceTransactionManager

  • 看源碼(DataSourceTransactionManager.java)
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // 將transaction中的holder屬性設置為空
    txObject.setConnectionHolder(null);
    // ConnnectionHolder從線程變量中解綁!
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

我們緊接着繼續追unbindResource函數里面的doUnbindResource

  • 看源碼(TransactionSynchronizationManager.java)
@Nullable
private static Object doUnbindResource(Object actualKey) {
    // 取得當前線程的線程變量Map
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    // 將key為dataSourece的value移除出Map,然后將舊的Holder返回
    Object value = map.remove(actualKey);
    // Remove entire ThreadLocal if empty...
    // 如果此時map為空,直接清除線程變量
    if (map.isEmpty()) {
        resources.remove();
    }
    // Transparently suppress a ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        value = null;
    }
    // 將舊Holder返回
    return value;
}

回顧:

簡單回顧一下解綁的操作。其實主要做了三件事:

  1. 將transaction中的holder屬性設置為空
  2. 解綁(會返回線程中的那個舊的holder出來,從而封裝到SuspendedResourcesHolder對象中)
  3. 將SuspendedResourcesHolder放入到status中,方便后期子事務完成后,恢復外層事務。

好了本次文章到這里就告一段落了,希望對你能有所幫助。

歡迎微信搜索【碼上遇見你】獲取更多精彩內容


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM