寫在前面:
- 先了解一下spring的事務。分為分明式事務管理和注解式事務管理,對於前期的事務,spring會通過掃描攔截對於事務的方法進行增強(以后講解)。
- 若果目標方法存在事務,spring產出的bean會是一個代理對象(cglib或者jdk)。
- 本問討論的是spring攔截到事務,對於事務的增強處理。
spring自己的一系列接口設計
-
- PlatformTransactionManager 事務管理器
- TransactionDefinition 事務定義
- TransactionStatus 事務狀態
TranctionInterceptor之前了解
- 看過spring源碼的同學一定都會找spring tx的入口就是在TxAdviceBeanDefinitionParser這里將解析tx的配置,生成TransactionInterceptor對象,這個也就是一個普通的切面類,只要符合AOP規則的調用都會進入此切面。
- ransactionInterceptor支撐着整個事務功能的架構,邏輯還是相對復雜的,那么現在我們切入正題來分析此攔截器是如何實現事務特性的。
- spring在處理事務的aop增強是,主要調用了return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
TranctionInterceptor
首先看TranctionInterceptor(位於spring-tx-*.jar中的org.springframework.transaction.interceptor)的結構:
繼承類TransactionAspectSupport:其實對其進行了增強(模板方法模式)
實現接口MethodInterceptor:方法攔截器,執行代理類的目標方法,會觸發invoke方法執行
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable { @Override //實現了MethodInterceptor的invoke方法 public Object invoke(final MethodInvocation invocation) throws Throwable { //獲取目標類 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); //父類TransactionAspectSupport的模板方法 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override //InvocationCallback接口的回調方法 public Object proceedWithInvocation() throws Throwable { //執行目標方法 return invocation.proceed(); } }); } }
重點分析 抽象類TransactionAspectSupport(基類)的invokeWithinTransaction方法
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { //protected修飾,不允許其他包和無關類調用 protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 獲取對應事務屬性.如果事務屬性為空(則目標方法不存在事務) final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); // 根據事務的屬性獲取beanFactory中的PlatformTransactionManager(spring事務管理器的頂級接口),一般這里或者的是DataSourceTransactiuonManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 目標方法唯一標識(類.方法,如service.UserServiceImpl.save) final String joinpointIdentification = methodIdentification(method, targetClass); //如果txAttr為空或者tm 屬於非CallbackPreferringPlatformTransactionManager,執行目標增強 ① if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //看是否有必要創建一個事務,根據事務傳播行為,做出相應的判斷 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { //回調方法執行,執行目標方法(原有的業務邏輯) retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 異常回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除信息 cleanupTransactionInfo(txInfo); } //提交事務 commitTransactionAfterReturning(txInfo); return retVal; } //編程式事務處理(CallbackPreferringPlatformTransactionManager) 不做重點分析 else { try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result: It might indicate a Throwable to rethrow. if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } } }
- ①不同的事務處理方式使用不同的邏輯。對於聲明式事務的處理與編程式事務的處理,第一點區別在於事務屬性上,因為編程式的事務處理是不需要有事務屬性的,第二點區別就是在TransactionManager上,CallbackPreferringPlatformTransactionManager實現PlatformTransactionManager接口,暴露出一個方法用於執行事務處理中的回調。所以,這兩種方式都可以用作事務處理方式的判斷。
重點分析createTransactionIfNecessary方法,它會判斷是否存在事務,根據事務的傳播屬性。做出不同的處理,也是做了一層包裝,核心是通過TransactionStatus來判斷事務的屬性。
通過持有的PlatformTransactionManager來獲取TransactionStatus
AbstractPlatformTransactionManager.java(spring中存在很多模板方法,對於 Abstract開頭的封裝的抽象類,基本都有模板方法,且為final修飾)
@Override public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
//這里其實主要就是調用PlatformTransactionManager的getTransactionf方法來獲取TransactionStatus來開啟一個事務: Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //這個判斷很重要,是否已經存在的一個transaction if (isExistingTransaction(transaction)) {
//如果是存在的將進行一些處理 // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); }
//如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED這三種類型將開啟一個新的事務 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//開啟新事物 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
這段代碼比較長也是比較核心的一段代碼,讓我們來慢慢分析,首先這里將執行doGetTransaction方法來獲取一個transaction,和dobegin方法如何開啟一個事務
AbstractPlatformTransactionManager並沒有給出doGetTransaction的具體實現。而是由子類實現。
我們以分析實現類DataSourceTransactionManager的具體方法。
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { private DataSource dataSource; @Override
//這段代碼中主要是根據this.dataSource來獲取ConnectionHolder,這個ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次來獲取,肯定得到是null。 protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed());
//這一行代碼中TransactionSynchronizationManager很重要,是對connection的核心獲取、持有、刪除等 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
//這里不論獲取到或者獲取不到都將此設置newConnectionHolder為false txObject.setConnectionHolder(conHolder, false); return txObject; }
接着代碼往下將執行到isExistingTransaction(transaction),這里主要是依據下面代碼判斷:
@Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//如果是第一次開啟事務這里必然是false,否則將返回true。 return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); }
我們這里先討論第一次進入的情況,也就是false的時候將繼續往下執行到了判斷事務Propagation的時候了,如果Propagation為:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一個將開啟一個新事物,new一個新的DefaultTransactionStatus ,並且newTransaction設置為true,這個狀態很重要,因為后面的不論回滾、提交都是根據這個屬性來判斷是否在這個TransactionStatus上來進行。
接着將執行doBegin方法:
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//從dataSource中獲取一個Connection Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); }
//為當前Transaction設置ConnectionHolder,並且設置newConnectionHolder為true txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //這里主要是根據definition對connection進行一些設置 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); }
//開啟事務,設置autoCommit為false con.setAutoCommit(false); }
//這里設置transactionActive為true,還記得簽名判斷是否存在的transaction吧?就是根據這個 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the session holder to the thread. if (txObject.isNewConnectionHolder()) {
//這里將當前的connection放入TransactionSynchronizationManager中持有,如果下次調用可以判斷為已有的事務 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
TransactionSynchronizationManager中持有,記得前面doGetTransaction方法吧,如果同一個線程,再此進入執行的話就會獲取到同一個ConnectionHolder,在后面的isExistingTransaction方法也可以判定為是已有的transaction。
接下來將執行prepareSynchronization方法,主要是對TransactionSynchronizationManager的一系列設置。然后將返回上層代碼執行prepareTransactionInfo方法
TransactionAspectSupport.java
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return // false. We created it only to preserve the integrity of // the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo; }
這里其實比較簡單主要生成一個TransactionInfo並綁定到當前線程的ThreadLocal
private void bindToThread() { // Expose current TransactionStatus, preserving any existing TransactionStatus // for restoration after this transaction is complete. this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); }
然后再返回到上層代碼,接着就是執行相應的邏輯代碼了
retVal = invocation.proceed();
執行過程的finally代碼塊將執行cleanupTransactionInfo(txInfo);
protected void cleanupTransactionInfo(TransactionInfo txInfo) { if (txInfo != null) {
//這里就是將txInfo進行重置工作,讓它恢復到前一個狀態。 txInfo.restoreThreadLocalStatus(); } }
然后就是提交操作(commitTransactionAfterReturning)或者是回滾操作(completeTransactionAfterThrowing)了。這里就拿提交操作來為例來說明,回滾操作類似:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
實際就是執行的processCommit方法
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
首先將執行一些提交前的准備工作,這里將進行是否有savepoint判斷status.hasSavepoint(),如果有的話將進行釋放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);
接着就判斷是否是新的transaction:status.isNewTransaction(),如果是的話將執行 doCommit(status);
@Override protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try {
//其實也就是調用了Connection的commit()方法。 con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
最后無論成功與否都將調用finally塊中的cleanupAfterCompletion(status)
private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) {
////TransactionSynchronizationManager清理工作 TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) {
//這個比較重要(重點分析) doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } }
首先對TransactionSynchronizationManager進行一系列清理工作,然后就將執行doCleanupAfterCompletion方法:
@Override protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // Remove the connection holder from the thread, if exposed. if (txObject.isNewConnectionHolder()) {
////從TransactionSynchronizationManager中解綁相應的connectionHolder TransactionSynchronizationManager.unbindResource(this.dataSource); } // Reset connection. Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) {
//對獲取到的Connection進行一些還原 con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); }
////如果是newConnection將這個鏈接關閉,如果是連接池將還給連接池 DataSourceUtils.releaseConnection(con, this.dataSource); } //這里將這只transactionActive為false txObject.getConnectionHolder().clear(); }
其實就是將TransactionSynchronizationManager中持有的connectionHolder釋放,並且還原當前Connection 的狀態,然后將對當前的transaction進行清理包括設置transactionActive為false等。
至此整個spring的事務過程也就結束了。