最近想提升,苦逼程序猿,想了想還是拿最熟悉,之前也一直想看但沒看的spring源碼來看吧,正好最近在弄事務這部分的東西,就看了下,同時寫下隨筆記錄下,以備后查。
spring tx源碼分析
這里只分析簡單事務也就是DataSourceTransactionManager
首先肯定找入口了,看過spring源碼的同學一定都會找spring tx的入口就是在TxAdviceBeanDefinitionParser這里將解析tx的配置,生成TransactionInterceptor對象,這個也就是一個普通的切面類,只要符合AOP規則的調用都會進入此切面。
在invoke方法中最重要的一段代碼:這里主要分析一個新的事務的開始過程
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);//獲取配置的TransactionAttribute信息 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//開啟一個新的事務 Object retVal = null; try { retVal = invocation.proceed();//原有邏輯執行 } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex);//發生異常時候對異常的處理 throw ex; } finally { cleanupTransactionInfo(txInfo);//清理TransactionInfo信息 } commitTransactionAfterReturning(txInfo);//提交事務 return retVal;
首先開啟事務,也就是調用createTransactionIfNecessary方法:
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } else { } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
這里其實主要就是調用PlatformTransactionManager的getTransactionf方法來獲取TransactionStatus來開啟一個事務:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); if (definition == null) { definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) {//這個判斷很重要,是否已經存在的一個transaction return handleExistingTransaction(definition, transaction, debugEnabled);//如果是存在的將進行一些處理 } if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED這三種類型將開啟一個新的事務 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition);//開啟新事物 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
這段代碼比較長也是比較核心的一段代碼,讓我們來慢慢分析,首先這里將執行doGetTransaction方法來獲取一個transaction
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); //這一行代碼中TransactionSynchronizationManager很重要,是對connection的核心獲取、持有、刪除等 txObject.setConnectionHolder(conHolder, false); //這里不論獲取到或者獲取不到都將此設置newConnectionHolder為false return txObject; }
這段代碼中主要是根據this.dataSource來獲取ConnectionHolder,這個ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次來獲取,肯定得到是null。
接着代碼往下將執行到isExistingTransaction(transaction),這里主要是依據下面代碼判斷:
txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()
如果是第一次開啟事務這里必然是false,否則將返回true。
我們這里先討論第一次進入的情況,也就是false的時候將繼續往下執行到了判斷事務Propagation的時候了,如果Propagation為:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一個將開啟一個新事物,new一個新的DefaultTransactionStatus ,並且
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection();//從dataSource中獲取一個Connection txObject.setConnectionHolder(new ConnectionHolder(newCon), true);//為當前Transaction設置ConnectionHolder,並且設置newConnectionHolder為true } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //這里主要是根據definition對connection進行一些設置 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); if (con.getAutoCommit()) {//開啟事務,設置autoCommit為false txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } //這里設置transactionActive為true,還記得簽名判斷是否存在的transaction吧?就是根據這個 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { //這里將當前的connection放入TransactionSynchronizationManager中持有,如果下次調用可以判斷為已有的事務 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } }
這里其實主要就是從dataSource中獲取一個新的connection,形成一個ConnectionHolder,並且放入TransactionSynchronizationManager中持有,記得前面doGetTransaction方法吧,如果同一個線程,再此進入執行的話就會獲取到同一個ConnectionHolder,在后面的isExistingTransaction方法也可以判定為是已有的transaction。
接下來將執行prepareSynchronization方法,主要是對TransactionSynchronizationManager的一系列設置。
然后將返回上層代碼執行prepareTransactionInfo方法
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { txInfo.newTransactionStatus(status); } txInfo.bindToThread(); return txInfo; }
這里其實比較簡單主要生成一個TransactionInfo並綁定到當前線程的ThreadLocal
private void bindToThread() { this.oldTransactionInfo = transactionInfoHolder.get(); transactionInfoHolder.set(this); }
形成了一個鏈表,具體啥用我也暫時沒看到,唯一看到的就是通過TransactionAspectSupport.currentTransactionStatus()可以獲取當前的transaction狀態。
然后再返回到上層代碼,接着就是執行相應的邏輯代碼了
retVal = invocation.proceed();
執行過程的finally代碼塊將執行cleanupTransactionInfo(txInfo);
private void restoreThreadLocalStatus() { transactionInfoHolder.set(this.oldTransactionInfo); }
這里就是將txInfo進行重置工作,讓它恢復到前一個狀態。
然后就是提交操作(commitTransactionAfterReturning)或者是回滾操作(completeTransactionAfterThrowing)了。
這里就拿提交操作來為例來說明,回滾操作類似:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
實際就是執行的processCommit方法
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { doCommit(status); } if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
首先將執行一些提交前的准備工作,這里將進行是否有savepoint判斷status.hasSavepoint(),如果有的話將進行釋放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);
接着就判斷是否是新的transaction:status.isNewTransaction(),如果是的話將執行 doCommit(status);
protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); try { con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
其實也就是調用了Connection的commit()方法。
最后無論成功與否都將調用finally塊中的cleanupAfterCompletion(status)
private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear();//TransactionSynchronizationManager清理工作 } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction());//這個比較重要 } if (status.getSuspendedResources() != null) { resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); } }
首先對TransactionSynchronizationManager進行一系列清理工作,然后就將執行doCleanupAfterCompletion方法:
protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; if (txObject.isNewConnectionHolder()) { //從TransactionSynchronizationManager中解綁相應的connectionHolder TransactionSynchronizationManager.unbindResource(this.dataSource); } Connection con = txObject.getConnectionHolder().getConnection(); try { //對獲取到的Connection進行一些還原 if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); }//對獲取到的Connection進行一些還原 DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { } if (txObject.isNewConnectionHolder()) { //如果是newConnection將這個鏈接關閉,如果是連接池將還給連接池 DataSourceUtils.releaseConnection(con, this.dataSource); } //這里將這只transactionActive為false txObject.getConnectionHolder().clear(); }
其實就是將TransactionSynchronizationManager中持有的connectionHolder釋放,並且還原當前Connection 的狀態,然后將對當前的transaction進行清理包括設置transactionActive為false等。
至此整個spring的事務過程也就結束了。
兩篇比較好的關於事務的博客:
http://www.iteye.com/topic/78674
http://www.cnblogs.com/yangy608/archive/2011/06/29/2093478.html