spring事務源碼分析結合mybatis源碼(一)


最近想提升,苦逼程序猿,想了想還是拿最熟悉,之前也一直想看但沒看的spring源碼來看吧,正好最近在弄事務這部分的東西,就看了下,同時寫下隨筆記錄下,以備后查。

spring tx源碼分析

這里只分析簡單事務也就是DataSourceTransactionManager

首先肯定找入口了,看過spring源碼的同學一定都會找spring tx的入口就是在TxAdviceBeanDefinitionParser這里將解析tx的配置,生成TransactionInterceptor對象,這個也就是一個普通的切面類,只要符合AOP規則的調用都會進入此切面。

在invoke方法中最重要的一段代碼:這里主要分析一個新的事務的開始過程

 

    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    final TransactionAttribute txAttr =
            getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);//獲取配置的TransactionAttribute信息
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//開啟一個新的事務
        Object retVal = null;
        try {
            retVal = invocation.proceed();//原有邏輯執行
        }
        catch (Throwable ex) {
            completeTransactionAfterThrowing(txInfo, ex);//發生異常時候對異常的處理
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);//清理TransactionInfo信息
        }
        commitTransactionAfterReturning(txInfo);//提交事務
        return retVal;

 

首先開啟事務,也就是調用createTransactionIfNecessary方法:

protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);
            }
            else {
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

這里其實主要就是調用PlatformTransactionManager的getTransactionf方法來獲取TransactionStatus來開啟一個事務:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction();
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }
        if (isExistingTransaction(transaction)) {//這個判斷很重要,是否已經存在的一個transaction
            return handleExistingTransaction(definition, transaction, debugEnabled);//如果是存在的將進行一些處理
        }
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED這三種類型將開啟一個新的事務
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);//開啟新事物
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

這段代碼比較長也是比較核心的一段代碼,讓我們來慢慢分析,首先這里將執行doGetTransaction方法來獲取一個transaction

protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        //這一行代碼中TransactionSynchronizationManager很重要,是對connection的核心獲取、持有、刪除等
        txObject.setConnectionHolder(conHolder, false);
        //這里不論獲取到或者獲取不到都將此設置newConnectionHolder為false
        return txObject;
    }

這段代碼中主要是根據this.dataSource來獲取ConnectionHolder,這個ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次來獲取,肯定得到是null。

接着代碼往下將執行到isExistingTransaction(transaction),這里主要是依據下面代碼判斷:

txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()

如果是第一次開啟事務這里必然是false,否則將返回true。

我們這里先討論第一次進入的情況,也就是false的時候將繼續往下執行到了判斷事務Propagation的時候了,如果Propagation為:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一個將開啟一個新事物,new一個新的DefaultTransactionStatus ,並且

newTransaction設置為true,這個狀態很重要,因為后面的不論回滾、提交都是根據這個屬性來判斷是否在這個TransactionStatus上來進行。
接着將執行doBegin方法:
protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();//從dataSource中獲取一個Connection
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);//為當前Transaction設置ConnectionHolder,並且設置newConnectionHolder為true
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            //這里主要是根據definition對connection進行一些設置
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            if (con.getAutoCommit()) {//開啟事務,設置autoCommit為false
                txObject.setMustRestoreAutoCommit(true);
                con.setAutoCommit(false);
            }
            //這里設置transactionActive為true,還記得簽名判斷是否存在的transaction吧?就是根據這個
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            if (txObject.isNewConnectionHolder()) {
                //這里將當前的connection放入TransactionSynchronizationManager中持有,如果下次調用可以判斷為已有的事務
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }
    }

這里其實主要就是從dataSource中獲取一個新的connection,形成一個ConnectionHolder,並且放入TransactionSynchronizationManager中持有,記得前面doGetTransaction方法吧,如果同一個線程,再此進入執行的話就會獲取到同一個ConnectionHolder,在后面的isExistingTransaction方法也可以判定為是已有的transaction。

接下來將執行prepareSynchronization方法,主要是對TransactionSynchronizationManager的一系列設置。

然后將返回上層代碼執行prepareTransactionInfo方法

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            txInfo.newTransactionStatus(status);
        }
        txInfo.bindToThread();
        return txInfo;
    }

這里其實比較簡單主要生成一個TransactionInfo並綁定到當前線程的ThreadLocal

    private void bindToThread() {
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }

形成了一個鏈表,具體啥用我也暫時沒看到,唯一看到的就是通過TransactionAspectSupport.currentTransactionStatus()可以獲取當前的transaction狀態。

然后再返回到上層代碼,接着就是執行相應的邏輯代碼了

retVal = invocation.proceed();

執行過程的finally代碼塊將執行cleanupTransactionInfo(txInfo);

    private void restoreThreadLocalStatus() {
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

這里就是將txInfo進行重置工作,讓它恢復到前一個狀態。

然后就是提交操作(commitTransactionAfterReturning)或者是回滾操作(completeTransactionAfterThrowing)了。

這里就拿提交操作來為例來說明,回滾操作類似:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

實際就是執行的processCommit方法

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    doCommit(status);
                }
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

首先將執行一些提交前的准備工作,這里將進行是否有savepoint判斷status.hasSavepoint(),如果有的話將進行釋放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

接着就判斷是否是新的transaction:status.isNewTransaction(),如果是的話將執行 doCommit(status);

protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

其實也就是調用了Connection的commit()方法。

最后無論成功與否都將調用finally塊中的cleanupAfterCompletion(status)

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();//TransactionSynchronizationManager清理工作
        }
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());//這個比較重要
        }
        if (status.getSuspendedResources() != null) {
            resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

首先對TransactionSynchronizationManager進行一系列清理工作,然后就將執行doCleanupAfterCompletion方法:

    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        if (txObject.isNewConnectionHolder()) {
            //從TransactionSynchronizationManager中解綁相應的connectionHolder
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            //對獲取到的Connection進行一些還原
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }//對獲取到的Connection進行一些還原
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
        }
        if (txObject.isNewConnectionHolder()) {
            //如果是newConnection將這個鏈接關閉,如果是連接池將還給連接池
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }
        //這里將這只transactionActive為false
        txObject.getConnectionHolder().clear();
    }

其實就是將TransactionSynchronizationManager中持有的connectionHolder釋放,並且還原當前Connection 的狀態,然后將對當前的transaction進行清理包括設置transactionActive為false等。

至此整個spring的事務過程也就結束了。

兩篇比較好的關於事務的博客:

http://www.iteye.com/topic/78674

http://www.cnblogs.com/yangy608/archive/2011/06/29/2093478.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM