本文目標
- 理解Spring事務管理核心接口
- 理解Spring事務管理的核心邏輯
- 理解事務的傳播類型及其實現原理
版本
SpringBoot 2.3.3.RELEASE
什么是事務的傳播?
Spring 除了封裝了事務控制之外,還抽象出了 事務的傳播 這個概念,事務的傳播並不是關系型數據庫所定義的,而是Spring在封裝事務時做的增強擴展,可以通過@Transactional
指定事務的傳播,具體類型如下
事務傳播行為類型 | 說明 |
---|---|
PROPAGATION_REQUIRED |
如果當前沒有事務,就新建一個事務,如果已經存在一個事務中,加入到這個事務中。Spring的默認事務傳播類型 |
PROPAGATION_SUPPORTS |
支持當前事務,如果當前沒有事務,就以非事務方式執行。 |
PROPAGATION_MANDATORY |
使用當前的事務,如果當前沒有事務,就拋出異常。 |
PROPAGATION_REQUIRES_NEW |
新建事務,如果當前存在事務,把當前事務掛起(暫停)。 |
PROPAGATION_NOT_SUPPORTED |
以非事務方式執行操作,如果當前存在事務,就把當前事務掛起。 |
PROPAGATION_NEVER |
以非事務方式執行,如果當前存在事務,則拋出異常。 |
PROPAGATION_NESTED |
如果當前存在事務,則在嵌套事務內執行。如果當前沒有事務,則執行與PROPAGATION_REQUIRED類似的操作。 |
舉個栗子
以嵌套事務為例
@Service public class DemoServiceImpl implements DemoService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private DemoServiceImpl self; @Transactional @Override public void insertDB() { String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)"; jdbcTemplate.update(sql, uuid(), "taven"); try { // 內嵌事務將會回滾,而外部事務不會受到影響 self.nested(); } catch (Exception e) { e.printStackTrace(); } } @Transactional(propagation = Propagation.NESTED) @Override public void nested() { String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)"; jdbcTemplate.update(sql, uuid(), "nested"); throw new RuntimeException("rollback nested"); } private String uuid() { return UUID.randomUUID().toString(); } }
上述代碼中,nested()方法標記了事務傳播類型為嵌套,如果nested()
中拋出異常僅會回滾nested()
方法中的sql,不會影響到insertDB()
方法中已經執行的sql
注意:service 調用內部方法時,如果直接使用this調用,事務不會生效。因此使用this調用相當於跳過了外部的代理類,所以AOP不會生效,無法使用事務
思考
眾所周知,Spring 事務是通過AOP實現的,如果是我們自己寫一個AOP控制事務,該怎么做呢?
// 偽代碼 public Object invokeWithinTransaction() { // 開啟事務 connection.beginTransaction(); try { // 反射執行方法 Object result = invoke(); // 提交事務 connection.commit(); return result; } catch(Exception e) { // 發生異常時回滾 connection.rollback(); throw e; } }
在這個基礎上,我們來思考一下如果是我們自己做的話,事務的傳播該如何實現
以PROPAGATION_REQUIRED
為例,這個似乎很簡單,我們判斷一下當前是否有事務(可以考慮使用ThreadLocal存儲已存在的事務對象),如果有事務,那么就不開啟新的事務。反之,沒有事務,我們就創建新的事務
如果事務是由當前切面開啟的,則提交/回滾事務,反之不做處理
那么事務傳播中描述的掛起(暫停)當前事務,和內嵌事務是如何實現的?
源碼入手
要閱讀事務傳播相關的源碼,我們先來了解下Spring 事務管理的核心接口與類
-
TransactionDefinition
該接口定義了事務的所有屬性(隔離級別,傳播類型,超時時間等等),我們日常開發中經常使用的@Transactional
其實最終會被轉化為 TransactionDefinition -
TransactionStatus
事務的狀態,以最常用的實現 DefaultTransactionStatus 為例,該類存儲了當前的事務對象,savepoint,當前掛起的事務,是否完成,是否僅回滾等等 -
TransactionManager
這是一個空接口,直接繼承他的 interface 有 PlatformTransactionManager(我們平時用的就是這個,默認的實現類DataSourceTransactionManager)以及
ReactiveTransactionManager(響應式事務管理器,由於不是本文重點,我們不多說)
從上述兩個接口來看,TransactionManager 的主要作用
- 通過TransactionDefinition開啟一個事務,返回TransactionStatus
- 通過TransactionStatus 提交、回滾事務(實際開啟事務的Connection通常存儲在TransactionStatus中)
public interface PlatformTransactionManager extends TransactionManager { TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException; void commit(TransactionStatus status) throws TransactionException; void rollback(TransactionStatus status) throws TransactionException; }
- TransactionInterceptor
事務攔截器,事務AOP的核心類(支持響應式事務,編程式事務,以及我們常用的標准事務),由於篇幅原因,本文只討論標准事務的相關實現
下面我們從事務邏輯的入口 TransactionInterceptor 入手,來看下Spring事務管理的核心邏輯以及事務傳播的實現
TransactionInterceptor
TransactionInterceptor 實現了MethodInvocation(這是實現AOP的一種方式),
其核心邏輯在父類TransactionAspectSupport 中,方法位置:TransactionInterceptor::invokeWithinTransaction
protected Object invokeWithinTransaction(Method method,
這里代碼很多,根據注釋的位置,我們可以把核心邏輯梳理出來
- 獲取當前事務屬性,事務管理器(以注解事務為例,這些都可以通過
@Transactional
來定義) createTransactionIfNecessary
,判斷是否有必要創建事務invocation.proceedWithInvocation
執行攔截器鏈,最終會執行到目標方法completeTransactionAfterThrowing
當拋出異常后,完成這個事務,提交或者回滾,並拋出這個異常commitTransactionAfterReturning
從方法命名來看,這個方法會提交事務。
但是深入源碼中會發現,該方法中也包含回滾邏輯,具體行為會根據當前TransactionStatus的一些狀態來決定(也就是說,我們也可以通過設置當前TransactionStatus,來控制事務回滾,並不一定只能通過拋出異常),詳見AbstractPlatformTransact ionManager::commit
我們繼續,來看看createTransactionIfNecessary做了什么
TransactionAspectSupport::createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 通過事務管理器開啟事務 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
createTransactionIfNecessary中的核心邏輯
- 通過PlatformTransactionManager(事務管理器)開啟事務
prepareTransactionInfo
准備事務信息,這個具體做了什么我們稍后再講
繼續來看PlatformTransactionManager::getTransaction
,該方法只有一個實現 AbstractPlatformTransactionManager::getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // Use defaults if no transaction definition given. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); // 獲取當前事務,該方法有繼承 AbstractPlatformTransactionManager 的子類自行實現 Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // 如果目前存在事務 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); } // Check definition settings for new transaction. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 傳播類型PROPAGATION_MANDATORY, 要求當前必須有事務 // No existing transaction found -> check propagation behavior to find out how to proceed. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事務時創建事務 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 開啟事務 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
代碼很多,重點關注注釋部分即可
doGetTransaction
獲取當前事務- 如果存在事務,則調用
handleExistingTransaction
處理,這個我們稍后會講到
接下來,會根據事務的傳播決定是否開啟事務
- 如果事務傳播類型為
PROPAGATION_MANDATORY
,且不存在事務,則拋出異常 - 如果傳播類型為
PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED
,且當前不存在事務,則調用startTransaction
創建事務 - 當不滿足 3、4時,例如
PROPAGATION_NOT_SUPPORTED
,此時會執行事務同步,但是不會創建真正的事務
Spring 事務同步在之前一篇博客中有講到,傳送門👉https://www.jianshu.com/p/7880d9a98a5f
Spring 如何管理當前的事務
接下來講講上面提到的doGetTransaction
、handleExistingTransaction
,這兩個方法是由不同的TransactionManager自行實現的
我們以SpringBoot默認的TransactionManager,DataSourceTransactionManager為例
@Override protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; } @Override protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
結合 AbstractPlatformTransactionManager::getTransaction
一起來看,doGetTransaction
其實獲取的是當前的Connection。
判斷當前是否存在事務,是判斷DataSourceTransactionObject 對象中是否包含connection,以及connection是否開啟了事務。
我們繼續來看下TransactionSynchronizationManager.getResource(obtainDataSource())
獲取當前connection的邏輯
TransactionSynchronizationManager::getResource
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources"); @Nullable // TransactionSynchronizationManager::getResource public static Object getResource(Object key) { // DataSourceTransactionManager 調用該方法時,以數據源作為key // TransactionSynchronizationUtils::unwrapResourceIfNecessary 如果key為包裝類,則獲取被包裝的對象 // 我們可以忽略該邏輯 Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } /** * Actually check the value of the resource that is bound for the given key. */ @Nullable private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
看到這里,我們能明白DataSourceTransactionManager是如何管理線程之間的Connection,ThreadLocal 中存儲一個Map,key為數據源對象,value為該數據源在當前線程的Connection

DataSourceTransactionManager 在開啟事務后,會調用TransactionSynchronizationManager::bindResource
將指定數據源的Connection綁定到當前線程
AbstractPlatformTransactionManager::handleExistingTransaction
我們繼續回頭看,如果存在事務的情況,如何處理
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // 如果事務的傳播要求以非事務方式執行 拋出異常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // PROPAGATION_NOT_SUPPORTED 如果存在事務,則掛起當前事務,以非事務方式執行 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 掛起當前事務 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); // 構建一個無事務的TransactionStatus return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // PROPAGATION_REQUIRES_NEW 如果存在事務,則掛起當前事務,新建一個事務 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // PROPAGATION_NESTED 內嵌事務,就是我們開頭舉得例子 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 非JTA事務管理器都是通過savePoint實現的內嵌事務 // savePoint:關系型數據庫中事務可以創建還原點,並且可以回滾到還原點 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 創建還原點 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. return startTransaction(definition, transaction, debugEnabled, null); } } // 如果執行到這一步傳播類型一定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger