spring--事務原理


Spring支持以下7種事務傳播行為。 

傳播行為

XML文件

propagation

含義

PROPAGATION_REQUIRED

 REQUIRED

表示當前方法必須在一個具有事務的上下文中運行。

如果當前沒有事務,就新建一個事務;如果已經存在一個事務,就加入到這個事務中。

如果被調用端發生調用端需要回滾的異常,那么調用端和被調用端事務都將回滾;如果被調用端異常不是調用端需要回滾的,那么調用端終止執行,已執行操作正常提交

PROPAGATION_SUPPORTS

 SUPPORTS

表示當前方法不必需要具有一個事務上下文。

如果當前沒有事務,就以非事務方式執行;但是如果有一個事務的話,它就加入到這個事務中運行。

PROPAGATION_MANDATORY

 MANDATORY

表示當前方法必須在一個事務中運行。

如果當前沒有事務,就拋出異常;如果有事務,就加入到這個事務中。

PROPAGATION_REQUIRES_NEW REQUIRES_NEW

表示當前方法必須運行在它自己的事務中,新建事務,如果當前存在事務,把當前事務掛起,直到新的事務提交或者回滾才恢復執行。

(如果被調用端拋出rollback異常,則被調用端回滾,如果同時是調用端的rollback異常,調用端同時回滾)

PROPAGATION_NOT_SUPPORTED

NOT_SUPPORTED

表示該方法不應該在一個事務中運行,應該以非事務方式執行,每句sql馬上提交。

如果當前存在事務,就把當前事務掛起。

(如果被調用端拋出調用端rollback異常,則調用端回滾)

PROPAGATION_NEVER

 NEVER

表示當前方法不應該在一個事務中運行,應該以非事務方式執行。

如果存在一個事務,則拋出異常

PROPAGATION_NESTED

 NESTED

如果當前沒有事務,則執行與PROPAGATION_REQUIRED類似的操作;如果當前有一個事務,則該方法應該運行在一個嵌套事務中,被嵌套的事務可以獨立於被封裝的事務中進行提交或者回滾。

(如果封裝事務存在,並且外層事務拋出異常回滾,那么內層事務必須回滾;反之,內層事務並不影響外層事務,如果內層拋出異常不需rollback,則忽略,如果內層拋出異常需要rollback,則回滾到外層的savepoint)

 

spring中配置事務時,往往這樣

<tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="do*" read-only="false" rollback-for="java.lang.Exception"/>
            <tx:method name="*" propagation="SUPPORTS" read-only="true"/>
        </tx:attributes>
</tx:advice>

<aop:config>
        <aop:pointcut id="pc" expression="execution(* ffm.web.service.*.*(..))"/>
        <aop:advisor pointcut-ref="pc" advice-ref="txAdvice"/>
</aop:config>

但是是怎么起作用的呢?

tx是TransactionNameSpace,對應的是handler是TxNamespaceHandler,<tx:advice />最終解析出一個以TransactionInerceptor為classname的beandefinition並且注冊這個bean,攔截方法如下:

public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class targetClass = invocation.getThis() != null?AopUtils.getTargetClass(invocation.getThis()):null;
        return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }

1. 調用父類 TransactionAspectSupport的invokeWithinTransaction方法:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
//事務隔離級別、事務傳播策略、只讀、回滾等屬性信息
final TransactionAttribute txAttr = this.getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = this.determineTransactionManager(txAttr); final String joinpointIdentification = this.methodIdentification(method, targetClass); if(txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) { try { Object ex1 = ((CallbackPreferringPlatformTransactionManager)tm).execute(txAttr, new TransactionCallback() { public Object doInTransaction(TransactionStatus status) { TransactionAspectSupport.TransactionInfo txInfo = TransactionAspectSupport.this.prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); TransactionAspectSupport.ThrowableHolder var4; try { Object ex = invocation.proceedWithInvocation(); return ex; } catch (Throwable var8) { if(txAttr.rollbackOn(var8)) { if(var8 instanceof RuntimeException) { throw (RuntimeException)var8; } throw new TransactionAspectSupport.ThrowableHolderException(var8); } var4 = new TransactionAspectSupport.ThrowableHolder(var8); } finally { TransactionAspectSupport.this.cleanupTransactionInfo(txInfo); } return var4; } }); if(ex1 instanceof TransactionAspectSupport.ThrowableHolder) { throw ((TransactionAspectSupport.ThrowableHolder)ex1).getThrowable(); } else { return ex1; } } catch (TransactionAspectSupport.ThrowableHolderException var14) { throw var14.getCause(); } } else {
       // step 1.1 TransactionAspectSupport.TransactionInfo ex
= this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { retVal = invocation.proceedWithInvocation(); } catch (Throwable var15) { this.completeTransactionAfterThrowing(ex, var15); throw var15; } finally {
          // step 1.2
this.cleanupTransactionInfo(ex); } this.commitTransactionAfterReturning(ex); return retVal; } }

1.1 關鍵方法createTransactionIfNecessary

protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, final TransactionAttribute txAttr, final String joinpointIdentification) {
        if(txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
            txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if(txAttr != null) {
            if(tm != null) {
          //step 1.1.1 調用org.springframework.jdbc.datasource.DataSourceTransactionManager status
= tm.getTransaction((TransactionDefinition)txAttr); } else if(this.logger.isDebugEnabled()) { this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } }      //step 1.1.2綁定到當前線程 return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status); }

1.1.1 調用AbstractPlatformTransactionManager的getTransaction

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
//step 1.1.1.1 Object transaction
= this.doGetTransaction(); boolean debugEnabled = this.logger.isDebugEnabled(); if(definition == null) { definition = new DefaultTransactionDefinition(); }
//step 1.1.1.2
if(this.isExistingTransaction(transaction)) { return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled); } else if(((TransactionDefinition)definition).getTimeout() < -1) { throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout()); } else if(((TransactionDefinition)definition).getPropagationBehavior() == 2) { throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation \'mandatory\'"); } else if(((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) { boolean newSynchronization1 = this.getTransactionSynchronization() == 0; return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization1, debugEnabled, (Object)null); } else { AbstractPlatformTransactionManager.SuspendedResourcesHolder newSynchronization = this.suspend((Object)null); if(debugEnabled) { this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition); } try { boolean err = this.getTransactionSynchronization() != 2; DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, err, debugEnabled, newSynchronization); this.doBegin(transaction, (TransactionDefinition)definition); this.prepareSynchronization(status, (TransactionDefinition)definition); return status; } catch (RuntimeException var7) { this.resume((Object)null, newSynchronization); throw var7; } catch (Error var8) { this.resume((Object)null, newSynchronization); throw var8; } } }

1.1.1.1調用 DataSourceTransactionManager doGetTransaction方法

protected Object doGetTransaction() {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject(null);
        txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
        ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

生成一個DataSourceTransactionObject 事物對象,設置它的ConnectionHolder為當前線程上綁定的key=this.dataSource的ConnectionHolder(當然有可能為空)

1.1.1.2開始處理這個新的 DataSourceTransactionObject 事物對象

 DataSourceTransactionManager 的isExistingTransaction方法:

protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        return txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive();
    }

如果返回true,

private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
        if(definition.getPropagationBehavior() == 5) {
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation \'never\'");
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder newSynchronization3;
            boolean isoConstants2;
            if(definition.getPropagationBehavior() == 4) {
                if(debugEnabled) {
                    this.logger.debug("Suspending current transaction");
                }

                newSynchronization3 = this.suspend(transaction);
                isoConstants2 = this.getTransactionSynchronization() == 0;
                return this.prepareTransactionStatus(definition, (Object)null, false, isoConstants2, debugEnabled, newSynchronization3);
            } else if(definition.getPropagationBehavior() == 3) {
                if(debugEnabled) {
                    this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
                }

                newSynchronization3 = this.suspend(transaction);

                try {
                    isoConstants2 = this.getTransactionSynchronization() != 2;
                    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, isoConstants2, debugEnabled, newSynchronization3);
                    this.doBegin(transaction, definition);
                    this.prepareSynchronization(status, definition);
                    return status;
                } catch (RuntimeException var7) {
                    this.resumeAfterBeginException(transaction, newSynchronization3, var7);
                    throw var7;
                } catch (Error var8) {
                    this.resumeAfterBeginException(transaction, newSynchronization3, var8);
                    throw var8;
                }
            } else {
                boolean newSynchronization1;
                if(definition.getPropagationBehavior() == 6) {
                    if(!this.isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify \'nestedTransactionAllowed\' property with value \'true\'");
                    } else {
                        if(debugEnabled) {
                            this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                        }

                        if(this.useSavepointForNestedTransaction()) {
                            DefaultTransactionStatus newSynchronization2 = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
                            newSynchronization2.createAndHoldSavepoint();
                            return newSynchronization2;
                        } else {
                            newSynchronization1 = this.getTransactionSynchronization() != 2;
                            DefaultTransactionStatus isoConstants1 = this.newTransactionStatus(definition, transaction, true, newSynchronization1, debugEnabled, (Object)null);
                            this.doBegin(transaction, definition);
                            this.prepareSynchronization(isoConstants1, definition);
                            return isoConstants1;
                        }
                    }
                } else {
                    if(debugEnabled) {
                        this.logger.debug("Participating in existing transaction");
                    }

                    if(this.isValidateExistingTransaction()) {
                        if(definition.getIsolationLevel() != -1) {
                            Integer newSynchronization = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                            if(newSynchronization == null || newSynchronization.intValue() != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (newSynchronization != null?isoConstants.toCode(newSynchronization, "ISOLATION_"):"(unknown)"));
                            }
                        }

                        if(!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                            throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
                        }
                    }

                    newSynchronization1 = this.getTransactionSynchronization() != 2;
                    return this.prepareTransactionStatus(definition, transaction, false, newSynchronization1, debugEnabled, (Object)null);
                }
            }
        }
    }

初始下這個返回false,走下面的判斷,根據傳播機制

最后一個else有個關鍵方法doBegin,DataSourceTransactionManager 實現了這個抽象方法

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        Connection con = null;

        try {
            if(txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection ex = this.dataSource.getConnection();
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + ex + "] for JDBC transaction");
                }

                txObject.setConnectionHolder(new ConnectionHolder(ex), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            Integer ex1 = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(ex1);
            if(con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if(this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }

                con.setAutoCommit(false);
            }

            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if(timeout != -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            if(txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
            }

        } catch (Throwable var7) {
            if(txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }

            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
        }
    }

可以看出,這里給DataSourceTransactionObject 事物對象設置它的ConnectionHolder,真正打開連接激活事務,並把這個ConnectionHolder綁定在當前線程上(key=this.dataSource)

1.1.2 生成一個TransactionInfo,設置TransactionStatus為上一步1.1.1得到的TransactionStatus,並把自己綁定 到當前線程

protected TransactionAspectSupport.TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
        TransactionAspectSupport.TransactionInfo txInfo = new TransactionAspectSupport.TransactionInfo(tm, txAttr, joinpointIdentification);
        if(txAttr != null) {
            if(this.logger.isTraceEnabled()) {
                this.logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }

            txInfo.newTransactionStatus(status);
        } else if(this.logger.isTraceEnabled()) {
            this.logger.trace("Don\'t need to create transaction for [" + joinpointIdentification + "]: This method isn\'t transactional.");
        }

        txInfo.bindToThread();
        return txInfo;
    }
TransactionInfo的bindToThread方法:
private void bindToThread() {
            this.oldTransactionInfo = (TransactionAspectSupport.TransactionInfo)TransactionAspectSupport.transactionInfoHolder.get();
            TransactionAspectSupport.transactionInfoHolder.set(this);
        }

1.2 重置當前線程的TransactionInfo為oldTransactionInfo 

private void restoreThreadLocalStatus() {
TransactionAspectSupport.transactionInfoHolder.set(this.oldTransactionInfo);
}

以常用的propagation="SUPPORTS" / "REQUIRED"為例,分析跟蹤事務的過程

  • SUPPORTS->REQUIRED,`一路到1.1.1.2,isExistingTransaction返回false,進入
if(((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
            boolean newSynchronization1 = this.getTransactionSynchronization() == 0; return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization1, debugEnabled, (Object)null);

以非事務方式運行,接着調用REQUIRED的方法,直到1.1.1.2,isExistingTransaction返回false,進入最后一個else,打開新事務

  • REQUIRED->SUPPORTS,到1.1.1.2時,isExistingTransaction返回false,進入最后一個else,打開新事務。接着調用SUPPORTS的方法,到達1.1.1.2時發現有事務,進入handleExistingTransaction處理,最終加入已有事務

 

 

參考文章:

1. 揭開Spring事務處理

2. Spring中的事務控制學習中(轉)

3. Spring 事務管理高級應用難點剖析--轉

4. spring源碼分析之——spring 事務管理實現方式

5. Spring @Transactional 如何開啟事務

6. 詳解spring事務屬性

7. Spring事務處理的實現

8. spring事務傳播機制實例講解

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM