Spring事務解析4-切面織入


BeanFactoryTransactionAttributeSourceAdvisor作為Advisor的實現類,自然要遵從Advisor的處理方式,當代理被調用時會調用這個類的增強方法,也就是此bean的Advise,又因為在解析事務定義標簽時我們把TransactionInterceptor類型的bean注入到了BeanFactoryTransactionAttributeSourceAdvisor中,所以,在調用事務增強器增強的代理類時會首先執行TransactionInterceptor進行增強,同時,也就是在TransactionInterceptor類中的invoke方法中完成了整個事務的邏輯。

  BeanFactoryTransactionAttributeSourceAdvisor.java
    @Override
    public Advice getAdvice() {
        synchronized (this.adviceMonitor) {
            if (this.advice == null && this.adviceBeanName != null) {
                Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve 'adviceBeanName'");
                this.advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
            }
            return this.advice;
        }
    }
//見標簽解析,所以獲取的是TransactionInterceptor
//將interceptorName的bean注入advisorDef的adviceBeanName屬性中
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);

TransactionInterceptor支撐着整個事務功能的架構,邏輯還是相對復雜的,那么現在我們切入正題來分析此攔截器是如何實現事務特性的。TransactionInterceptor類繼承自MethodInterceptor,所以調用該類是從其invoke方法開始的

    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {
        //獲取對應事務屬性
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        //獲取beanFactory中的transactionManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        //構造方法唯一標識(類.方法,如service.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass);
        //聲明式事務處理
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //創建TransactionInfo
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                //執行被增強方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                //異常回滾
 completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                //清除信息
                cleanupTransactionInfo(txInfo);
            }
            //提交事務
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        //編程式事務處理
        else {
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        // A RuntimeException: will lead to a rollback.
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    }
                                    else {
                                        // A normal return value: will lead to a commit.
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });

                // Check result: It might indicate a Throwable to rethrow.
                if (result instanceof ThrowableHolder) {
                    throw ((ThrowableHolder) result).getThrowable();
                }
                else {
                    return result;
                }
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
        }
    }

從上面的函數中,在Spring中支持兩種事務處理的方式,分別是聲明式事務處理與編程式事務處理,兩者相對於開發人員來講差別很大,但是對於Spring中的實現來講,大同小異。在invoke中我們也可以看到這兩種方式的實現。考慮到對事務的應用比聲明式的事務處理使用起來方便,也相對流行些,我們就以此種方式進行分析。對於聲明式的事務處理主要有以下幾個步驟。

(1)獲取事務的屬性。對於事務處理來說,最基礎或者說最首要的工作便是獲取事務屬性了,這是支撐整個事務功能的基石,如果沒有事務屬性,其他功能也無從談起,根據傳播特性等屬性進行事務創建。

(2)加載配置中配置的TransactionManager。

(3)不同的事務處理方式使用不同的邏輯。對於聲明式事務的處理與編程式事務的處理,第一點區別在於事務屬性上,因為編程式的事務處理是不需要有事務屬性的,第二點區別就是在TransactionManager上,CallbackPreferringPlatformTransactionManager實現PlatformTransactionManager接口,暴露出一個方法用於執行事務處理中的回調。所以,這兩種方式都可以用作事務處理方式的判斷。

(4)在目標方法執行前獲取事務並收集事務信息。事務信息與事務屬性並不相同,也就是TransactionInfo與TransactionAttribute並不相同,TransactionInfo中包含TransactionAttribute信息,但是,除了TransactionAttribute外還有其他事務信息,例如PlatformTransactionManager以及TransactionStatus相關信息。

(5)執行目標方法。

(6)一旦出現異常,嘗試異常處理。並不是所有異常,Spring都會將其回滾,默認情況下Spring中的事務異常處理機制只對RuntimeException和Error兩種情況感興趣,當然你可以通過擴展來改變,不過,我們最常用的還是使用事務提供的屬性設置,利用注解方式的使用,例如:@Transactional(propagation=Propagation.REQUIRED,rollbackFor=Exception.class)

(7)提交事務前的事務信息清除。

(8)提交事務。

創建事務

    protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        //如果沒有名稱指定則使用方法唯一標識,並使用DelegatingTransactionAttribute封裝txAttr
        //對於傳入的TransactionAttribute類型的參數txAttr,當前的實際類型是RuleBasedTransactionAttribute,是由獲取事務屬性時生成,主要用於數據承載,
        //而這里之所以使用Delegating TransactionAttribute進行封裝,當然是提供了更多的功能。
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                //獲取事務,獲取TransactionStatus
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        //構建事務信息,根據指定的屬性與status准備一個TransactionInfo
        //當已經建立事務連接並完成了事務信息的提取后,我們需要將所有的事務信息統一記錄在TransactionInfo類型的實例中,
        //這個實例包含了目標方法開始前的所有狀態信息,一旦事務執行失敗,Spring會通過TransactionInfo類型的實例中的信息來進行回滾等后續工作。
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
    @Override
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        //創建對應的事務實例,這里使用的是DataSourceTransactionManager中的doGetTransaction方法
        //創建基於JDBC的事務實例。如果當前線程中存在關於dataSource的連接,那么直接使用。
        Object transaction = doGetTransaction();
        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();
        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
        //判斷當前線程是否存在事務,判讀依據為當前線程記錄的連接不為空且連接中(connectionHolder)中的transactionActive屬性不為空
        if (isExistingTransaction(transaction)) {
            //當前線程已經存在事務,處理已經存在的事務。
       //Spring中支持多種事務的傳播規則,比如PROPAGATION_ NESTED、PROPAGATION_REQUIRES_NEW等,
       //這些都是在已經存在事務的基礎上進行進一步的處理
       //(1)PROPAGATION_REQUIRES_NEW表示當前方法必須在它自己的事務里運行,一個新的事務將被啟動,而如果有一個事務正在運行的話,則在這個方法運行期間被掛起。而Spring
            //中對於此種傳播方式的處理與新事務建立最大的不同點在於使用suspend方法將原事務掛起。將信息掛起的目的當然是為了在當前事務執行完畢后在將原事務還原。
       //(2)PROPAGATION_NESTED表示如果當前正有一個事務在運行中,則該方法應該運行在一個嵌套的事務中,被嵌套的事務可以獨立於封裝事務進行提交或者回滾,如果封裝事務不存
            //在,行為就像PROPAGATION_REQUIRES_NEW。對於嵌入式事務的處理,Spring中主要考慮了兩種方式的處理。
            //Spring中允許嵌入事務的時候,則首選設置保存點的方式作為異常處理的回滾。對於其他方式,比如JTA無法使用保存點的方式,那么處理方式與PROPAGATION_REQUIRES_NEW相同
            //而一旦出現異常,則由Spring的事務異常處理機制去完成后續操作。
            //對於掛起操作的主要目的是記錄原有事務的狀態,以便於后續操作對事務的恢復。
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }
        //事務超時設置驗證
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }
        //如果當前線程不存在事務,但是propagationBehavior卻被聲明為PROPAGATION_MANDATORY拋出異常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        //PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事務
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                //構建DefaultTransactionStatus
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //構造transaction,包括設置ConnectionHolder、隔離級別、timout如果是新連接,綁定到當前線程
 doBegin(transaction, definition);
                //新同步事務的設置,針對於當前線程的設置
 prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        //如果當前線程已經記錄數據庫連接則使用原有連接
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        //false表示非新創建連接。
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

對於一些隔離級別、timeout等功能的設置並不是由Spring來完成的,而是委托給底層的數據庫連接去做的,而對於數據庫連接的設置就是在doBegin函數中處理的。

可以說事務是從這個函數開始的,因為在這個函數中已經開始嘗試了對數據庫連接的獲取,當然,在獲取數據庫連接的同時,一些必要的設置也是需要同步設置的。

  1. 嘗試獲取連接。當然並不是每次都會獲取新的連接,如果當前線程中的connectionHolder已經存在,則沒有必要再次獲取,或者,對於事務同步表示設置為true的需要重新獲取連接。
  2. 設置隔離級別以及只讀標識。你是否有過這樣的錯覺?事務中的只讀配置是Spring中做了一些處理呢?Spring中確實是針對只讀操作做了一些處理,但是核心的實現是設置connection上的readOnly屬性。同樣,對於隔離級別的控制也是交由connection去控制的。
  3. 更改默認的提交設置。如果事務屬性是自動提交,那么需要改變這種設置,而將提交操作委托給Spring來處理。
  4. 設置標志位,標識當前連接已經被事務激活。
  5. 設置過期時間。
  6. 將connectionHolder綁定到當前線程。
    //構造transaction,包括設置ConnectionHolder、隔離級別、timeout,如果是新連接,綁定到當前線程
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //設置隔離級別
       //設置隔離級別的prepareConnectionForTransaction函數用於負責對底層數據庫連接的設置,
       //當然,只是包含只讀標識和隔離級別的設置。由於強大的日志及異常處理,顯得函數代碼量比較大,但是單從業務角度去看,關鍵代碼其實是不多的。 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); //更改自動提交設置,由Spring控制提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } txObject.getConnectionHolder().setTransactionActive(true); //設置判斷當前線程是否存在事務的依據 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //將當前獲取到的連接綁定到當前線程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }

將事務信息記錄在當前線程中

    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

Spring中對於回滾處理completeTransactionAfterThrowing的大致脈絡如下:

(1)首先是自定義觸發器的調用,包括在回滾前、完成回滾后的調用,當然完成回滾包括正常回滾與回滾過程中出現異常,自定義的觸發器會根據這些信息作進一步處理,而對於觸發器的注冊,常見是在回調過程中通過TransactionSynchronizationManager類中的靜態方法直接注冊:

public static void registerSynchronization(TransactionSynchronization synchronization)

(2)除了觸發監聽函數外,就是真正的回滾邏輯處理了。當之前已經保存的事務信息中有保存點信息的時候,使用保存點信息進行回滾。常用於嵌入式事務,對於嵌入式的事務的處理,內嵌的事務異常並不會引起外部事務的回滾。根據保存點回滾的實現方式其實是根據底層的數據庫連接進行的。這里使用的是JDBC的方式進行數據庫連接,那么getSavepointManager()函數返回的是JdbcTransactionObjectSupport,也就是說上面函數會調用JdbcTransactionObjectSupport中的rollbackToSavepoint方法。當之前已經保存的事務信息中的事務為新事物,那么直接回滾。常用於單獨事務的處理。對於沒有保存點的回滾,Spring同樣是使用底層數據庫連接提供的API來操作的。由於我們使用的是DataSourceTransactionManager,那么doRollback函數會使用此類中的實現。當前事務信息中表明是存在事務的,又不屬於以上兩種情況,多數用於JTA,只做回滾標識,等到提交的時候統一不提交。

(3)回滾后的信息清除,對於回滾邏輯執行結束后,無論回滾是否成功,都必須要做的事情就是事務結束后的收尾工作。事務處理的收尾處理工作包括如下內容。

  1. 設置狀態是對事務信息作完成標識以避免重復調用。
  2. 如果當前事務是新的同步狀態,需要將綁定到當前線程的事務信息清除。
  3. 如果是新事物需要做些清除資源的工作。將數據庫連接從當前線程中解除綁定,釋放鏈接,恢復數據庫連接的自動提交屬性,重置數據庫連接,如果當前事務時獨立的新創建的事務則在事務完成時釋放數據庫連接。

(4)如果在事務執行前有事務掛起,那么當前事務執行結束后需要將掛起事務恢復。

Spring中對於提交事務commitTransactionAfterReturning的大致脈絡如下:

在真正的數據提交之前,還需要做個判斷。在我們分析事務異常處理規則的時候,當某個事務既沒有保存點又不是新事物,Spring對它的處理方式只是設置一個回滾標識。這個回滾標識在這里就會派上用場了,主要的應用場景如下,某個事務是另一個事務的嵌入事務,但是,這些事務又不在Spring的管理范圍內,或者無法設置保存點,那么Spring會通過設置回滾標識的方式來禁止提交。首先當某個嵌入事務發生回滾的時候會設置回滾標識,而等到外部事務提交時,一旦判斷出當前事務流被設置了回滾標識,則由外部事務來統一進行整體事務的回滾。所以,當事務沒有被異常捕獲的時候也並不意味着一定會執行提交的過程。

在提交過程中也並不是直接提交的,而是考慮了諸多的方面,符合提交的條件如下:

  1. 當事務狀態中有保存點信息的話便不會去提交事務。
  2. 當事務非新事務的時候也不會去執行提交事務操作。

此條件主要考慮內嵌事務的情況,對於內嵌事務,在Spring中正常的處理方式是將內嵌事務開始之前設置保存點,一旦內嵌事務出現異常便根據保存點信息進行回滾,但是如果沒有出現異常,內嵌事務並不會單獨提交,而是根據事務流由最外層事務負責提交,所以如果當前存在保存點信息便不是最外層事務,不做保存操作,對於是否是新事務的判斷也是基於此考慮。如果程序流通過了事務的層層把關,最后順利地進入了提交流程,那么同樣,Spring會將事務提交的操作引導至底層數據庫連接的API,進行事務提交。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM