spring---transaction(1)---源代碼分析(事務的攔截器TransactionInterceptor)


寫在前面:

  • 先了解一下spring的事務。分為分明式事務管理和注解式事務管理,對於前期的事務,spring會通過掃描攔截對於事務的方法進行增強(以后講解)。
  • 若果目標方法存在事務,spring產出的bean會是一個代理對象(cglib或者jdk)。
  • 本問討論的是spring攔截到事務,對於事務的增強處理。

  

  spring自己的一系列接口設計

    •  PlatformTransactionManager 事務管理器
    •  TransactionDefinition 事務定義
    •  TransactionStatus 事務狀態

 

 

TranctionInterceptor之前了解

  • 看過spring源碼的同學一定都會找spring tx的入口就是在TxAdviceBeanDefinitionParser這里將解析tx的配置,生成TransactionInterceptor對象,這個也就是一個普通的切面類,只要符合AOP規則的調用都會進入此切面。
  • ransactionInterceptor支撐着整個事務功能的架構,邏輯還是相對復雜的,那么現在我們切入正題來分析此攔截器是如何實現事務特性的。
  • spring在處理事務的aop增強是,主要調用了return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);

 

TranctionInterceptor

   首先看TranctionInterceptor(位於spring-tx-*.jar中的org.springframework.transaction.interceptor)的結構:

   繼承類TransactionAspectSupport:其實對其進行了增強(模板方法模式)

   實現接口MethodInterceptor:方法攔截器,執行代理類的目標方法,會觸發invoke方法執行

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    @Override
    //實現了MethodInterceptor的invoke方法
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //獲取目標類
     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     //父類TransactionAspectSupport的模板方法
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
       //InvocationCallback接口的回調方法
            public Object proceedWithInvocation() throws Throwable {
          //執行目標方法
                return invocation.proceed();
            }
        });
    }
}

 

重點分析 抽象類TransactionAspectSupport(基類)的invokeWithinTransaction方法

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
   //protected修飾,不允許其他包和無關類調用
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
        // 獲取對應事務屬性.如果事務屬性為空(則目標方法不存在事務)
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
     // 根據事務的屬性獲取beanFactory中的PlatformTransactionManager(spring事務管理器的頂級接口),一般這里或者的是DataSourceTransactiuonManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
     // 目標方法唯一標識(類.方法,如service.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass);
     //如果txAttr為空或者tm 屬於非CallbackPreferringPlatformTransactionManager,執行目標增強     ①
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //看是否有必要創建一個事務,根據事務傳播行為,做出相應的判斷
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
          //回調方法執行,執行目標方法(原有的業務邏輯)
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 異常回滾
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
          //清除信息
                cleanupTransactionInfo(txInfo);
            }
        //提交事務
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
     //編程式事務處理(CallbackPreferringPlatformTransactionManager) 不做重點分析
        else {
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        // A RuntimeException: will lead to a rollback.
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    }
                                    else {
                                        // A normal return value: will lead to a commit.
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });

                // Check result: It might indicate a Throwable to rethrow.
                if (result instanceof ThrowableHolder) {
                    throw ((ThrowableHolder) result).getThrowable();
                }
                else {
                    return result;
                }
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
        }
    }
}
  • ①不同的事務處理方式使用不同的邏輯。對於聲明式事務的處理與編程式事務的處理,第一點區別在於事務屬性上,因為編程式的事務處理是不需要有事務屬性的,第二點區別就是在TransactionManager上,CallbackPreferringPlatformTransactionManager實現PlatformTransactionManager接口,暴露出一個方法用於執行事務處理中的回調。所以,這兩種方式都可以用作事務處理方式的判斷。

 

   重點分析createTransactionIfNecessary方法,它會判斷是否存在事務,根據事務的傳播屬性。做出不同的處理,也是做了一層包裝,核心是通過TransactionStatus來判斷事務的屬性。

   通過持有的PlatformTransactionManager來獲取TransactionStatus

   AbstractPlatformTransactionManager.java(spring中存在很多模板方法,對於 Abstract開頭的封裝的抽象類,基本都有模板方法,且為final修飾

    @Override
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     //這里其實主要就是調用PlatformTransactionManager的getTransactionf方法來獲取TransactionStatus來開啟一個事務: Object transaction
= doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); }      //這個判斷很重要,是否已經存在的一個transaction if (isExistingTransaction(transaction)) {
       //如果是存在的將進行一些處理
// Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); }
     //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED這三種類型將開啟一個新的事務
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
          //開啟新事物 doBegin(transaction, definition); prepareSynchronization(status, definition);
return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }

 

 

  這段代碼比較長也是比較核心的一段代碼,讓我們來慢慢分析,首先這里將執行doGetTransaction方法來獲取一個transaction,和dobegin方法如何開啟一個事務

  AbstractPlatformTransactionManager並沒有給出doGetTransaction的具體實現。而是由子類實現。

  我們以分析實現類DataSourceTransactionManager的具體方法。

  

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    private DataSource dataSource;

    @Override
  //這段代碼中主要是根據this.dataSource來獲取ConnectionHolder,這個ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次來獲取,肯定得到是null。
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed());
     //這一行代碼中TransactionSynchronizationManager很重要,是對connection的核心獲取、持有、刪除等 ConnectionHolder conHolder
= (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
     //這里不論獲取到或者獲取不到都將此設置newConnectionHolder為false txObject.setConnectionHolder(conHolder,
false); return txObject; }

 

  接着代碼往下將執行到isExistingTransaction(transaction),這里主要是依據下面代碼判斷:

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
     //如果是第一次開啟事務這里必然是false,否則將返回true。
return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); }

  我們這里先討論第一次進入的情況,也就是false的時候將繼續往下執行到了判斷事務Propagation的時候了,如果Propagation為:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一個將開啟一個新事物,new一個新的DefaultTransactionStatus ,並且newTransaction設置為true,這個狀態很重要,因為后面的不論回滾、提交都是根據這個屬性來判斷是否在這個TransactionStatus上來進行。

  接着將執行doBegin方法:

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
          //從dataSource中獲取一個Connection Connection newCon
= this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); }
          //為當前Transaction設置ConnectionHolder,並且設置newConnectionHolder為true txObject.setConnectionHolder(
new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection();        //這里主要是根據definition對connection進行一些設置 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); }
          //開啟事務,設置autoCommit為false con.setAutoCommit(
false); }
       //這里設置transactionActive為true,還記得簽名判斷是否存在的transaction吧?就是根據這個 txObject.getConnectionHolder().setTransactionActive(
true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the session holder to the thread. if (txObject.isNewConnectionHolder()) {
         //這里將當前的connection放入TransactionSynchronizationManager中持有,如果下次調用可以判斷為已有的事務 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } }
catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }

  TransactionSynchronizationManager中持有,記得前面doGetTransaction方法吧,如果同一個線程,再此進入執行的話就會獲取到同一個ConnectionHolder,在后面的isExistingTransaction方法也可以判定為是已有的transaction。

 

  

  接下來將執行prepareSynchronization方法,主要是對TransactionSynchronizationManager的一系列設置。然后將返回上層代碼執行prepareTransactionInfo方法

  TransactionAspectSupport.java

    protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists
            txInfo.newTransactionStatus(status);
        }
        else {
            // The TransactionInfo.hasTransaction() method will return
            // false. We created it only to preserve the integrity of
            // the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled())
                logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                        "]: This method isn't transactional.");
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
    }

  這里其實比較簡單主要生成一個TransactionInfo並綁定到當前線程的ThreadLocal

        private void bindToThread() {
            // Expose current TransactionStatus, preserving any existing TransactionStatus
            // for restoration after this transaction is complete.
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }

  然后再返回到上層代碼,接着就是執行相應的邏輯代碼了

retVal = invocation.proceed();

  

  執行過程的finally代碼塊將執行cleanupTransactionInfo(txInfo);

    protected void cleanupTransactionInfo(TransactionInfo txInfo) {
        if (txInfo != null) {
       //這里就是將txInfo進行重置工作,讓它恢復到前一個狀態。 txInfo.restoreThreadLocalStatus(); } }

 

 

  然后就是提交操作(commitTransactionAfterReturning)或者是回滾操作(completeTransactionAfterThrowing)了。這里就拿提交操作來為例來說明,回滾操作類似:

    protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

  實際就是執行的processCommit方法

    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

  首先將執行一些提交前的准備工作,這里將進行是否有savepoint判斷status.hasSavepoint(),如果有的話將進行釋放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

 

  接着就判斷是否是新的transaction:status.isNewTransaction(),如果是的話將執行 doCommit(status);

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
       //其實也就是調用了Connection的commit()方法。 con.commit(); }
catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }

 

 

最后無論成功與否都將調用finally塊中的cleanupAfterCompletion(status)

    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
       ////TransactionSynchronizationManager清理工作 TransactionSynchronizationManager.clear(); }
if (status.isNewTransaction()) {
       //這個比較重要(重點分析) doCleanupAfterCompletion(status.getTransaction()); }
if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } }

 

  首先對TransactionSynchronizationManager進行一系列清理工作,然后就將執行doCleanupAfterCompletion方法:

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (txObject.isNewConnectionHolder()) {
       ////從TransactionSynchronizationManager中解綁相應的connectionHolder TransactionSynchronizationManager.unbindResource(
this.dataSource); } // Reset connection. Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) {
         //對獲取到的Connection進行一些還原 con.setAutoCommit(
true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); }
////如果是newConnection將這個鏈接關閉,如果是連接池將還給連接池 DataSourceUtils.releaseConnection(con,
this.dataSource); }      //這里將這只transactionActive為false txObject.getConnectionHolder().clear(); }

  其實就是將TransactionSynchronizationManager中持有的connectionHolder釋放,並且還原當前Connection 的狀態,然后將對當前的transaction進行清理包括設置transactionActive為false等。

  至此整個spring的事務過程也就結束了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM