死磕Spring之AOP篇 - Spring事務詳解


該系列文章是本人在學習 Spring 的過程中總結下來的,里面涉及到相關源碼,可能對讀者不太友好,請結合我的源碼注釋 Spring 源碼分析 GitHub 地址 進行閱讀。

Spring 版本:5.1.14.RELEASE

在開始閱讀 Spring AOP 源碼之前,需要對 Spring IoC 有一定的了解,可查看我的 《死磕Spring之IoC篇 - 文章導讀》 這一系列文章

了解 AOP 相關術語,可先查看 《Spring AOP 常見面試題) 》 這篇文章

該系列其他文章請查看:《死磕 Spring 之 AOP 篇 - 文章導讀》

前序

前面我的一系列文章對 Spring AOP 進行了比較詳細的講述,相信對於 Spring AOP 你有一個比較深入的理解,如果你不是很了解,建議先查看我前面的這一系列文章,因為 Spring 事務是借助於 Spring AOP 實現的。由於這段時間有點忙(太懶了~),沒能及時更新 Spring AOP 在 Spring 內部的應用相關內容,趁着對它還有一點印象,我們一起來看看 Spring 事務的相關源碼。我猜應該是比較容易理解的,因為已經有了 Spring AOP 的基礎,相信對於 Spring 事務會“輕而易舉”地掌握~

我們先了解一下 Spring 事務里面的“物理事務”和“邏輯事務”,所謂的“物理事務”是指 JDBC 的事務,上一次事務和本次事務之間是沒有其他事務的,在執行一條命令(默認行為自動提交)都會產生一個事務,如果把 autocommit 設置為 false,需要主動 commit 才完成一個事務。所謂的“邏輯事務”是 Spring 對 JDBC 的一個抽象,例如 Spring 默認的事務傳播行為是 REQUIRED,當執行 @Transactional 注解標注的方法時,如果此時正處於一個事務(物理事務)中,那么加入到這個事務中,你可以理解為創建了一個“邏輯事務”,進行提交的時候不會執行 Connection 的 commit 方法,而是在外面的“物理事務”中進行 commit 時一並完成本次事務。

Spring 事務的傳播級別

  • REQUIRED:默認傳播級別,如果正處於一個事務中,則加入;否則,創建一個事務
  • SUPPORTS:如果正處於一個事務中,則加入;否則,不使用事務
  • MANDATORY:如果當前正處於一個事務中,則加入;否則,拋出異常
  • REQUIRES_NEW:無論如何都會創建一個新的事務,如果正處於一個事務中,會先掛起,然后創建
  • NOT_SUPPORTED:不使用事務,如果正處於一個事務中,則掛起,不使用事務
  • NEVER:不使用事務,如果正處於一個事務中,則拋出異常
  • NESTED:嵌套事務,如果正處於一個事務中,則創建一個事務嵌套在其中(MySQL 采用 SAVEPOINT 保護點實現的);否則,創建一個事務

關於 Spring 事務傳播級別更多的細節在接下來的源碼中進行討論

Spring 事務的使用示例

相信看到這篇文章的你對於 @Transactional 注解的使用肯定非常了解,不過這里還是列舉以下它的使用方式

Spring MVC

引入 Spring 事務相關依賴后,在 Spring MVC 中有兩種(XML 配置和注解)驅動 Spring 事務的方式,如下面所示:

方式一:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <!-- 定義一個數據源 -->
    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">...</bean>
    <!-- 事務管理器 -->
    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <!-- 指定數據源 -->
        <property name="dataSource" ref="dataSource"/>
    </bean>
    <!-- 事務模塊驅動,指定使用上面定義事務管理器,默認值為 transactionManager -->
    <tx:annotation-driven transaction-manager="txManager"/>
</beans>

方式二:

需要在 Spring 能掃描的一個 Bean 上添加一個 @EnableTransactionManagement 注解,然后添加一個 TransactionManagementConfigurer 實現類,如下:

package tk.mybatis.simple.controller;

import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.TransactionManagementConfigurer;

import javax.annotation.Resource;
import javax.sql.DataSource;

@Configuration
@EnableTransactionManagement
public class TransactionManagerConfig implements TransactionManagementConfigurer {

    @Resource
    DataSource dataSource;

    @Override
    public TransactionManager annotationDrivenTransactionManager() {
        // 返回一個事務管理器,設置數據源
        return new DataSourceTransactionManager(dataSource);
    }
}

此時你可以使用 @Transactional 注解標注在方法(或者類)上面,使得方法的執行處於一個事務中,如下:

@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void update(){
    try {
        // 數據庫操作
    } catch (Exeception e){
        // 將事務狀態設置為回滾
        TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
    }
}

Spring Boot

在 Spring Boot 中我們使用 @Transactional 注解的時候好像不需要 @EnableTransactionManagement 注解驅動 Spring 事務模塊,這是為什么?和 Spring AOP 的 @EnableAspectJAutoProxy 注解類似,會有一個 TransactionAutoConfiguration 事務自動配置類,我們一起來看看:

@Configuration
@ConditionalOnClass(PlatformTransactionManager.class)
@AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
		DataSourceTransactionManagerAutoConfiguration.class,
		Neo4jDataAutoConfiguration.class })
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public TransactionManagerCustomizers platformTransactionManagerCustomizers(
			ObjectProvider<List<PlatformTransactionManagerCustomizer<?>>> customizers) {
		return new TransactionManagerCustomizers(customizers.getIfAvailable());
	}

	@Configuration
	@ConditionalOnSingleCandidate(PlatformTransactionManager.class)
	public static class TransactionTemplateConfiguration {

		private final PlatformTransactionManager transactionManager;

		public TransactionTemplateConfiguration(
				PlatformTransactionManager transactionManager) {
			this.transactionManager = transactionManager;
		}

		@Bean
		@ConditionalOnMissingBean
		public TransactionTemplate transactionTemplate() {
			return new TransactionTemplate(this.transactionManager);
		}
	}

	@Configuration
	@ConditionalOnBean(PlatformTransactionManager.class)
	@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
	public static class EnableTransactionManagementConfiguration {
		@Configuration
		@EnableTransactionManagement(proxyTargetClass = false)
		@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false", matchIfMissing = false)
		public static class JdkDynamicAutoProxyConfiguration { }

		@Configuration
		@EnableTransactionManagement(proxyTargetClass = true)
		@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true", matchIfMissing = true)
		public static class CglibAutoProxyConfiguration { }
	}
}

是不是很熟悉,只要存在 PlatformTransactionManager 這個 Class 對象就會將這個 Bean 注冊到 IoC 容器中,里面涉及到一些 @Conditional 注解,這里就不一一解釋了。可以看到其中會有 @EnableTransactionManagement 注解,是不是和在 Spring MVC 中以注解驅動 Spring 事務的方式一樣,但是好像沒有 PlatformTransactionManager 事務管理器。別急,我們看到這個自動配置類上面會有 @AutoConfigureAfter({DataSourceTransactionManagerAutoConfiguration.class}) 注解,表示會先加載 DataSourceTransactionManagerAutoConfiguration 這個自動配置類,我們一起來看看:

@Configuration
@ConditionalOnClass({ JdbcTemplate.class, PlatformTransactionManager.class })
@AutoConfigureOrder(Ordered.LOWEST_PRECEDENCE)
@EnableConfigurationProperties(DataSourceProperties.class)
public class DataSourceTransactionManagerAutoConfiguration {

	@Configuration
	@ConditionalOnSingleCandidate(DataSource.class)
	static class DataSourceTransactionManagerConfiguration {

		private final DataSource dataSource;

		private final TransactionManagerCustomizers transactionManagerCustomizers;

		DataSourceTransactionManagerConfiguration(DataSource dataSource,
				ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
			this.dataSource = dataSource;
			this.transactionManagerCustomizers = transactionManagerCustomizers
					.getIfAvailable();
		}

		@Bean
		@ConditionalOnMissingBean(PlatformTransactionManager.class)
		public DataSourceTransactionManager transactionManager(
				DataSourceProperties properties) {
			DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(
					this.dataSource);
			if (this.transactionManagerCustomizers != null) {
				this.transactionManagerCustomizers.customize(transactionManager);
			}
			return transactionManager;
		}
	}
}

可以看到會注入一個 DataSourceTransactionManager 事務管理器,關聯這個當前 DataSource 數據源對象

好了,通過上面的使用示例我們可以注意到 @EnableTransactionManagement 注解可以驅動整個 Spring 事務模塊,當然,<annotation-driven /> 標簽的原理和注解差不多,前面也講述了非常多 Spring 自定義標簽的實現原理,這里我們就不分析了,那么我們一起來看看 @EnableTransactionManagement 這個注解

核心 API

在開始查看 Spring 事務的源碼之前,我想有必要先簡單介紹一下涉及到的一些主要的 API,對 Spring 事務的源碼有一個印象,如下:

  • Spring 事務 @Enable 模塊驅動 - @EnableTransactionManagement
  • Spring 事務注解 - @Transactional
  • Spring 事務事件監聽器 - @TransactionalEventListener
  • Spring 事務定義 - TransactionDefinition
  • Spring 事務狀態 - TransactionStatus
  • Spring 平台事務管理器 - PlatformTransactionManager
  • Spring 事務代理配置 - ProxyTransactionManagementConfiguration
  • Spring 事務 PointAdvisor 實現 - BeanFactoryTransactionAttributeSourceAdvisor
  • Spring 事務 MethodInterceptor 實現 - TransactionInterceptor
  • Spring 事務屬性源 - TransactionAttributeSource

簡單介紹 Spring 事務:

  1. 需要通過 @EnableTransactionManagement 注解驅動整個 Spring 事務模塊

  2. 可以通過 @Transactional 注解定義在某個類或者方法上面定義一個事務(傳播性、隔離性等),開啟事務

  3. ProxyTransactionManagementConfiguration 代理配置類用來定義一個 BeanFactoryTransactionAttributeSourceAdvisor 切面,是一個用於 Spring 事務的 AOP 切面

  4. Spring 事務底層就是通過 Spring AOP 實現的,可以在上面看到有一個 PointcutAdvisor 切面,關聯的 Pointcut 內部有一個 TransactionAttributeSource 對象,會借助於 TransactionAnnotationParser 解析器解析 @Transactional 注解,將這個事務定義的一些屬性封裝成一個 TransactionDefinition 事務定義對象

  5. Spring AOP 攔截處理在 TransactionInterceptor 事務攔截器中,先借助 PlatformTransactionManager 平台事務管理器創建 TransactionStatus 事務對象,里面包含了 Transaction 事務,將 autocommit 自動提交關閉,方法的執行也就處於一個事務中

  6. 事務的相關屬性會保存在許多 ThreadLocal 中,例如 DataSource、Connection 和 SqlSession 等屬性,交由一個 TransactionSynchronizationManager 事務同步管理器進行管理,所以說 Spring 事務僅支持在一個線程中完成

Spring 事務非常復雜,接下來我們逐步分析

@EnableTransactionManagement 注解驅動

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

	/**
	 * 默認優先使用 JDK 動態代理
	 */
	boolean proxyTargetClass() default false;

    /**
     * 默認使用 Spring AOP 代理模式
     */
	AdviceMode mode() default AdviceMode.PROXY;

	int order() default Ordered.LOWEST_PRECEDENCE;
}

可以看到有一個 @Import 注解,它的值是一個 TransactionManagementConfigurationSelector 類,也就是說 Spring 事務的驅動入口在這里面,關於 @Import 注解的原理可查看我的 《死磕Spring之IoC篇 - @Bean 等注解的實現原理》 這篇文章

TransactionManagementConfigurationSelector

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

	@Override
	protected String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			// 默認使用代理模式
			case PROXY:
				return new String[]{
						// 注冊一個 InfrastructureAdvisorAutoProxyCreator 對象,目的是創建代理對象
						AutoProxyRegistrar.class.getName(),
						// 【關鍵】注冊一個 Spring 事務代理配置類
						ProxyTransactionManagementConfiguration.class.getName()};
			// 選擇 AspectJ 模式
			case ASPECTJ:
				return new String[]{determineTransactionAspectClass()};
			default:
				return null;
		}
	}

	private String determineTransactionAspectClass() {
		return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
				TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
				TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
	}
}

可以看到默認情況下會注冊兩個 Bean

  • AutoProxyRegistrar,注冊一個 InfrastructureAdvisorAutoProxyCreator 對象,目的是創建代理對象,在講解 Spring AOP 的時候講述過,這里不再贅述
  • ProxyTransactionManagementConfiguration,一個 Spring 務代理配置類

ProxyTransactionManagementConfiguration

org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration,Spring 事務代理配置類,定義好一個 AOP 切面

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

	@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
		// <1> 創建 PointcutAdvisor 對象,作為 @Transactional 注解的一個切面
 		BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
		// <2> 【Pointcut】設置 AnnotationTransactionAttributeSource,被關聯在 Pointcut 中
		// 借助於 TransactionAnnotationParser 解析器解析 @Transactional 注解
		advisor.setTransactionAttributeSource(transactionAttributeSource());
		// <3> 【Advice】設置 Advice 為 TransactionInterceptor 事務攔截器
		advisor.setAdvice(transactionInterceptor());
		if (this.enableTx != null) {
			advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
		}
		return advisor;
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionAttributeSource transactionAttributeSource() {
		return new AnnotationTransactionAttributeSource();
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionInterceptor transactionInterceptor() {
		// 創建 TransactionInterceptor 事務攔截器(MethodInterceptor 對象)
		TransactionInterceptor interceptor = new TransactionInterceptor();
		// 設置這個 AnnotationTransactionAttributeSource 對象,@Bean 注解標注的方法返回的都是同一個對象
		interceptor.setTransactionAttributeSource(transactionAttributeSource());
		if (this.txManager != null) {
			// 設置默認的事務管理器
			interceptor.setTransactionManager(this.txManager);
		}
		return interceptor;
	}
}

可以看到會注冊三個 Bean:

  1. BeanFactoryTransactionAttributeSourceAdvisor 切面,這個 PointcutAdvisor 對象關聯的 Pointcut 切點用於篩選 @Transactional 注解的方法(標注在類上也可以),在關聯的 Advice 中會進行事務的攔截處理

  2. Advice 通知,就是一個 TransactionInterceptor 方法攔截器,關聯着一個 AnnotationTransactionAttributeSource 對象

  3. AnnotationTransactionAttributeSource 事務屬性資源對象,被 Pointcut 和 Advice 關聯,用於解析 @Transactional 注解,在它的構造方法中會添加一個 SpringTransactionAnnotationParser 事務注解解析器,用於解析 @Transactional 注解,如下:

    // AnnotationTransactionAttributeSource.java
    public AnnotationTransactionAttributeSource() {
        this(true);
    }
    
    public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        if (jta12Present || ejb3Present) {
            this.annotationParsers = new LinkedHashSet<>(4);
            this.annotationParsers.add(new SpringTransactionAnnotationParser());
            if (jta12Present) {
                this.annotationParsers.add(new JtaTransactionAnnotationParser());
            }
            if (ejb3Present) {
                this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
            }
        }
        else {
            this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
        }
    }
    

PointcutAdvisor 事務切面

org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor,Spring 事務切面,如下:

public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

	/**
	 * 事務屬性源對象,用於解析 @Transactional 注解
	 */
	@Nullable
	private TransactionAttributeSource transactionAttributeSource;

	/**
	 * Pointcut 對象,用於判斷 JoinPoint 是否匹配
	 */
	private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
		@Override
		@Nullable
		protected TransactionAttributeSource getTransactionAttributeSource() {
			return transactionAttributeSource;
		}
	};
    
	public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
		this.transactionAttributeSource = transactionAttributeSource;
	}

	public void setClassFilter(ClassFilter classFilter) {
		this.pointcut.setClassFilter(classFilter);
	}

	@Override
	public Pointcut getPointcut() {
		return this.pointcut;
	}
}

設置的 TransactionAttributeSource 就是上面的 AnnotationTransactionAttributeSource 對象,關聯的 Pointcut 切點就是一個 TransactionAttributeSourcePointcut 對象

也就是說通過 Pointcut 事務切點篩選出來的 Bean 會創建一個代理對象,方法的攔截處理則交由 Advice 完成

Pointcut 事務切點

org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut,Spring 事務的 AOP 切點,如下:

abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

	@Override
	public boolean matches(Method method, Class<?> targetClass) {
		// <1> 目標類是 Spring 內部的事務相關類,則跳過,不需要創建代理對象
		if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
				PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
				PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
			return false;
		}
		// <2 獲取 AnnotationTransactionAttributeSource 對象
		TransactionAttributeSource tas = getTransactionAttributeSource();
		// <3> 解析該方法相應的 @Transactional 注解,並將元信息封裝成一個 TransactionAttribute 對象
		// 且緩存至 AnnotationTransactionAttributeSource 對象中
		// <4> 如果有對應的 TransactionAttribute 對象,則表示匹配,需要進行事務的攔截處理
		return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
	}
}

判斷這個方法是否需要被 TransactionInterceptor 事務攔截器進行攔截的過程如下:

  1. 目標類是 Spring 內部的事務相關類,則跳過,不需要創建代理對象
  2. 獲取 AnnotationTransactionAttributeSource 對象
  3. 解析該方法相應的 @Transactional 注解,並將元信息封裝成一個 TransactionAttribute 對象,且緩存至 AnnotationTransactionAttributeSource 對象中
  4. 如果有對應的 TransactionAttribute 對象,則表示匹配,需要進行事務的攔截處理

3 步解析 @Transactional 注解通過 AnnotationTransactionAttributeSource#getTransactionAttribute(..) 方法完成的,我們一起來看看這個解析過程

@Transactional 注解的解析

1. getTransactionAttribute 方法
// AbstractFallbackTransactionAttributeSource.java
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    // <1> java.lang.Object 內定義的方法跳過
    if (method.getDeclaringClass() == Object.class) {
        return null;
    }
    // First, see if we have a cached value.
    // <2> 獲取緩存 Key,MethodClassKey 對象,關聯 Method 和 Class 對象
    Object cacheKey = getCacheKey(method, targetClass);
    // <3> 嘗試從緩存中獲取該方法對應的 TransactionAttribute 對象
    TransactionAttribute cached = this.attributeCache.get(cacheKey);
    if (cached != null) {
        // Value will either be canonical value indicating there is no transaction attribute,
        // or an actual transaction attribute.
        // <3.1> 緩存中緩存的是一個空的 TransactionAttribute 對象
        // 表示沒有相應的 @Transactional 注解,返回 `null`
        if (cached == NULL_TRANSACTION_ATTRIBUTE) {
            return null;
        }
        // <3.2> 返回緩存的 TransactionAttribute 對象
        else {
            return cached;
        }
    }
    // <4> 開始解析方法對應的 @Transactional 注解
    else {
        // We need to work it out.
        // <4.1> 解析該方法或者類上面的 @Transactional 注解,封裝成 RuleBasedTransactionAttribute 對象
        // 優先從方法上面解析該注解,其次從類上解析該注解,沒有的話返回的是 `null`
        TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
        // Put it in the cache.
        // <4.2> 如果是 `null`,則緩存一個空的 TransactionAttribute 對象
        if (txAttr == null) {
            this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
        }
        // <4.3> 否則,將該 TransactionAttribute 對象緩存
        else {
            String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
            if (txAttr instanceof DefaultTransactionAttribute) {
                // 設置方法的描述符(類名.方法名)
                ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
            }
            this.attributeCache.put(cacheKey, txAttr);
        }
        // <4.4> 返回這個 TransactionAttribute 對象
        return txAttr;
    }
}

解析過程如下:

  1. Object 內定義的方法跳過
  2. 獲取緩存 Key,MethodClassKey 對象,關聯 Method 和 Class 對象
  3. 嘗試從緩存中獲取該方法對應的 TransactionAttribute 對象,如果有的話
    1. 如果緩存中緩存的是一個“空”的 TransactionAttribute 對象,表示沒有相應的 @Transactional 注解,返回 null
    2. 否則,返回緩存的 TransactionAttribute 對象
  4. 否則,開始解析方法對應的 @Transactional 注解
    1. 解析該方法或者類上面的 @Transactional 注解,封裝成 RuleBasedTransactionAttribute 對象,優先從方法上面解析該注解,其次從類上解析該注解,沒有的話返回的是 null
    2. 如果是 null,則緩存一個“空”的 TransactionAttribute 對象
    3. 否則,將該 TransactionAttribute 對象緩存
    4. 返回這個 TransactionAttribute 對象

注意,這里解析出來的 TransactionAttribute 會進行緩存,后續在 TransactionInterceptor(Advice)中無需解析,直接取緩存即可

上面第 4.1 步調用 computeTransactionAttribute(..) 方法解析 @Transactional 注解,如下:

2. computeTransactionAttribute 方法
// AbstractFallbackTransactionAttributeSource.java
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    // Don't allow no-public methods as required.
    // 如果不允許非 public 修飾的方法(默認允許),則判斷是否為 public 修飾,不是的話返回 null
    if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
        return null;
    }

    // The method may be on an interface, but we need attributes from the target class.
    // If the target class is null, the method will be unchanged.
    // 獲取方法對象(而不是橋接方法)
    Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

    // First try is the method in the target class.
    // 通過 SpringTransactionAnnotationParser 解析方法上面的 @Transactional 注解
    // 並將注解的元信息封裝到 RuleBasedTransactionAttribute 中,沒有注解的話返回 null
    TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    if (txAttr != null) {
        // 存在的話直接返回
        return txAttr;
    }

    // Second try is the transaction attribute on the target class.
    // 如果方法上面沒有,則嘗試解析類上面的 @Transactional 注解
    txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    // 存在這個注解,且方法是用戶級別(不是 Spring 內部合成的)
    if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
        return txAttr;
    }

    // 如果還沒有找到 @Transactional 注解的元信息,則嘗試從原 Method 對象中查找
    if (specificMethod != method) {
        // Fallback is to look at the original method.
        // 處理過程如上
        txAttr = findTransactionAttribute(method);
        if (txAttr != null) {
            return txAttr;
        }
        // Last fallback is the class of the original method.
        // 處理過程如上
        txAttr = findTransactionAttribute(method.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }
    }

    return null;
}

可以看到,默認情況是只支持 public 修飾的方法,對於方法和類上面的 @Transactional 注解都是支持的,優先從方法上面解析,其次從所在類上面解析,處理過程都在 findTransactionAttribute(..) 方法中

3. findTransactionAttribute 方法
// AnnotationTransactionAttributeSource.java
@Override
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
    // 解析類上面的 @Transactional 注解,並將注解的元信息封裝到 RuleBasedTransactionAttribute 中
    return determineTransactionAttribute(clazz);
}

@Override
protected TransactionAttribute findTransactionAttribute(Method method) {
    // 解析方法上面的 @Transactional 注解,並將注解的元信息封裝到 RuleBasedTransactionAttribute 中
    return determineTransactionAttribute(method);
}

protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
    for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
        // 通過 SpringTransactionAnnotationParser 解析 @Transactional 注解
        TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
        if (attr != null) {
            return attr;
        }
    }
    return null;
}

通過 SpringTransactionAnnotationParser 解析器進行方法或者類上面的 @Transactional 注解

4. SpringTransactionAnnotationParser
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {

	@Override
	@Nullable
	public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
		// 找到這個方法的 @Transactional 注解
		AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
				element, Transactional.class, false, false);
		if (attributes != null) {
			// 將 @Transactional 注解的元信息封裝到 RuleBasedTransactionAttribute 對象中
			return parseTransactionAnnotation(attributes);
		}
		else {
			return null;
		}
	}

	public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
		return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
	}

	protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
		RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
		// 事務傳播性
		Propagation propagation = attributes.getEnum("propagation");
		rbta.setPropagationBehavior(propagation.value());
		// 事務隔離級別
		Isolation isolation = attributes.getEnum("isolation");
		rbta.setIsolationLevel(isolation.value());
		// 超時時間
		rbta.setTimeout(attributes.getNumber("timeout").intValue());
		// 是否只讀
		rbta.setReadOnly(attributes.getBoolean("readOnly"));
		// 指定事務管理器
		rbta.setQualifier(attributes.getString("value"));

		List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
		// 設置接收到哪些 Class 對象(異常)需要回滾
		for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		// 設置接收到哪些 Class 對象(異常)不需要回滾
		for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		rbta.setRollbackRules(rollbackRules);

		return rbta;
	}
}

解析過程比較簡單,就是將 @Transactional 注解解析成 RuleBasedTransactionAttribute 對象(實現了 TransactionDefinition 接口),設置相關屬性,不存在這個注解的話返回 null

小節

在這個 PointcutAdvisor 切面關聯着一個 Pointcut 切點,為 TransactionAttributeSourcePointcut 對象,內部有一個 AnnotationTransactionAttributeSource 事務屬性資源對象。在這個切點判斷某個方法是否需要進行事務處理時,通過內部的 AnnotationTransactionAttributeSource 對象解析 @Transactional 注解(沒有的話表示不匹配),解析過程需要借助於 SpringTransactionAnnotationParser 解析器解析 @Transactional 注解,將這個事務定義的一些屬性封裝成一個 RuleBasedTransactionAttribute 事務定義對象(實現了 TransactionDefinition 接口),並緩存

TransactionInterceptor 事務攔截處理

通過 Pointcut 事務切點篩選出來的 Bean,會創建一個代理對象,Bean 內部肯定定義了 @Transactional 注解,如果是類上定義的 @Transactional 注解,每個方法都需要進行事務處理。代理對象的事務攔截處理在 TransactionInterceptor 攔截器中,實現了 MethodInterceptor 方法攔截器,也就是實現了 Object invoke(MethodInvocation invocation) 這個方法,一起來看看 TransactionInterceptor 這個類

結構類圖:

// TransactionInterceptor.java
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
    // 目標類
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // 在事務中執行方法調用器
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

調用 invokeWithinTransaction(..) 方法,在事務中執行方法調用器,如下:

// TransactionAspectSupport.java
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // <1> 獲取 `@Transactional` 注解對應的 TransactionAttribute 對象(如果在 AnnotationTransactionAttributeSource 解析過則取緩存)
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // <2> 獲取 PlatformTransactionManager 事務管理器(可以指定,沒有指定則獲取默認的)
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // <3> 獲取方法的唯一標識,默認都是 `類名.方法名`
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // <4> 如果已有 `@Transactional` 注解對應的 TransactionAttribute 對象,或者是一個非回調偏向的事務管理器(默認不是)
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // <4.1> 創建 TransactionInfo 事務信息對象,綁定在 ThreadLocal 中
        // 包含一個 DefaultTransactionStatus 事務狀態對象
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // <4.2> 繼續執行方法調用器
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // <4.3> 如果捕獲到異常,則在這里完成事務,進行回滾或者提交
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // <4.4> `finally` 語句塊,釋放 ThreadLocal 中的 TransactionInfo 對象,設置為上一個事務信息對象(沒有的話為空)
            cleanupTransactionInfo(txInfo);
        }
        // <4.5> 正常情況,到這里完成事務
        commitTransactionAfterReturning(txInfo);
        // <4.6> 返回執行結果
        return retVal;
    }
    // <5> 否則,就是支持回調的事務管理器,編程式事務(回調偏向),暫時忽略
    else {
        // .....
    }
}

整個過程有點復雜,我們一步一步來看

  1. 獲取 @Transactional 注解對應的 TransactionAttribute 對象(如果在 AnnotationTransactionAttributeSource 解析過則取緩存),在 Pointcut 事務切點中已經分析過
  2. 獲取 PlatformTransactionManager 事務管理器,需要指定,在 Spring Boot 中默認為 DataSourceTransactionManager
  3. 獲取方法的唯一標識,默認都是 類名.方法名
  4. 如果已有 @Transactional 注解對應的 TransactionAttribute 對象,或者不是一個回調偏向的事務管理器(默認不是)
    1. 調用 createTransactionIfNecessary(..) 方法,創建 TransactionInfo 事務信息對象(包含一個 DefaultTransactionStatus 事務狀態對象),綁定在 ThreadLocal 中
    2. 繼續執行方法調用器(執行方法)
    3. 如果捕獲到異常,則在這里完成事務,進行回滾或者提交,調用 completeTransactionAfterThrowing(..) 方法
    4. finally 語句塊,釋放 ThreadLocal 中的 TransactionInfo 對象,設置為上一個事務信息對象(沒有的話為空)
    5. 正常情況,到這里完成事務,調用 commitTransactionAfterReturning(..) 方法
    6. 返回執行結果
  5. 否則,就是支持回調的事務管理器,編程式事務(回調偏向),暫時忽略

整個過程的主流程不復雜,我們可以看到上面的第 4 步,可以把這一步分為四個步驟:

  1. 為本地方法的執行創建一個事務,過程比較復雜,可以先理解為需要把 Connection 連接的 autocommit 關閉,然后根據 @Transactional 注解的屬性進行相關設置,例如根據事務的傳播級別判斷是否需要創建一個新的事務
  2. 事務准備好了,那么繼續執行方法調用器,也就是方法的執行
  3. 捕獲到異常,進行回滾,或者提交(異常類型不匹配)
  4. 正常情況,走到這里就完成事務,調用 Connection 的 commit() 方法完成本次事務(不是一定會執行,因為可能是“嵌套事務”或者“邏輯事務”等情況)

接下來,我們一起來看看 Spring 是如何創建一個事務的

1. 創建事務

createTransactionIfNecessary(..) 方法,創建一個事務(如果有必要的話),如下:

// TransactionAspectSupport.java
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // <1> 沒有設置事務名稱,則封裝成一個 DelegatingTransactionAttribute 委托對象,支持返回一個事務名稱(類名.方法名)
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    // <2> 獲取一個 TransactionStatus 對象
    TransactionStatus status = null;
    if (txAttr != null) {
        // <2.1> 如果存在事務管理器
        if (tm != null) {
            // 從事務管理器中獲取一個 TransactionStatus 事務狀態對象(對事務的封裝),該對象包含以下信息:
            // TransactionDefinition 事務定義、DataSourceTransactionObject 數據源事務對象(包括 DataSource 和 Connection)、
            // 是否是一個新的事務、是否是一個新的事務同步器、被掛起的事務資源對象
            status = tm.getTransaction(txAttr);
        }
        // <2.2> 否則,跳過
        else { }
    }
    // <3> 創建一個 TransactionInfo 事務信息對象,並綁定到 ThreadLocal 中
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

過程如下:

  1. 如果沒有設置事務名稱,則封裝成一個 DelegatingTransactionAttribute 委托對象,支持返回一個事務名稱(類名.方法名

  2. 獲取一個 TransactionStatus 對象(對事務的封裝)

    1. 如果存在事務管理器,Spring Boot 中默認為 DataSourceTransactionManager,則通過事務管理器根據 @Transactional 注解獲取一個 TransactionStatus 事務狀態對象,該對象是對事務的封裝,包含了以下信息:
      • TransactionDefinition 事務定義
      • DataSourceTransactionObject 數據源事務對象(包括 DataSource 和 Connection)
      • 是否是一個新的事務
      • 是否是一個新的事務同步器
      • 被掛起的事務資源對象(如果有)
    2. 否則,跳過
  3. 創建一個 TransactionInfo 事務信息對象,並綁定到 ThreadLocal 中,如下:

    // TransactionAspectSupport.java
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, String joinpointIdentification,
            @Nullable TransactionStatus status) {
    
        // <1> 創建一個 TransactionInfo 對象
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        // <2> 如果有 @Transactional 注解的元信息
        if (txAttr != null) {
            // 設置 DefaultTransactionStatus 事務狀態對象
            txInfo.newTransactionStatus(status);
        }
        else { }
    
        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        // <3> 將當前 TransactionInfo 對象保存至 ThreadLocal 中
        txInfo.bindToThread();
        // <4> 返回這個 TransactionInfo 對象
        return txInfo;
    }
    

    可以看到,即使沒有創建事務,也會創建一個 TransactionInfo 對象,並綁定到 ThreadLocal 中

我們繼續看到在上面第 2PlatformTransactionManager 事務管理器是如何創建一個 Spring 事務的

1.1 getTransaction 方法

PlatformTransactionManager 事務管理器接口的類圖:

該接口就定義了三個方法,如下:

public interface PlatformTransactionManager {

	TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException;
			
	void commit(TransactionStatus status) throws TransactionException;


	void rollback(TransactionStatus status) throws TransactionException;
}

三個方法分別對應創建事務,提交,回滾三個操作,關於 Spring 事務也就這三個核心步驟了,我們先來看看 getTransaction(..) 這個方法的實現,如下:

// AbstractPlatformTransactionManager.java
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // <1> 先從當前事務管理器中獲取 DataSource 對象,然后嘗試以它作為一個 Key 從一個 ThreadLocal 的 Map 中獲取對應的 ConnectionHolder 連接對象
    // 會包裝成一個 DataSourceTransactionObject 對象返回
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    // <2> 如果沒有 @Transactional 注解對應的元信息,則創建一個默認的 TransactionDefinition 對象
    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }

    // <3> 如果上面 `transaction` 數據源事務對象已有 Connection 連接,且正處於一個事務中,表示當前線程已經在一個事務中了
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // <3.1> 根據 Spring 事務傳播級別進行不同的處理,同時創建一個 DefaultTransactionStatus 事務狀態對象,包含以下信息:
        // TransactionDefinition 事務定義、DataSourceTransactionObject 數據源事務對象、
        // 是否需要新創建一個事務、是否需要一個新的事務同步器、被掛起的事務資源對象
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // <4> 否則,當前線程沒有事務

    // 超時不能小於默認值
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // <4.1> 如果是 **MANDATORY** 事務傳播級別(當前線程已經在一個事務中,則加入該事務,否則拋出異常),因為當前線程沒有事務,此時拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // <4.2> 否則,如果事務傳播級別為 **REQUIRED|REQUIRES_NEW|NESTED**
    else if (
        // 如果當前線程已經在一個事務中,則加入該事務,否則新建一個事務(默認)
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        // 無論如何都會創建一個新的事務,如果當前線程已經在一個事務中,則掛起當前事務,創建一個新的事務
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        // 執行一個嵌套事務
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) 
    {
        // <4.2.1> 創建一個“空”的掛起資源對象
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            // 是否需要新的事務同步器,默認為 true
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // <4.2.2> 創建一個 DefaultTransactionStatus 事務狀態對象,設置相關屬性
            // 這里 `newTransaction` 參數為 `true`,表示是一個新的事務
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // <4.2.3> 【關鍵】執行 begin 操作,如果沒有 Connection 數據庫連接,則通過 DataSource 創建一個新的連接
            // 設置 Connection 的隔離級別、是否只讀,並執行 Connection#setAutoCommit(false) 方法,不自動提交
            // 同時將 DataSource(數據源對象)和 ConnectionHolder(數據庫連接持有者)保存至 ThreadLocal 中
            doBegin(transaction, definition);
            // <4.2.4> 借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
            prepareSynchronization(status, definition);
            // <4.2.5> 返回上面創建的 DefaultTransactionStatus 事務狀態對象
            return status;
        } catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    // <4.3> 否則,創建一個“空”的事務狀態對象
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        // 是否需要新的事務同步器,默認為 true
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 創建一個 DefaultTransactionStatus 事務狀態對象,設置相關屬性,這里也是一個新的事務
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

整個過程如下:

  1. 先從當前事務管理器中獲取 DataSource 對象,然后嘗試以它作為一個 Key 從一個 ThreadLocal 的 Map 中獲取對應的 ConnectionHolder 連接對象(沒有就是 null),最終包裝成一個 DataSourceTransactionObject 對象返回

    // DataSourceTransactionManager.java
    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // 從事務同步管理器根據 DataSource 獲取 Connection 連接持有者
        // 就是從它的 ThreadLocal 中獲取
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }
    // TransactionSynchronizationManager.java
    /**
     * 事務資源,兩種數據鍵值對
     * 1. 會話工廠和會話,SqlSessionFactory -> SqlSessionHolder
     * 2. 數據源和連接,DataSource -> ConnectionHolder
     */
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");
    
  2. 如果沒有 @Transactional 注解對應的元信息,則創建一個默認的 TransactionDefinition 對象

  3. 如果上面 transaction 數據源事務對象已有 Connection 連接,且正處於一個事務中,表示當前線程已經在一個事務中了

    // DataSourceTransactionManager.java
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        // 獲取這個數據源事務對象
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 是否已有 Connection 連接,且正處於一個事務中
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    }
    
    1. 調用 handleExistingTransaction(..) 方法,根據 Spring 事務傳播級別進行不同的處理,同時創建一個 DefaultTransactionStatus 事務狀態對象並返回
  4. 否則,當前線程沒有事務

    1. 如果是 MANDATORY 事務傳播級別(當前線程已經在一個事務中,則加入該事務,否則拋出異常),因為當前線程沒有事務,此時拋出異常
    2. 否則,如果事務傳播級別為 REQUIRED | REQUIRES_NEW | NESTED
      1. 創建一個“空”的被掛起的資源對象
      2. 創建一個 DefaultTransactionStatus 事務狀態對象,設置相關屬性,這里 newTransaction 參數為 true(記住),表示是一個新的事務
      3. 【關鍵】調用 doBegin(..) 方法,執行 begin 操作,如果沒有 Connection 數據庫連接,則通過 DataSource 創建一個新的連接;會設置 Connection 的隔離級別、是否只讀,並執行 Connection#setAutoCommit(false) 方法,不自動提交;同時將 DataSource(數據源對象)和 ConnectionHolder(數據庫連接持有者)保存至 ThreadLocal 中
      4. 借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量,例如當前事務的隔離級別、是否只讀、是否處於事務中等
      5. 返回上面創建的 DefaultTransactionStatus 事務狀態對象
    3. 否則,創建一個“空”的事務狀態對象
      1. 創建一個 DefaultTransactionStatus 事務狀態對象,不使用事務

整個處理過程稍微有點復雜,不過流程非常清晰,當沒有事務時,根據事務的傳播級別決定是否需要創建一個事務,創建過程主要在上面的第 4.2.3 步;當正處於一個事務中時,在 3.1 步,根據事務的傳播級別判斷是否需要創建一個新的事務,或者加入該事務等操作;接下來我們來看看這兩種情況

1.2 doBegin 方法

doBegin(..) 方法,創建一個新的事務,如下:

// AbstractPlatformTransactionManager.java
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // <1> 如果沒有 Connection 數據庫連接,或者連接處於事務同步狀態
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // <1.1> 通過 DataSource 創建一個 Connection 數據庫連接
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // <1.2> 重置 ConnectionHolder 連接持有者,封裝剛創建的數據庫連接
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        // <2> 設置ConnectionHolder 連接持有者處於事務同步中
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // <3> 獲取 Connection 數據庫連接
        con = txObject.getConnectionHolder().getConnection();

        // <4> 設置 Connection 是否只讀和事務隔離性
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        // <5> 保存之前的事務隔離級別(被掛起的事務)
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        // <6> 將自動提交關閉
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        // <7> 如果需要強制設置只讀(默認不需要),且連接本身是只讀的,則這里提前設置事務的只讀性
        prepareTransactionalConnection(con, definition);
        // <8> 當前 Connection 數據庫連接正處於一個事務中
        txObject.getConnectionHolder().setTransactionActive(true);

        // <9> 設置超時時間
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        // <10> 如果是新的事務,則將 DataSource(數據源對象)和 ConnectionHolder(數據庫連接持有者)保存至 ThreadLocal 中
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

過程如下:

  1. 如果沒有 Connection 數據庫連接,或者連接處於事務同步狀態
    1. 通過 DataSource 創建一個 Connection 數據庫連接
    2. 重置 ConnectionHolder 連接持有者,封裝剛創建的數據庫連接
  2. 設置 ConnectionHolder 連接持有者處於事務同步中
  3. 獲取 Connection 數據庫連接,設置是否只讀、事務隔離性、超時時間,並將 autocommit 設置為 fasle,不自動提交
  4. 保存之前的事務隔離級別(被掛起的事務)
  5. 如果需要強制設置只讀(默認不需要),且連接本身是只讀的,則這里提前設置事務的只讀性
  6. 設置當前 ConnectionHolder 數據庫連接正處於一個事務中
  7. 如果是新的事務,則將 DataSource(數據源對象)和 ConnectionHolder(數據庫連接持有者)保存至 ThreadLocal 中

整個過程關鍵在於第 3 步將 autocommit 設置為 false,不會自動提交,這樣一來,可以在一個事務中根據行為作出相應的操作,例如出現異常進行回滾,沒有問題則進行提交

接下來我們來看看 Spring 對於當前線程正處於一個事務中時,如何進行處理的

1.3 handleExistingTransaction 方法

handleExistingTransaction(..) 方法,處理已存在事務的情況,如下:

// AbstractPlatformTransactionManager.java
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // <1> 如果是 **NEVER** 事務傳播級別,因為當前線程正處於一個事務中,此時拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // <2> 否則,如果是 **NOT_SUPPORTED** 事務傳播級別,因為當前線程正處於一個事務中,此時掛起事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // <2.1> 事務掛起,也就是從 ThreadLocal 中移除各種對象,並返回一個掛起的資源對象(包含所有被移除的對象)
        Object suspendedResources = suspend(transaction);
        // 是否需要一個創建一個事務同步器(默認為 true)
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // <2.2> 創建一個 DefaultTransactionStatus 事務狀態對象,設置相關屬性,包括被掛起的資源
        // 設置 `transaction` 為 null(當前沒有事務),`newTransaction` 為 `false`,表示不是一個新的事務
        // 同時借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // <3> 否則,如果是 **REQUIRES_NEW** 事務傳播級別(無論如何都會創建一個新的事務),因為當前線程正處於一個事務中,此時掛起當前事務,創建一個新的事務
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // <3.1> 事務掛起,也就是從 ThreadLocal 中移除各種對象,並返回一個掛起的資源對象(包含所有被移除的對象)
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 是否需要一個創建一個事務同步器(默認為 true)
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // <3.2> 創建一個事務狀態對象,設置相關屬性,包括被掛起的資源
            // 設置 `newTransaction` 為 `true`,表示是一個新的事務
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // <3.3> 【關鍵】執行 begin 操作,如果沒有 Connection 數據庫連接,則通過 DataSource 創建一個新的連接
            // 設置 Connection 的隔離級別、是否只讀,並執行 Connection#setAutoCommit(false) 方法,不自動提交
            // 同時將 DataSource(數據源對象)和 ConnectionHolder(數據庫連接持有者)保存至 ThreadLocal 中
            doBegin(transaction, definition);
            // <3.4> 借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
            prepareSynchronization(status, definition);
            // <3.5> 返回上面創建的 DefaultTransactionStatus 事務狀態對象
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            // 在拋出異常前喚醒剛才被掛起的資源
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // <4> 否則,如果是 **NESTED** 事務傳播級別(執行一個嵌套事務),還是使用當前線程的事務,不過設置了保存點,相當於一個嵌套事務,在 Mysql 中是采用 SAVEPOINT 來實現的
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        // <4.1> 如果支持使用保存點(默認為 true)
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // 創建一個事務狀態對象,設置相關屬性,這里設置了不是一個新的事務,也不是一個新的事務同步器
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            // 創建一個保存點,調用 Connection#setSavepoint(String) 方法
            status.createAndHoldSavepoint();
            return status;
        }
        // <4.2> 否則,例如 JtaTransactionManager(JTA 事務),暫時忽略
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 創建一個事務狀態對象,設置相關屬性,這里設置了是一個新的事務
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            // 執行 begin 操作,暫時忽略
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // <5> 走到這里了,表示就使用當前已存在的事務,例如 **REQUIRED** 傳播級別

    // <6> 判斷定義的 IsolationLevel 和已存在的事務的 IsolationLevel 進行校驗(默認為 false)
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    // 是否是一個新的事務同步器(默認為 true)
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // <7> 創建一個 DefaultTransactionStatus 事務狀態對象
    // 設置 `newTransaction` 為 `false`,表示不是一個新的事務
    // 同時借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
    // 注意這里用的 `definition` 是當前 `@Transactional` 注解的相關屬性,所以隔離級別等屬性是當前定義的,而不是當前已存在的事務的隔離級別
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

整個過程如下:

  1. 如果是 NEVER 事務傳播級別,因為當前線程正處於一個事務中,此時拋出異常
  2. 否則,如果是 NOT_SUPPORTED 事務傳播級別,因為當前線程正處於一個事務中,此時掛起事務,不使用事務
    1. 調用 suspend(..) 方法,將當前事務掛起,也就是從 ThreadLocal 中移除各種對象,並返回一個掛起的資源對象(包含所有被移除的對象)
    2. 創建一個 DefaultTransactionStatus 事務狀態對象,設置相關屬性,包括被掛起的資源,會設置 transactionnull不使用事務),同時借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
  3. 否則,如果是 REQUIRES_NEW 事務傳播級別(無論如何都會創建一個新的事務),因為當前線程正處於一個事務中,此時掛起當前事務,創建一個新的事務
    1. 調用 suspend(..) 方法,將當前事務掛起,也就是從 ThreadLocal 中移除各種對象,並返回一個掛起的資源對象(包含所有被移除的對象)
    2. 創建一個事務狀態對象,設置相關屬性,包括被掛起的資源,會設置 newTransactiontrue,表示是一個新的事務
    3. 【關鍵】調用 doBegin(..) 方法,執行 begin 操作,前面的 1.2 小節已經分析過,不再贅述
    4. 借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量
    5. 返回上面創建的 DefaultTransactionStatus 事務狀態對象
  4. 否則,如果是 NESTED 事務傳播級別(執行一個嵌套事務),還是使用當前線程的事務,不過設置了保存點,相當於一個嵌套事務,在 Mysql 中是采用 SAVEPOINT 來實現的
    1. 如果支持使用保存點(默認為 true)
      1. 創建一個事務狀態對象,設置相關屬性,這里設置了 newTransactionfalse,表示不是一個新的事務
      2. 創建一個保存點,調用 Connection#setSavepoint(String) 方法
      3. 返回上面創建的 DefaultTransactionStatus 事務狀態對象
    2. 否則,例如 JtaTransactionManager(JTA 事務),暫時忽略
  5. 走到這里了,表示就使用當前已存在的事務,也就是SUPPORTSREQUIRED 兩種傳播級別
  6. 判斷是否需要對定義的隔離級別和已存在的事務的隔離級別進行校驗(默認為 false
  7. 創建一個 DefaultTransactionStatus 事務狀態對象,設置 newTransactionfalse,表示不是一個新的事務,還是使用當前事務;同時借助 TransactionSynchronizationManager 事務同步管理器設置相關 ThreadLocal 變量;注意這里用的 definition 是當前 @Transactional 注解的相關屬性,所以隔離級別等屬性是當前定義的,而不是當前已存在的事務的隔離級別

整個處理過程並不復雜,其中掛起當前事務會調用 suspend(..) 方法,我們一起來看看

1.4 suspend 方法

suspend(..) 方法,如果當前正處於一個事務中,傳播級別為 NOT_SUPPORTED 或者 REQUIRES_NEW,則需要掛起當前事務,然后不使用事務或者創建一個新的事務,如下:

// AbstractPlatformTransactionManager.java
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // <1> 當前線程已有 TransactionSynchronization 事務同步器
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
            // <1.1> 將當前線程的 TransactionSynchronization 全部掛起,也就是從 ThreadLocal 中移除,並返回掛起的對象
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // <1.2> 掛起事務,也就是從 ThreadLocal 中移除,並返回掛起的 ConnectionHolder 對象
                suspendedResources = doSuspend(transaction);
            }
            // <1.3> 解除綁定當前事務各種屬性,名稱、只讀、隔離級別、是否是真實的事務
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            // <1.4> 返回被掛起的資源對象(對上面被掛起的對象進行封裝)
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            // 在拋出異常前喚醒剛才被掛起的資源
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    // <2> 否則,如果當前數據源事務對象不為空,則掛起
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        // <2.1> 掛起事務,也就是從 ThreadLocal 中移除,並返回掛起的 ConnectionHolder 對象
        Object suspendedResources = doSuspend(transaction);
        // <2.2> 返回被掛起的資源對象(對上面被掛起的對象進行封裝)
        return new SuspendedResourcesHolder(suspendedResources);
    }
    // <3> 否則,什么都不用做,返回一個空對象
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

掛起當前事務的過程如下:

  1. 如果當前線程已有 TransactionSynchronization 事務同步器

    1. 將當前線程的 TransactionSynchronization 全部掛起,也就是從 ThreadLocal 中移除,並返回掛起的對象

    2. 掛起事務,也就是從 ThreadLocal 中移除,並返回掛起的 ConnectionHolder 對象

      // DataSourceTransactionManager.java
      @Override
      protected Object doSuspend(Object transaction) {
          DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
          // 將 Connection 連接持有者置空
          txObject.setConnectionHolder(null);
          // 將 ThreadLocal 中的 DataSource 移除
          return TransactionSynchronizationManager.unbindResource(obtainDataSource());
      }
      
    3. 解除綁定當前事務各種屬性,名稱、只讀、隔離級別、是否是真實的事務

    4. 返回被掛起的資源對象(對上面被掛起的對象進行封裝)

  2. 否則,如果當前數據源事務對象不為空,則掛起

    1. 掛起事務,和 1.2 步相同
    2. 返回被掛起的資源對象(對上面被掛起的對象進行封裝)
  3. 否則,什么都不用做,返回一個空對象

可以看到,掛起當前事務就是從 ThreadLocal 中移除相關資源,會將這些資源封裝成一個對象返回,因為后續需要喚醒這些被掛起的資源(重新設置到 ThreadLocal 中)

小節

Spring 創建事務的過程主要分為兩種情況,當前線程不處於一個事務中和正處於一個事務中,兩種情況都需要根據事務的傳播級別來做出不同的處理。創建一個事務的核心就是調用 Connection#setAutocommit(false) 方法,將自動提交關閉,這樣一來,就可以在一個事務中根據行為作出相應的操作,例如出現異常進行回滾,沒有問題則進行提交

當前線程不處於一個事務中:

  • 如果是MANDATORY傳播級別,則拋出異常
  • 否則,如果是 REQUIRED | REQUIRES_NEW | NESTED 傳播級別,則“創建”一個事務,將數據庫的 commit 設置為 false,此時會設置事務狀態里面的 newTransaction 屬性 true,表示是一個新的事務
  • 否則,創建一個“空”的事務狀態對象,也就是不使用事務

當前線程正處於一個事務中:

  • 如果是NEVER傳播級別,則拋出異常
  • 否則,如果是NOT_SUPPORTED傳播級別,則將當前事務掛起,然后創建一個“空”的事務狀態對象,也就是不使用事務
  • 否則,如果是REQUIRES_NEW 傳播級別,則將當前事務掛起,然后“創建”一個事務,將數據庫的 commit 設置為 false,此時會設置事務狀態里面的 newTransaction 屬性 true,表示是一個新的事務;同時還保存了被掛起的事務相關資源,在本次事務結束后會喚醒它
  • 否則,如果是 NESTED 傳播級別,則沿用當前事務,就是設置事務狀態里面的 newTransaction 屬性 false,表示不是一個新的事務,不過會調用 Connection#setSavepoint(String) 方法創建一個 SAVEPOINT 保存點,相當於嵌套事務
  • 否則,就是 SUPPORTS | REQUIRED 傳播級別,沿用當前事務,就是設置事務狀態里面的 newTransaction 屬性 false,表示不是一個新的事務

注意到 DefaultTransactionStatus 事務狀態對象有一個 newTransaction 屬性,通過它可以知道是否是一個新的事務,在后續的 commitrollback 有着關鍵的作用

至此,關於 Spring 創建事務的內容差不多就結束了,接下來我們來看看 Spring 是如何提交一個事務的

2. 提交事務

TransactionInterceptor 事務攔截處理過程中,如果方法的執行過程沒有拋出異常,那么此時我們是不是需要調用 Connection#commit() 方法,提交本次事務,我們一起來看看 Spring 的處理過程

// TransactionAspectSupport.java
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        // 通過 DataSourceTransactionManager 提交當前事務
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

可以看到是通過 DataSourceTransactionManager 提交當前事務

2.1 commit 方法
// AbstractPlatformTransactionManager.java
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    // <1> 如果事務已完成,此時又提交,則拋出異常
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // <2> 事務明確標記為回滾
    if (defStatus.isLocalRollbackOnly()) {
        // 進行回滾過程(預料之中)
        processRollback(defStatus, false);
        return;
    }

    // <3> 判斷全局回滾時是否需要提交(默認不需要),且當前事務為全局回滾
    // 例如 **REQUIRED** 傳播級別,當已有一個事務時則加入其中,此時如果拋出異常,則會設置為全局回滾,那么當事務進行提交時,對於整個事務都需要回滾
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        // 進行回滾過程(預料之外)
        processRollback(defStatus, true);
        return;
    }

    // <4> 執行提交事務
    processCommit(defStatus);
}

完成事務的過程如下:

  1. 如果事務已完成,此時又提交,則拋出異常

  2. 事務明確標記為回滾(暫時忽略),則調用 processRollback(..) 方法進行“回滾”

  3. 判斷全局回滾時是否需要提交(默認不需要),且當前事務為全局回滾,則調用 processRollback(..) 方法進行“回滾”

    例如REQUIRED傳播級別,當已有一個事務時則加入其中,此時如果拋出異常,則會設置為全局回滾,那么當事務進行提交時,對於整個事務都需要回滾

  4. 調用 processCommit(..) 方法,執行提交事務

可以看到並不一定會“提交”,當標記需要全局回滾的狀態時會進行“回滾”,這一小節我們重點關注第 4

2.2 processCommit 方法

processCommit(..) 方法,執行事務的提交過程,如下:

// AbstractPlatformTransactionManager.java
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            boolean unexpectedRollback = false;
            // <1> 進行三個前置操作
            prepareForCommit(status); // <1.1> 在 Spring 中為空方法
            // <1.2> 調用 TransactionSynchronization#beforeCommit 方法
            // 例如在 Mybatis-Spring 中的 SqlSessionSynchronization 會調用其 SqlSession#commit() 方法,提交批量操作,刷新緩存
            triggerBeforeCommit(status);
            // <1.3> 調用 TransactionSynchronization#beforeCompletion 方法
            // 由 Spring 事務托管,不是真的關閉連接,從 ThreadLocal 中刪除 DataSource 和 ConnectionHolder 的映射關系
            // 例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,
            // 且調用其 SqlSession#close() 方法
            triggerBeforeCompletion(status);
            // <2> 標記三個前置操作已完成
            beforeCompletionInvoked = true;

            // <3> 有保存點,即嵌套事務
            if (status.hasSavepoint()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
                // <3.1> 釋放保存點,等外層的事務進行提交
                status.releaseHeldSavepoint();
            }
            // <4> 否則,如果是一個新的事務
            else if (status.isNewTransaction()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
                // <4.1> 提交事務,執行 Connection#commit() 方法
                doCommit(status);
            }
            // <5> 否則,在事務被標記為全局回滾的情況下是否提前失敗(默認為 false)
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            // 如果全局標記為僅回滾,但是提交時沒有得到異常,則這里拋出異常
            // 目的是需要回滾
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            // 觸發完成后事務同步,狀態為回滾
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        // 事務異常
        catch (TransactionException ex) {
            // can only be caused by doCommit
            // 提交失敗回滾
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            // 觸發完成后回調,事務同步狀態為未知
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        // 運行時異常或者其它異常
        catch (RuntimeException | Error ex) {
            // 如果上面三個前置步驟未完成,調用最后一個前置步驟,即調用 TransactionSynchronization#beforeCompletion 方法
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            // 提交異常回滾
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            // <6> 觸發提交后的回調,調用 TransactionSynchronization#afterCommit 方法
            // JMS 會有相關操作,暫時忽略
            triggerAfterCommit(status);
        }
        finally {
            // <7> 觸發完成后的回調,事務同步狀態為已提交,調用 TransactionSynchronization#afterCompletion 方法
            // 例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,
            // 且調用其 SqlSession#close() 方法,解決可能出現的跨線程的情況
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        // <8> 在完成后清理,清理相關資源,“釋放”連接,喚醒被掛起的資源
        cleanupAfterCompletion(status);
    }
}

提交過程如下:

  1. 進行三個前置操作

    1. 准備工作,在 Spring 中為空方法
    2. 調用 TransactionSynchronization#beforeCommit 方法,例如在 Mybatis-Spring 中的 SqlSessionSynchronization 會調用其 SqlSession#commit() 方法,提交批量操作,刷新緩存
    3. 調用 TransactionSynchronization#beforeCompletion 方法,由 Spring 事務托管,不是真的關閉連接,從 ThreadLocal 中刪除 DataSource 和 ConnectionHolder 的映射關系;例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,且調用其 SqlSession#close() 方法
  2. 標記三個前置操作已完成

  3. 如果有保存點,即嵌套事務,則調用 Connection#releaseSavepoint(Savepoint) 方法釋放保存點,等外層的事務進行提交

  4. 否則,如果是一個新的事務,根據之前一直提到的 newTransaction 屬性進行判斷是否是一個新的事務

    1. 提交事務,執行 Connection#commit() 方法

      // DataSourceTransactionManager.java
      @Override
      protected void doCommit(DefaultTransactionStatus status) {
          DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
          Connection con = txObject.getConnectionHolder().getConnection();
          if (status.isDebug()) {
              logger.debug("Committing JDBC transaction on Connection [" + con + "]");
          }
          try {
              con.commit();
          }
          catch (SQLException ex) {
              throw new TransactionSystemException("Could not commit JDBC transaction", ex);
          }
      }
      
  5. 否則,在事務被標記為全局回滾的情況下是否提前失敗(默認為 false

  6. 觸發提交后的回調,調用 TransactionSynchronization#afterCommit 方法,JMS 會有相關操作,暫時忽略

  7. 觸發完成后的回調,事務同步狀態為已提交,調用 TransactionSynchronization#afterCompletion 方法,例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,且調用其 SqlSession#close() 方法,解決可能出現的跨線程的情況

  8. 在完成后清理,清理相關資源,“釋放”連接,喚醒被掛起的資源,如下:

    // AbstractPlatformTransactionManager.java
    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        // <1> 設置為已完成
        status.setCompleted();
        // <2> 如果是一個新的事務同步器
        if (status.isNewSynchronization()) {
            // 清理事務管理器中的 ThreadLocal 相關資源,包括事務同步器、事務名稱、只讀屬性、隔離級別、真實的事務激活狀態
            TransactionSynchronizationManager.clear();
        }
        // <3> 如果是一個新的事務
        if (status.isNewTransaction()) {
            // 清理 Connection 資源,例如釋放 Connection 連接,將其引用計數減一(不會真的關閉)
            // 如果這個 `con` 是中途創建的,和 ThreadLocal 中的不一致,則需要關閉
            doCleanupAfterCompletion(status.getTransaction());
        }
        // <4> 如果之前有被掛起的事務,則喚醒
        if (status.getSuspendedResources() != null) {
            Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
            // 喚醒被掛起的事務和資源,重新將 DataSource 和 ConnectionHolder 的映射綁定到 ThreadLocal 中
            // 將之前掛起的相關屬性重新設置到 ThreadLocal 中
            resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }
    

整個過程在提交事務的前后會進行相關處理,例如清理資源;對於嵌套事務,這里會釋放保存點,等外層的事務進行提交;對於新的事務,這里會調用Connection#commit()方法提交事務;其他情況不會真的提交事務,在這里僅清理相關資源,喚醒被掛起的資源

3. 回滾事務

TransactionInterceptor 事務攔截處理過程中,如果方法的執行過程拋出異常,那么此時我們是不是需要調用 Connection#roback() 方法,對本次事務進行回滾,我們一起來看看 Spring 的處理過程

// TransactionAspectSupport.java
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        // 如果 @Transactional 配置了需要對那些異常進行回退,則需要判斷拋出的異常是否匹配
        // 沒有配置的話只處理 RuntimeException 或者 Error 兩種異常
        // <1> 如果異常類型匹配成功,則進行回滾
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                // 回滾操作
                // 嵌套事務,回滾到保存點;否則,新事務,回滾;否則,加入到的一個事務,設置為需要回滾
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        // <2> 否則,不需要回滾,提交事務
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                // 提交操作
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

處理異常的過程如下:

  1. 如果異常類型匹配成功,則進行回滾,如果 @Transactional 配置了需要對哪些異常進行回退,則需要判斷拋出的異常是否匹配,沒有配置的話只處理 RuntimeException 或者 Error 兩種異常
    1. 回滾操作,調用 AbstractPlatformTransactionManager#rollback(TransactionStatus) 方法
  2. 否則,異常類型不匹配
    1. 還是進行提交操作, 調用 AbstractPlatformTransactionManager#commit(TransactionStatus) 方法,在上面的“提交事務”小節中已經講過

可以看到,出現了異常不一定會回滾,需要異常類型匹配

3.1 rollback 方法
// AbstractPlatformTransactionManager.java
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
   // <1> 如果事務已完成,此時又回滾,則拋出異常
   if (status.isCompleted()) {
      throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
   }

   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   // <2> 進行回滾(預料之中)
   processRollback(defStatus, false);
}
3.2 processRollback 方法
// AbstractPlatformTransactionManager.java
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;
        try {
            // <1> 調用 TransactionSynchronization#beforeCompletion 方法
            // 由 Spring 事務托管,不是真的關閉連接,從 ThreadLocal 中刪除 DataSource 和 ConnectionHolder 的映射關系
            // 例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,
            // 且調用其 SqlSession#close() 方法
            triggerBeforeCompletion(status);

            // <2> 如果有 Savepoint 保存點
            if (status.hasSavepoint()) {
                // 回滾到保存點,調用 Connection#rollback(Savepoint) 方法
                status.rollbackToHeldSavepoint();
            }
            // <3> 否則,如果是新的事務,例如傳播級別為 **REQUIRED_NEW** 則一定是一個新的事務
            else if (status.isNewTransaction()) {
                // 事務回滾,調用 Connection#rollback() 方法
                doRollback(status);
            }
            // <4> 否則,不是新的事務也沒有保存點,那就是加入到一個已有的事務這種情況,例如 **REQUIRED** 傳播級別,如果已存在一個事務,則加入其中
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    // 如果已經標記為回滾,或當加入事務失敗時全局回滾(默認 true)
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        // 設置當前 ConnectionHolder#rollbackOnly 為 true
                        // 在這個事務提交的時候進行回滾
                        doSetRollbackOnly(status);
                    }
                    else { }
                }
                else { }
                // Unexpected rollback only matters here if we're asked to fail early
                // 在事務被標記為全局回滾的情況下是否提前失敗
                // 默認為 false,表示不希望拋出異常
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    // 那么設置 unexpectedRollback 為 false
                    unexpectedRollback = false;
                }
            }
        }
        // 運行時異常或者其它異常
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 觸發完成后的回調,事務同步狀態為已提交,調用 TransactionSynchronization#afterCompletion 方法
        // 例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,
        // 且調用其 SqlSession#close() 方法,解決可能出現的跨線程的情況
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        // 通過上面可以看到,通常情況這里不會拋出異常
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        // 在完成后清理,清理相關資源,“釋放”連接,喚醒被掛起的資源
        cleanupAfterCompletion(status);
    }
}

回退過程如下:

  1. 調用 TransactionSynchronization#beforeCompletion 方法,由 Spring 事務托管,不是真的關閉連接,從 ThreadLocal 中刪除 DataSource 和 ConnectionHolder 的映射關系;例如在 Mybatis-Spring 中的 SqlSessionSynchronization 中,會從 ThreadLocal 中刪除 SqlSessionFactory 和 SqlSessionHolder 的映射關系,且調用其 SqlSession#close() 方法

  2. 如果有 Savepoint 保存點,也就是嵌套事務,則回滾到保存點,調用 Connection#rollback(Savepoint) 方法回滾到保存點,再調用Connection#releaseSavepoint(Savepoint) 方法釋放該保存點

  3. 否則,如果是新的事務,例如傳播級別為 REQUIRED_NEW 則一定是一個新的事務,則對當前事務進行回滾,調用 Connection#rollback() 方法,如下:

    // DataSourceTransactionManager.java
    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }
    
  4. 否則,不是新的事務也沒有保存點,那就是加入到一個已有的事務這種情況,例如 REQUIRED 傳播級別,如果已存在一個事務,則加入其中

    1. 如果已經標記為回滾,或當加入事務失敗時全局回滾(默認 true),那么設置 ConnectionHolderrollbackOnlytrue,也就是標記需要全局回滾,對應到前面“提交事務”的時候會判斷是否標記為全局回滾,標記了則進行回滾,而不是提交

可以看到,對於默認的REQUIRED事務傳播級別,如果已有一個事務(“物理事務”),則加入到當前事務中(相當於創建了一個“邏輯事務”),當這個“邏輯事務”出現異常時,整個事務(包括外面的“物理事務”)都需要回滾

總結

本文對 Spring 事務做了比較詳細的講解,我們通過 @EnableTransactionManagement 注解驅動整個 Spring 事務模塊,此時會往 IoC 注入一個 PointcutAdvisor 事務切面,關聯了一個 TransactionAttributeSourcePointcut(Pointcut)事務切點和一個 TransactionInterceptor(Advice)事務攔截器,關於 Spring AOP 的相關內容對於不熟悉的小伙伴可查看我前面的一系列文章。

這個 TransactionAttributeSourcePointcut(Pointcut)事務切點,它里面關聯了一個 AnnotationTransactionAttributeSource 事務屬性資源對象,通過它解析這個方法(或者類)上面的 @Transactional 注解;底層需要借助 SpringTransactionAnnotationParser 進行解析,解析出一個 TransactionAttribute 事務屬性對象,並緩存;沒有解析出對應的 TransactionAttribute 對象也就不會被事務攔截器攔截,否則,需要為這個 Bean 創建一個代理對象。

這個 TransactionInterceptor(Advice)事務攔截器讓方法的執行處於一個事務中(如果定義了 @Transactional 注解,且被 public 修飾符修飾);首先會創建一個事務(如果有必要),最核心就是將數據庫的 commit 設置為 false,不自動提交,在方法執行完后進行提交(或者回滾);事務的攔截處理過程更多的細節可查看本文全部內容。

拓展

Spirng 事務(Transactions)的底層實現總體上是這樣的:以 @EnableXxx 模塊驅動注解驅動整個模塊,同時會注入一個 PointcutAdvisor 切面,關聯一個 Pointcut 和一個 Advice 通知;通過 Pointcut 篩選出符合條件的 Bean;然后在 Advice 中進行攔截處理,實現相應的功能。

Spring 緩存(Caching)的底層實現和 Spirng 事務(Transactions)整體上差不多,當你對 Spirng 事務(Transactions)的底層了解后,你會發現 Spring 緩存(Caching)的實現基本是照搬過來的。

Spring 異步處理(Async)的底層實現和上面兩者類似(原理差不多),不過它沒有直接注入一個 PointcutAdvisor 切面,而是注入了一個 AbstractAdvisingBeanPostProcessor 對象,繼承 ProxyProcessorSupport(AOP 代理配置類),且實現 BeanPostProcessor 接口;在這個對象里面會關聯一個 AsyncAnnotationAdvisor 切面對象,然后通過實現 BeanPostProcessor 接口在 Spring Bean 的生命周期中的初始化后進行擴展,對於符合條件的 Bean 會通過 ProxyFactory 創建一個代理對象;AsyncAnnotationAdvisor 關聯的 Advice 會對方法進行攔截處理,也就是將方法的執行放入一個 Executor 線程池中執行,會返回一個 Future 可用於獲取執行結果。

其實你會發現許多的開源框架,對於整合 Spring 的實現都是通過這種方式來實現的(如果需要借助 Spring AOP)😃😃😃

至此,關於 Spring IoC、Spring AOP、Spring TX 三個核心模塊已經講的差不多了,接下來我們一起來看看 Spring Boot 的相關內容


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM