Spring事務(三)事務增強器


摘要: 本文結合《Spring源碼深度解析》來分析Spring 5.0.6版本的源代碼。若有描述錯誤之處,歡迎指正。

 

目錄

一、創建事務

1. 獲取事務

2. 處理已經存在的事務

3. 准備事務信息

二、回滾處理

1. 回滾條件

2. 回滾處理

3. 回滾后的信患清除

三、事務提交

 

TransactionInterceptor支撐這整個事務功能的架構,邏輯還是相對復雜的,那么我們現在切入正題來分析此攔截器是如何實現事務特性的。TransactionInterceptor類繼承自MethodInterceptor,所以調用該類是從其invoke方法開始的,首先預覽下這個方法:

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 獲取對應事務屬性
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 獲取beanFactory中的transactionManager
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 構造方法唯一標識(類.方法,如:service.UserServiceImpl.save)
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // 聲明式事務處理
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 創建TransactionInfo
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 執行增強方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // 異常回滾
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // 消除信息
            cleanupTransactionInfo(txInfo);
        }
        // 提交事務
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    else {
        final ThrowableHolder throwableHolder = new ThrowableHolder();

        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        try {
            // 編程式事務處理
            Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                    return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    if (txAttr.rollbackOn(ex)) {
                        // A RuntimeException: will lead to a rollback.
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        else {
                            throw new ThrowableHolderException(ex);
                        }
                    }
                    else {
                        // A normal return value: will lead to a commit.
                        throwableHolder.throwable = ex;
                        return null;
                    }
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
            });

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
        catch (ThrowableHolderException ex) {
            throw ex.getCause();
        }
        catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        }
        catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            }
            throw ex2;
        }
    }
}

從上面的函數中,我們嘗試整理下事務處理的脈絡,在Spring中支持兩種事務處理的方式, 分別是聲明式事務處理與編程式事務處理,兩者相對於開發人員來講差別很大,但是對於Spring中的實現來講,大同小異。在invoke中我們也可以看到這兩種方式的實現。考慮到對事務的應用比聲明式的事務處理使用起來方便,也相對流行些,我們就以此種方式進行分析。對於聲明式的事務處理主要有以下幾個步驟。

(1)獲取事務的屬性。

對於事務處理來說,最基礎或者說最首要的工作便是獲取事務屬性了,這是支撐整個事務功能的基石,如果沒有事務屬性,其他功能也無從談起,在分析事務准備階段時我們己經分析了事務屬性提取的功能,大家應該有所了解。

(2)加載配置中配置的TransactionManager。

(3)不同的事務處理方式使用不同的邏輯。

對於聲明式事務的處理與編程式事務的處理,第一點區別在於事務屬性上,因為編程式的事務處理是不需要有事務屬性的,第二點區別就是在TransactionManager上,CallbackPreferringPlatformTransactionManager 實現 PlatformTransactionManager 接口,暴露出一個方法用於執行事務處理中的回調。所以,這兩種方式都可以用作事務處理方式的判斷。

(4)在目標方法執行前獲取事務並收集事務信息。

事務信息與事務屬性並不相同,也就是Transactionlnfo與TransactionAttribute並不相同,Transactionlnfo 中包含 TransactionAttribute 信息,但是,除了 TransactionAttribute 外還有其他事務信息,例如 PlatformTransactionManager 以及 TransactionStatus 相關信息。

(5)執行目標方法。

(6)—旦出現異常,嘗試異常處理。

並不是所有異常,Spring都會將其回滾,默認只對RuntimeException回滾。

(7)提交亊務前的事務信息清除。

(8)提交事務。

上面的步驟分析旨在讓大家對事務功能與步驟有個大致的了解,具體的功能還需要詳細地分析。

一、創建事務

我們先分析事務創建的過程。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    // 如果沒有名稱指定則使用方法唯一標識,並使用DelegatingTransactionAttribute封裝txAttr
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 獲取TransactionStatus
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
    // 根據指定的屬性與status准備一個TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

對於createTransactionlfNecessary函數主要做了這樣幾件事情。

(1)使用 DelegatingTransactionAttribute 封裝傳入的 TransactionAttribute 實例。

對於傳入的TransactionAttribute類型的參數txAttr,當前的實際類型是RuleBasedTransactionAttribute,是由獲取事務屬性時生成,主要用於數據承載,而這里之所以使用DelegatingTransactionAttribute進行封裝,當然是提供了更多的功能。

(2)獲取事務。

事務處理當然是以事務為核心,那么獲取事務就是最重要的事情。

(3)構建事務信息。

根據之前幾個步驟獲取的信息構建Transactionlnfo並返回。

我們分別對以上步驟進行詳細的解析。

1. 獲取事務

Spring中使用getTransaction來處理事務的准備工作,包括事務獲取以及信息的構建。

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }

    // 判斷當前線程是否存在事務,判斷依據為當前線程記錄的連接不為空且連接(connectionHolder)中的transactionActive屬性不為空
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // 當前線程已經存在事務
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    // 事務超時設置驗證
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // 如果當前線程不存在事務,但是PropagationBehavior卻被聲明為PROPAGATION_MANDATORY拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事務
        // 空掛起
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 構造transaction,包括設置connectionHolder、隔離級別、timeout
            // 如果是新連接、綁定到當前線程
            doBegin(transaction, definition);
            // 新同步事務的設置,針對與當前線程的設置
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

當然,在Spring中每個復雜的功能實現,並不是一次完成的,而是會通過入口函數進行一 個框架的搭建,初步構建完整的邏輯,而將實現細節分攤給不同的函數。那么,讓我們看看事務的准備工作都包括哪些。

(1)獲取事務

創建對應的事務實例,這里使用的是DataSourceTransactionManager中的doGetTransaction方法,創建基於JDBC的事務實例。如果當前線程中存在關於dataSource的連接,那么直接使用。這里有一個對保存點的設置,是否開啟允許保存點取決於是否設置了允許嵌入式事務。

@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // 如果當前線程已經記錄數據庫連接則使用原有連接
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // false表示非新創建連接
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

(2)如果當先線程存在事務,貝轉向嵌套事務的處理。

(3)事務超時設罝驗證。

(4)事務propagationBehavior屬性的設置驗證。

(5)構建 DefaultTransactionStatus。

(6)完善transaction,包括設置ConnectionHolder、隔離級別、timeout,如果是新連接,則綁定到當前線程。

對於一些隔離級別、timeout等功能的設置並不是由Spring來完成的,而是委托給底層的數據庫連接去做的,而對於數據庫連接的設置就是在doBegin函數中處理的。

    /**
     * This implementation sets the isolation level but ignores the timeout.
     * 構造transaction,包括設置ConnectionHolder、隔離級別、timeout
     * 如果是新連接,綁定到當前線程
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            // 設置隔離級別
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            // 更改自動提交設置,由Spring控制提交
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            // 設置判斷當前線程是否存在事務的依據
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                // 將當前獲取到的連接綁定到當前線程
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

可以說事務是從這個函數開始的,因為在這個函數中已經開始嘗試了對數據庫連接的獲取,當然,在獲取數據庫連接的同時,一些必要的設置也是需要同步設置的。

  • 嘗試獲取連接。

當然並不是每次都會獲取新的連接,如果當前線程中的connectionHolder已經存在,則沒有必要再次獲取,或者,對於事務同步表示設置為true的需要重新獲取連接。

  • 設置隔離級別以及只讀標識。

你是否有過這樣的錯覺?事務中的只讀配置是Spring中做了一些處理呢? Spring中確實是針對只讀操作做了一些處理,但是核心的實現是設置connection上的readOnly屬性。同樣,對於隔離級別的控制也是交由connection去控制的。

  • 更改默認的提交設置。

如果事務屬性是自動提交,那么需要改變這種設置,而將提交操作委托給Spring來處理。

  • 設置標志位,標識當前連接已經被事務激活。
  • 設置過期時間。
  • 將connectionHolder綁定到當前線程。

設置隔離級別的prepareConnectionForTransaction函數用於負責對底層數據庫連接的設置,當然,只是包含只讀標識和隔離級別的設置。由於強大的日志及異常處理,顯得函數代碼量比較大,但是單從業務角度去看,關鍵代碼其實是不多的。

@Nullable
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
        throws SQLException {

    Assert.notNull(con, "No Connection specified");

    // Set read-only flag.
    // 設置數據連接的只讀標識
    if (definition != null && definition.isReadOnly()) {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Setting JDBC Connection [" + con + "] read-only");
            }
            con.setReadOnly(true);
        }
        catch (SQLException | RuntimeException ex) {
            Throwable exToCheck = ex;
            while (exToCheck != null) {
                if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
                    // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
                    throw ex;
                }
                exToCheck = exToCheck.getCause();
            }
            // "read-only not supported" SQLException -> ignore, it's just a hint anyway
            logger.debug("Could not set JDBC Connection read-only", ex);
        }
    }

    // Apply specific isolation level, if any.
    // 設置數據庫連接的隔離級別
    Integer previousIsolationLevel = null;
    if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        if (logger.isDebugEnabled()) {
            logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
                    definition.getIsolationLevel());
        }
        int currentIsolation = con.getTransactionIsolation();
        if (currentIsolation != definition.getIsolationLevel()) {
            previousIsolationLevel = currentIsolation;
            con.setTransactionIsolation(definition.getIsolationLevel());
        }
    }

    return previousIsolationLevel;
}

(7)將事務信息記錄在當前線程中。

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}

2. 處理已經存在的事務

之前講述了普通事務建立的過程,但是Spring中支持多種事務的傳播規則,比如PROPAGATION_NESTED、PROPAGATION_REQUIRES_NEW等,這些都是在已經存在事務的基礎上進行進一步的處理,那么,對於已經存在的事務,准備操作是如何進行的呢?

/**
 * Create a TransactionStatus for an existing transaction.
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // 新事務的建立
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // 嵌入式事務的處理
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // 如果沒有可以使用保存點的方式控制事務回滾,那么在嵌入式事務的建立初始建立保存點
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            // 有些情況是不能使用保存操作,比如JTA,那么建立新事物
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

對於已經存在事務的處理過程中,我們看到了很多熟悉的操作,但是,也有些不同的地方,函數中對已經存在的事務處理考慮兩種情況。

(1)PROPAGATION_REQUIRES_NEW表示當前方法必須在它自己的事務里運行,一個新的事務將被啟動,而如果有一個事務正在運行的話,則在這個方法運行期間被掛起。而Spring中對於此種傳播方式的處理與新事務建立最大的不同點在於使用suspend方法將原事務掛起。 將信息掛起的目的當然是為了在當前事務執行完畢后在將原事務還原。

(2)PROPAGATION_NESTED表示如果當前正有一個事務在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存在,行為就像PROPAGATION_REQUIRES_NEW。對於嵌入式事務的處理,Spring中主要考慮了兩種方式的處理。

  • Spring中允許嵌入事務的時候,則首選設置保存點的方式作為異常處理的回滾。
  • 對於其他方式,比如JTA無法使用保存點的方式,那么處理方式與PROPAGATION_ REQUIRES_NEW相同,而一旦出現異常,則由Spring的事務異常處理機制去完成后續操作。

對於掛起操作的主要目的是記錄原有事務的狀態,以便於后續操作對事務的恢復:

@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

3. 准備事務信息

當已經建立事務連接並完成了事務信息的提取后,我們需要將所有的事務信息統一記錄在Transactionlnfo類型的實例中,這個實例包含了目標方法幵始前的所有狀態信息,一旦事務執行失敗,Spring會通過Transactionlnfo類型的實例中的信息來進行回滾等后續工作。

    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, String joinpointIdentification,
            @Nullable TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method...
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists.
            // 記錄事務狀態
            txInfo.newTransactionStatus(status);
        }
        else {
            // The TransactionInfo.hasTransaction() method will return false. We created it only
            // to preserve the integrity of the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled()) {
                logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                        "]: This method isn't transactional.");
            }
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
    }

二、回滾處理

之前已經完成了目標方法運行前的事務准備工作,而這些准備工作最大的目的無非是對於程序沒有按照我們期待的那樣進行,也就是出現特定的錯誤,那么,當出現錯誤的時候,Spring是怎么對數據進行恢復的呢?

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    // 當拋出異常時首先判斷當前是否存在事務,這是基礎依據
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        // 這里判斷是否回滾默認的依據是拋出的異常是否是RuntimeException或者是Error的類型
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            // 如果不滿足回滾條件即使拋出異常也同樣會提交
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

在對目標方法的執行過程中,一旦出現Throwable就會被引導至此方法處理,但是並不代表所有的Throwable都會被回滾處理,比如我們最常用的Exception,默認是不會被處理的。 默認情況下,即使出現異常,數據也會被正常提交,而這個關鍵的地方就是在txlnfo.transactionAttribute.rollbackOn(ex)這個函數。

1. 回滾條件

@Override
public boolean rollbackOn(Throwable ex) {
    return (ex instanceof RuntimeException || ex instanceof Error);
}

看到了嗎?默認情況下Spring中的亊務異常處理機制只對RuntimeException和Error兩種情況感興趣,當然你可以通過擴展來改變,不過,我們最常用的還是使用事務提供的厲性設置, 利用注解方式的使用,例如:

@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)

2. 回滾處理

當然,一旦符合回滾條件,那么Spring就會將程序引導至回滾處理函數中。

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // 激活所有TransactionSynchronization中對應的方法
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                // 如果有保存點,也就是當前事務為單獨的線程則會退到保存點
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // 如果當前事務為獨立的新事務,則直接回退
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        // 如果當前事務不是獨立的事務,那么只能標記狀態,等到事務鏈執行完畢后統一回滾
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 激活所有TransactionSynchronization中對應的方法
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        // 清空記錄的資源並將掛起的資源恢復
        cleanupAfterCompletion(status);
    }
}

同樣,對於在Spring中的復雜的邏輯處理過程,在入口函數一般都會給出個整體的處理脈絡,而把實現細節委托給其他函數去執行。我們嘗試總結下Spring中對於回滾處理的大致脈絡如下。

(1)首先是自定義觸發器的調用,包括在回滾前、完成回滾后的調用,當然完成回滾包括正常回滾與回滾過程中出現異常,自定義的觸發器會根據這些信息作進一步處理,而對於觸發器的注冊,常見是在回調過程中通過TransactionSynchronizationManager類中的靜態方法直接注冊:

public static void registerSynchronization(TransactionSynchronization synchronization)

(2)除了觸發監聽函數外,就是真正的回滾邏輯處理了。

  • 當之前已經保存的事務信息中有保存點信息的時候,使用保存點信息進行回滾。常用於嵌入式事務,對於嵌入式的事務的處理,內嵌的事務異常並不會引起外部事務的回滾。

根據保存點回滾的實現方式其實是根據底層的數據庫連接進行的。

public void rollbackToHeldSavepoint() throws TransactionException {
    Object savepoint = getSavepoint();
    if (savepoint == null) {
        throw new TransactionUsageException(
                "Cannot roll back to savepoint - no savepoint associated with current transaction");
    }
    getSavepointManager().rollbackToSavepoint(savepoint);
    getSavepointManager().releaseSavepoint(savepoint);
    setSavepoint(null);
}

這里使用的是JDBC的方式進行數據庫連接,那么getSavepointManager()函數返回的是JdbcTransactionObjectSupport,也就是說上面函數會調用JdbcTransactionObjectSupport 中的 rollbackToSavepoint 方法。

@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
    ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    try {
        conHolder.getConnection().rollback((Savepoint) savepoint);
        conHolder.resetRollbackOnly();
    }
    catch (Throwable ex) {
        throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
    }
}
  • 當之前已經保存的事務信息中的事務為新事物,那么直接回滾。常用於單獨事務的處理。對於沒有保存點的回滾,Spring同樣是使用底層數據庫連接提供的API來操作的。由於我們使用的是DataSourceTransactionManager,那么doRollback函數會使用此類中的實現:
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}
  • 當前事務信息中表明是存在事務的,又不屬於以上兩種情況,多數用於JTA,只做回滾標識,等到提交的時候統一不提交。

3. 回滾后的信患清除

對於回滾邏輯執行結束后,無論回滾是否成功,都必須要做的事情就是事務結束后的收尾工作。

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // 設置完成狀態
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
        // 結束之前事務的掛起狀態
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}

從函數中得知,事務處理的收尾處理工作包括如下內容。

(1)設置狀態是對事務信息做完成標識以避免重復調用。

(2)如果當前事務是新的同步狀態,需要將綁定到當前線程的事務信息清除。

(3)如果是新事務需要做些清除資源的工作。

@Override
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

    // Remove the connection holder from the thread, if exposed.
    if (txObject.isNewConnectionHolder()) {
        // 將數據庫連接從當前線程中解除綁定
        TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }

    // Reset connection.
    // 釋放連接
    Connection con = txObject.getConnectionHolder().getConnection();
    try {
        if (txObject.isMustRestoreAutoCommit()) {
            // 恢復數據庫連接的自動提交屬性
            con.setAutoCommit(true);
        }
        // 重置數據庫連接
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }

    if (txObject.isNewConnectionHolder()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
        }
        // 如果當前事務是獨立的新創建的事務則在事務完成時釋放數據庫連接
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }

    txObject.getConnectionHolder().clear();
}

(4)如果在事務執行前有事務掛起,那么當前事務執行結束后需要將掛起事務恢復。

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

三、事務提交

之前我們分析了Spring的事務異常處理機制,那么事務的執行並沒有出現任何的異常,也就意味着事務可以走正常事務提交的流程了。

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

在真正的數據提交之前,還需要做個判斷。不知道大家還有沒有印象,在我們分析事務異常處理規則的時候,當某個事務既沒有保存點又不是新事物,Spring對它的處理方式只是設置一個回滾標識。這個回滾標識在這里就會派上用場了,主要的應用場景如下。

某個事務是另一個事務的嵌入事務,但是,這些事務又不在Spring的管理范圍內,或者無法設置保存點,那么Spring會通過設置回滾標識的方式來禁止提交。首先當某個嵌入事務發生回滾的時候會設置回滾標識,而等到外部事務提交時,一旦判斷出當前事務流被設置了回滾標識,則由外部事務來統一進行整體事務的回滾。

所以,當事務沒有被異常捕獲的時候也並不意味着一定會執行提交的過程。

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 如果在事務鏈中已經被標記回滾,那么不會嘗試提交事務,直接回滾
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // 處理事務提交
    processCommit(defStatus);
}

而當事務執行一切都正常的時候,便可以真正地進入提交流程了。

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // 預留
            prepareForCommit(status);
            // 添加的TransactionSynchronization中的對應方法的調用
            triggerBeforeCommit(status);
            // 添加的TransactionSynchronization中的對應方法的調用
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // 如果存在保存點則清除保存點信息
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // 如果是獨立的事務則直接提交
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                // 添加的TransactionSynchronization中的對應方法的調用
                triggerBeforeCompletion(status);
            }
            // 提交過程中出現異常則回滾
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            // 添加的TransactionSynchronization中的對應方法的調用
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}

在提交過程中也並不是直接提交的,而是考慮了諸多的方面,符合提交的條件如下。

  • 當事務狀態中有保存點信息的話便不會去提交事務。
  • 當事務非新事務的時候也不會去執行提交事務操作。

此條件主要考慮內嵌事務的情況,對於內嵌事務,在Spring中正常的處理方式是將內嵌事務開始之前設置保存點,一旦內嵌事務出現異常便根據保存點信息進行回滾,但是如果沒有出現異常,內嵌事務並不會單獨提交,而是根據事務流由最外層事務負責提交,所以如果當前存在保存點信息便不是最外層事務,不做保存操作,對於是否是新事務的判斷也是基於此考慮。

如果程序流通過了事務的層層把關,最后順利地進入了提交流程,那么同樣,Spring會將事務提交的操作引導至底層數據庫連接的API,進行事務提交。

@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM