springboot 事務執行全流程分析


springboot 事務執行全流程分析

在上篇文章springboot 事務創建流程源碼分析中主要講了springboot事務創建的過程,本次我們來看看事務具體執行的過程。

這里關於幾個名稱提前先達成一致:

  1. com.springboot.transaction.service.impl.UserServiceImpl這個類我們稱它為原始類,它的對象我們稱原始對象

  2. springboot通過aop生成的com.springboot.transaction.service.impl.UserServiceImpl的子類,我們稱它為代理類,它的對象我們稱代理對象

在main方法中會有這一句System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY,p);這里的p是設置的springboot動態生成代碼的本地保存位置,在生成代理類中,我們就可以在這里找到對應的所有動態代理類的class文件,通過反編譯,我們也能了解更多的信息

為了避免其他框架對事務執行的干擾,本篇文章數據庫使用的是原生的jdbctemplate。主要是其他框架,類似mybatis之類,在事務過程中也會插入自己框架的代碼。

本篇博客的源碼路徑:https://github.com/wbo112/blogdemo/tree/main/springbootdemo/springboot-jdbc-transaction

1. 事務方法執行前的准備工作

在服務啟動后,我們在瀏覽器中輸入http://localhost:8080/addUser來請求后台。就會請求到com.springboot.transaction.jdbc.controller.UserController的addUsers方法,具體會執行到return userService.addUser(); 這句代碼,這里的userService就是上篇文章中講到的代理類,這就會調用到代理類中的addUser方法。

上篇文章我們也貼出來了反編譯的userService代理類,具體會調到下面的代碼

//代理類中的方法
    public final boolean addUser() {
      //這里的this.CGLIB$CALLBACK_0,就是上篇文章中講到的在最終生成代理類后調用setCallbacks方法賦值的
      //這里的this.CGLIB$CALLBACK_0就是CglibAopProxy$DynamicAdvisedInterceptor
        MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
        if (var10000 == null) {
            CGLIB$BIND_CALLBACKS(this);
            var10000 = this.CGLIB$CALLBACK_0;
        }

      			//CGLIB$addUser$0$Method是原始類的方法public boolean com.springboot.transaction.jdbc.service.impl.UserServiceImpl.addUser()
      //CGLIB$addUser$0$Proxy是MethodProxy類的對象,最終的調用就是通過MethodProxy.invoke來完成的
        if (var10000 != null) {
            Object var1 = var10000.intercept(this, CGLIB$addUser$0$Method, CGLIB$emptyArgs, CGLIB$addUser$0$Proxy);
            return var1 == null ? false : (Boolean)var1;
        } else {
            return super.addUser();
        }
    }

上面的代碼就會調用到CglibAopProxy$DynamicAdvisedInterceptor的intercept方法,我們繼續進到這個里面去看看

		public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
			Object oldProxy = null;
			boolean setProxyContext = false;
			Object target = null;
			TargetSource targetSource = this.advised.getTargetSource();
			try {
				if (this.advised.exposeProxy) {
					// Make invocation available if necessary.
					oldProxy = AopContext.setCurrentProxy(proxy);
					setProxyContext = true;
				}
				// Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
        //這里會獲取我們的原始對象和原始類
				target = targetSource.getTarget();
				Class<?> targetClass = (target != null ? target.getClass() : null);
        //這里會獲取具體需要執行攔截的攔截器列表,我們這里只會有一個org.springframework.transaction.interceptor.TransactionInterceptor的對象,這個也是在上篇文章中有講到,是作為一個bean對象加載的
        //這里的this.advised是org.springframework.aop.framework.ProxyFactory的對象,關於這個類在上篇文章中有過描述,這里就不再說類
        //我們先進到這個方法里面去看看攔截器列表是如何獲取到的
				List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
				Object retVal;
				// Check whether we only have one InvokerInterceptor: that is,
				// no real advice, but just reflective invocation of the target.
				if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
					// We can skip creating a MethodInvocation: just invoke the target directly.
					// Note that the final invoker must be an InvokerInterceptor, so we know
					// it does nothing but a reflective operation on the target, and no hot
					// swapping or fancy proxying.
					Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
					retVal = methodProxy.invoke(target, argsToUse);
				}
				else {
					// We need to create a method invocation...
          //在這里就會去生成CglibMethodInvocation對象,並通過調用process來完成整個事務過程的處理的。
					retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
				}
        //在這里會對返回的結果做個簡單的判斷,因為原始類型不能為null,這里會判斷方法的返回類型是不是原始類型,當前的返回值是不是null,如果兩者都是就拋出異常
				retVal = processReturnType(proxy, target, method, retVal);
				return retVal;
			}
			finally {
				if (target != null && !targetSource.isStatic()) {
					targetSource.releaseTarget(target);
				}
				if (setProxyContext) {
					// Restore old proxy.
					AopContext.setCurrentProxy(oldProxy);
				}
			}
		}

下面我們看看this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);這句代碼具體的執行情況

	//這個方法是在AdvisedSupport類中
	
	public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Method method, @Nullable Class<?> targetClass) {
    //這里會對方法的攔截器列表做個緩存
    //當前這里緩存中是沒有的,所以會走到if分支中
		MethodCacheKey cacheKey = new MethodCacheKey(method);
		List<Object> cached = this.methodCache.get(cacheKey);
		if (cached == null) {
      //我們進到這個分支中看看
      //這里的this.advisorChainFactory是一個成員變量,在聲明的時候就初始化了,之后沒有調用set方法重新賦值	 	                //advisorChainFactory = new DefaultAdvisorChainFactory();
			cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(
					this, method, targetClass);
      //獲取到對應的攔截器列表后,加到緩存中,下次就可以直接獲取到
			this.methodCache.put(cacheKey, cached);
		}
		return cached;
	}

看看this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(this, method, targetClass)這句的執行過程

//這個方法是DefaultAdvisorChainFactory
@Override
	public List<Object> getInterceptorsAndDynamicInterceptionAdvice(
			Advised config, Method method, @Nullable Class<?> targetClass) {

		// This is somewhat tricky... We have to process introductions first,
		// but we need to preserve order in the ultimate list.
		AdvisorAdapterRegistry registry = GlobalAdvisorAdapterRegistry.getInstance();
    //這里的advisors數組中只有一個對象,BeanFactoryTransactionAttributeSourceAdvisor。這個類上篇文章中也就有講,這里也就不再說了
    
		Advisor[] advisors = config.getAdvisors();
		List<Object> interceptorList = new ArrayList<>(advisors.length);
		Class<?> actualClass = (targetClass != null ? targetClass : method.getDeclaringClass());
		Boolean hasIntroductions = null;

		for (Advisor advisor : advisors) {
			if (advisor instanceof PointcutAdvisor) {
				// Add it conditionally.
				PointcutAdvisor pointcutAdvisor = (PointcutAdvisor) advisor;
				if (config.isPreFiltered() || pointcutAdvisor.getPointcut().getClassFilter().matches(actualClass)) {
					......//這里都是判斷方法是否和advisors是否匹配,和上篇文章中的匹配步驟基本也是一致的,這里也就略過了
					if (match) {
            //如果能夠匹配,就會從advisor中獲取方法的攔截器列表,加到list中最終返回
            //getInterceptors這個方法也比較簡單,就是直接通過advisor.getAdvice()獲取攔截器
            //不過這個方法里面有一些其他的知識點,所以我們還是進去一起看看
						MethodInterceptor[] interceptors = registry.getInterceptors(advisor);
						//后面的代碼基本都走不到,這里就直接略過了
              ......
		return interceptorList;
	}
	//DefaultAdvisorAdapterRegistry中的方法
	@Override
	public MethodInterceptor[] getInterceptors(Advisor advisor) throws UnknownAdviceTypeException {
		List<MethodInterceptor> interceptors = new ArrayList<>(3);
    //這里的advice就是TransactionInterceptor了,上篇文章也講過了
    //Advice對應我們中文的術語好像叫通知,感覺好別扭😂
		Advice advice = advisor.getAdvice(); 
    //TransactionInterceptor實現了 MethodInterceptor,所以會走到這個if分支里面
		if (advice instanceof MethodInterceptor) {
			interceptors.add((MethodInterceptor) advice);
		}
    //這里的adapters有3個對象,分別是new MethodBeoreAdviceAdapter(),new AfterReturningAdviceAdapter(),new ThrowsAdviceAdapter()
    //這里也可以看到,aop的攔截(Advice通知)主要分為4種
    //MethodInterceptor 是環繞通知,就是方法執行前后都會進行攔截處理
    //MethodBeforeAdviceAdapter 前置通知,就是先執行攔截,后執行原始的方法;這種主要是對原始方法執行的參數做一些處理
    //AfterReturningAdviceAdapter 后置通知,就是先執行原始的方法,后執行攔截;這種主要是對原始方法的執行結果做一些處理
    //ThrowsAdviceAdapter這種就是如果原始方法執行拋出了異常,會由它來處理
    
    //這4種攔截都是沒有繼承關系的,所以不會走進這個for if分支中
		for (AdvisorAdapter adapter : this.adapters) {
			if (adapter.supportsAdvice(advice)) {
				interceptors.add(adapter.getInterceptor(advisor));
			}
		}
		if (interceptors.isEmpty()) {
			throw new UnknownAdviceTypeException(advisor.getAdvice());
		}
		return interceptors.toArray(new MethodInterceptor[0]);
	}

下面我們再次回到CglibAopProxy類的intercept方法中看看retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();首先會對CglibMethodInvocation對象進行創建,這個就不去看了,都是簡單的賦值操作,我們重點看看proceed方法的執行

//proceed會直接通過 super.proceed();調用父類的方法,我們直接看它父類的方法吧

	public Object proceed() throws Throwable {
		// We start with an index of -1 and increment early.
    //初始化currentInterceptorIndex=-1;this.interceptorsAndDynamicMethodMatchers.size()=1
    //所以依次會從interceptorsAndDynamicMethodMatchers中獲取攔截器來執行,執行完所有攔截器后執行invokeJoinpoint
		if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
			return invokeJoinpoint();
		}

    //依次獲取攔截器
    //我們攔截器列表中只有一個TransactionInterceptor
		Object interceptorOrInterceptionAdvice =
				this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    //不會走到這個分支
		if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
					.......
		}
		else {
			// It's an interceptor, so we just invoke it: The pointcut will have
			// been evaluated statically before this object was constructed.
      //會走到這里,走到TransactionInterceptor.invoke方法
			return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
		}
	}
	@Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
    //這里會得到原始類
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
    //看這個方法的名字就能明白,"在事務內調用"
    //這里的invocation就是上面的CglibMethodInvocation,可以看到proceedWithInvocation方法中又會重新調到proceed方法,就是我們上面說的遞歸調用了
    //我們繼續進到invokeWithinTransaction里面去看看,invokeWithinTransaction方法在它的父類TransactionAspectSupport中
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
			@Override
			@Nullable
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
			@Override
			public Object getTarget() {
				return invocation.getThis();
			}
			@Override
			public Object[] getArguments() {
				return invocation.getArguments();
			}
		});
	}

invokeWithinTransaction這個方法的內容比較多,可以說事務內部處理都是在這個方法中完成的

	//這個方法是在TransactionAspectSupport

  protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
    //這個屬性是在創建TransactionInterceptor這個bean時調用set方法賦值的,上篇文章中有講到
		TransactionAttributeSource tas = getTransactionAttributeSource();
    //這個是獲取到對應方法的事務的屬性信息,這個在執行創建事務的時候就已經有獲取,所以這里可以直接從緩存中獲取到。這個上篇文章中也講到了
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    //這個是獲取到事務管理器,第一次也是沒有的,會從beanfactory中獲取並加到緩存中,這里獲取到的JdbcTransactionManager的對象
    //具體是在DataSourceTransactionManagerAutoConfiguration中創建的,具體的創建過程和上篇文章講到的其他bean基本意識一致的
		final TransactionManager tm = determineTransactionManager(txAttr);
		//下面的分支條件不成立,不會進入,我們也就不看了,這部分代碼也就直接省略掉了
		if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
			......
		}

    //這里主要是判斷類型,如果是PlatformTransactionManager類型,就強轉;如果不是就泡異常。
    //JdbcTransactionManager間接實現了PlatformTransactionManager,這里是可以強轉的
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    //這個是得到連接點的標識:具體就是全類名+.+方法名拼接個字符串。
    //我們事務注解都是針對方法的,這個主要是用來如果對應事務屬性中事務的名字為空,就會用這個來代碼對應方法的事務屬性的名字,可以通過這個來區分不同的事務
    //我們這里是com.springboot.transaction.service.impl.UserServiceImpl.findAll
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    //下面的條件是成立,會進到if分支里面去
		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
      //看這個方法名就知道這里就是根據需要來創建事務,具體那些場景需要創建事務,那些場景不需要呢?
      //對springboot事務概念不了解的可以先看看之前寫的關於springboot事務概念的一些簡單介紹         https://www.cnblogs.com/wbo112/p/15427078.html
      //我們具體進到這個方法去看看
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
        //在上面創建完TransactionInfo后在這里會調用proceedWithInvocation,繼續遞歸調回到proceed,最后調到invokeJoinpoint方法,在這個里面最終調用到this.methodProxy.invoke(this.target, this.arguments);中。關於methodProxy我們會在文章的最后做分析
        //我們@Transactional注解的方法最終會在這里面執行
				retVal = invocation.proceedWithInvocation();
        //后面的代碼都是根據@Transactional注解的方法執行是否拋出異常,決定后續是否進行事務提交還是回滾操作,這部分代碼我們在后面單獨說

//TransactionAspectSupport的方法
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

		// If no name specified, apply method identification as transaction name.
  //我們這里事務默認是沒有名字的,這里就會對事務屬性進行包裝,通過上一步生成的joinpointIdentification來作為事務的名字
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {
					return joinpointIdentification;
				}
			};
		}

		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
        //在這里,就會具體的創建事務,這個方法是在AbstractPlatformTransactionManager,我們進去看看
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
  //在這里會創建TransactionInfo對象,並通過ThreadLocal的方式綁定到當前線程
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}
//AbstractPlatformTransactionManager的方法
@Override
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// Use defaults if no transaction definition given.
    //這里的definition已經在上一步通過匿名內部類的方式包裝成了DelegatingTransactionAttribute的子類
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
		//這里會創建一個datasource的事務對象,並獲取當前上下文的數據庫連接,如果沒有連接的話獲取到就是null,我們進去這個方法看看
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

    //這里會判斷ConnectionHolder是不是null,如果ConnectionHolder不是null,還要判斷是不是事務已經啟動,如果已經啟動了事務,就是已啟動的事務中進行后續的執行
    //我們這里ConnectionHolder是null,不會進入這個分支
    //注意:這里的ConnectionHolder是區分不同DataSource的,只有同一個DataSource的當前ConnectionHolder才會走到這里
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
    //這里是判斷事務的超時時間,默認是-1,表示不超時。如果<-1,那就拋出異常
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}
		//后面就是根據事務的傳播行為去進行處理了,我們這次是默認的傳播行為,也就是Propagation.REQUIRED;所以會走到后面的else if分支中
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      //對於同一個DataSource,如果當前有開啟事務就會從上面isExistingTransaction走進去,不會走到這里,所以這里針對的是不同的DataSource的,我們當前只有一個DataSource,這里面不會執行什么,就不進去了
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
        //會在這里開啟事務,我們進去看看
				return startTransaction(def, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}

//DataSourceTransactionManager中的方法	
@Override
	protected Object doGetTransaction() {
    //首先創建一個DataSourceTransactionObject,這個類主要是為類持有connection
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    //設置是否允許嵌套
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
    
    //obtainDataSource()獲取到的就是底層的DataSource,具體是什么不用關注;這個是在創建JdbcTransactionManager的時候作為bean的參數傳入的。springboot中默認的數據庫連接池是Hikari,所以這個DataSource默認就是HikariDataSource
    //下面TransactionSynchronizationManager.getResource看看是如何獲取ConnectionHolder的
    //這里獲取到的conHolder是null
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    //這里會把獲取到的連接賦值給DataSourceTransactionObject
    //第二個參數表示是不是一個新連接。我們這里只是獲取到之前的(這里只是獲取,不用管是不是null),並沒有創建新的,所以這里是false
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}
	//TransactionSynchronizationManager的方法
	@Nullable
	public static Object getResource(Object key) {
    //這里首先如果傳入的key是包裝過的,就去掉包裝;我們這里沒有包裝,所以actualKey也就是key
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    //這里會通過ThreadLocal的方式來獲取當前線程的連接,這里是一個map,key是datasource,所以這里是可以同時有多個datasouce的,並不會沖突。
    //我們是第一次進來,map也是null,這里的value也就是null了。
		Object value = doGetResource(actualKey);
		if (value != null && logger.isTraceEnabled()) {
			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
					Thread.currentThread().getName() + "]");
		}
    //這里返回null
		return value;
	}

//AbstractPlatformTransactionManager的方法
	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
		//首先判斷當前的事務傳播屬性,SYNCHRONIZATION_NEVER是不需要開啟事務的
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    //這里是創建一個DefaultTransactionStatus對象
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    //在這里,會判斷是否開啟了自動提交,如果開啟了自動提交就改為手動提交,這樣就需要在執行完所有SQL后,手動執行提交
    //我們進去看看
		doBegin(transaction, definition);
    //這里會在在TransactionSynchronizationManager中設置屬性,准備好事務開始
		prepareSynchronization(status, definition);
		return status;
	}

//DataSourceTransactionManager的方法
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
      //首先會判斷ConnectionHolder是不是null,如果不是null,繼續判斷當前是不是已經在事務中
      //我們這里ConnectionHolder是null,會走進if分支中
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        //這里就是直接通過DataSource獲取數據庫的一個連接Connection
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
        //這里會將Connection包裝成ConnectionHolder,set到DataSourceTransactionObject上
        //同時這里是新創建的ConnectionHolder,第二個參數就是true
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}
			//這里設置開啟事務
			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();
			//這里是獲取數據庫的隔離級別
      //如果我們事務屬性中定義的隔離級別是默認的,那表示我們用數據當前的設置,這時我們不需要修改,這里回返回null
      //如果我們事務屬性中定義的不是默認的,那就先獲取數據庫當前的隔離級別,將數據庫的隔離級別設置成我們定義的,同時返回數據庫之前的隔離級別
			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
      //上面的注釋基本也說明了,這里就是開啟手動提交
			if (con.getAutoCommit()) {
        //這里表示我們在執行完事務要還原成自動提交
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
        //通過這句就是設置手動提交了
				con.setAutoCommit(false);
			}
			//如果我們定義的是只讀事務,在這里就會執行只讀事務的設置
			prepareTransactionalConnection(con, definition);
      //設置事務開啟標志
			txObject.getConnectionHolder().setTransactionActive(true);
			//獲取超時時間,如果不是-1,說明需要設置超時時間。對於非法的超時時間,在前面已經做過判斷了
			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}
			
      //在這里判斷如何是新創建的ConnectionHolder,通過ThreadLocal的方式和DataSource進行綁定,后續如果在同一個事務(只有在同一個DataSource、同一個connection中才是同一個事務)中,就可以直接獲取到
			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

2. 業務代碼的調用

​ 1. MethodProxy對象的創建

在上面的源碼分析中,我們看到最后是通過調用invokeJoinpoint方法,在這個里面最終調用到this.methodProxy.invoke(this.target, this.arguments)的,那我們看看這里的methodProxy是怎么創建的

它是在我們的代理的CGLIB$STATICHOOK9()方法中創建的,相關的代碼就是下面的了

 Class var0 = Class.forName("com.springboot.transaction.jdbc.service.impl.UserServiceImpl$$EnhancerBySpringCGLIB$$3f0b65e9");
Class var1 = Class.forName("com.springboot.transaction.jdbc.service.impl.UserServiceImpl");
//這里看到它會調用MethodProxy的靜態方法來創建的
//前面兩個參數分別是原始類和代理類,第3個參數我們事務數據對應的方法的簽名
// 第4個參數是原始類中對應的方法名,第5個參數是對應生成的代理類中的方法中
//我們現在進去看看它內部的調用過程
CGLIB$addUser$0$Proxy = MethodProxy.create(var1, var0, "()Z", "addUser", "CGLIB$addUser$0");
	//MethodProxy中的方法
	public static MethodProxy create(Class c1, Class c2, String desc, String name1, String name2) {
		MethodProxy proxy = new MethodProxy();
		proxy.sig1 = new Signature(name1, desc);
		proxy.sig2 = new Signature(name2, desc);
    //前面幾行都比較簡單,都是創建對象,屬性賦值。我們看看下面這行,它會創建一個createInfo對象,並賦值到methodproxy對應屬性上
		proxy.createInfo = new CreateInfo(c1, c2);
		return proxy;
	}

		
		public CreateInfo(Class c1, Class c2) {
      //這里c1=om.springboot.transaction.service.impl.UserServiceImpl.class      
      //c2=com.springboot.transaction.service.impl.UserServiceImpl$$EnhancerBySpringCGLIB$$f0a9d143.class
      //c2也就是我們新生成的代理類
			this.c1 = c1;
			this.c2 = c2;
      //下面會通過ThreadLocal的方式獲取AbstractClassGenerator,主要是為了獲取下面3個屬性,保證代理類和我們這里最終要生成的快速執行方法的類相關設置保持一致
      //我們看看這個是怎么獲取到的
			AbstractClassGenerator fromEnhancer = AbstractClassGenerator.getCurrent();
			if (fromEnhancer != null) {
				namingPolicy = fromEnhancer.getNamingPolicy();
				strategy = fromEnhancer.getStrategy();
				attemptLoad = fromEnhancer.getAttemptLoad();
			}
		}

首先上面的方法調用是在我們執行完創建完事務的代理類之后,加載完代理類后就會調用到上面的方法,我們的代理類中會有個靜態方法,執行類加載后就會執行靜態方法

    static {
        CGLIB$STATICHOOK9();
    }

上面的 CGLIB$STATICHOOK9方法會通過反射的方法獲取我們原始類的方法,也會生成各種methodProxy。

所以 AbstractClassGenerator.getCurrent()這里得到的AbstractClassGenerator其實也就是我們創建代理類時使用的Enhancer對象,可以通過查看方法調用堆棧來確認

查看上圖可以看到當前調用CreateInfo的構造方法,是在AbstractClassGenerator的generate時調用的。而在AbstractClassGenerator的generate的開始部分會有下面幾句

	protected Class generate(ClassLoaderData data) {
		Class gen;
		Object save = CURRENT.get();
    //就在這里把this賦值給了CURRENT,而這里的this就是當前創建代理類用的Enhancer
		CURRENT.set(this);
		......
		}
  1. 下面我們來看看this.methodProxy.invoke(this.target, this.arguments)的的執行

    MethodProxy主要就是用來執行在原始類和代理類中方法調用的。

    主要對應兩個方法:

    • invokeSuper方法,調用代理類中的方法
    • invoke方法,調用原始類的方法
    //MethodProxy的方法	
    public Object invoke(Object obj, Object[] args) throws Throwable {
    		try {
          //在init方法中會判斷fastClassInfo對象是否存在,不存在就會去生成對應對象。
          //init中也會生成2個代理類,下面我們看看init方法
    			init();
    			FastClassInfo fci = fastClassInfo;
    			return fci.f1.invoke(fci.i1, obj, args);
    		}
    		catch (InvocationTargetException ex) {
    			throw ex.getTargetException();
    		}
    		catch (IllegalArgumentException ex) {
    			if (fastClassInfo.i1 < 0)
    				throw new IllegalArgumentException("Protected method: " + sig1);
    			throw ex;
    		}
    	}
    
    //MethodProxy的方法	
    private void init() {
    		/*
    		 * Using a volatile invariant allows us to initialize the FastClass and
    		 * method index pairs atomically.
    		 *
    		 * Double-checked locking is safe with volatile in Java 5.  Before 1.5 this
    		 * code could allow fastClassInfo to be instantiated more than once, which
    		 * appears to be benign.
    		 */
    		if (fastClassInfo == null) {
    			synchronized (initLock) {
    				if (fastClassInfo == null) {
    					CreateInfo ci = createInfo;
    					//首先創建一個FastClassInfo對象
              //FastClassInfo主要有4個屬性,在下面分別會對這4個屬性賦值
    					FastClassInfo fci = new FastClassInfo();
              //這里的ci就是上面的createInfo,
              //ci.c1是我們的原始類com.springboot.transaction.service.impl.UserServiceImpl.class
              //下面下面分別調用helper方法,生成快速調用原始類和代理類方法的FastClass類
    					fci.f1 = helper(ci, ci.c1);
    					fci.f2 = helper(ci, ci.c2);
              //這里會根據方法簽名在FastClass中返回一個整數,最終在調用invoke方法是時,會傳入這個整數,在FastClass內部,通過switch-case的方式快速調用對應的方法
    					fci.i1 = fci.f1.getIndex(sig1);
    					fci.i2 = fci.f2.getIndex(sig2);
    					fastClassInfo = fci;
    					createInfo = null;
    				}
    			}
    		}
    	}
    
    

    helper方法也會動態的生成一個叫FastClass的類,這個類的作用主要是通過數字下標的方式可以快速的調用一些類的方法。主要是代理類的方法。

    個人理解

    比如我們的代理類中方法CGLIB$findAll$0,這個是動態生成的。

    再比如雖然我們的方法是不是動態生成的,但是springboot框架也是不知道我們的代碼的,也沒法直接調我們的代碼。

    由於上面兩種情況,需要平台去調我們的代碼方法,所以需要通過FastClass去根據我們的方法動態生成調用

    下面的兩行代碼分別會生成對應我們原始類和代理類的FastClass

    //helper方法還是通過創建FastClass.Generator對象,調用它的create動態創建FastClass
    fci.f1 = helper(ci, ci.c1);
    fci.f2 = helper(ci, ci.c2);
    

    3.FastClass.Generator生成的FastClass。由於結構是一致的,這里就只展現代理類CGLIB$findAll$0方法對應的FastClass

    它主要對應兩個方法

    • getIndex根據傳入的Signature或者方法名跟類名返回一個整數,代碼在invoke中的索引
    • invoke根據傳入的整數,和對應原始類或者代理類的參數,去調用代理類或原始類的方法

    動態生成類class文件會根據我們main方法中指定的System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY,p);位置在這里生成.class。反編譯就能看到

3. 事務方法執行后處理

前面已經看到事務的執行是在TransactionInterceptor這個類中invokeWithinTransaction方法中執行的,那我們現在去看看我們的代碼執行完后,springboot是如何執行事務提交及后續工作的

//TransactionAspectSupport中的方法			
try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
        //前面說到會在這里執行我們@Transactional注解的方法
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
        //如果拋出異常,就會走到這里,在這里面根據情況進行回滾等操作,之后繼續向外拋出異常
        //注意:這里捕獲的是Throwable,這是所有異常的最頂層接口,所以肯定能捕獲到
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
        //在這里將TransactionInfo之前的信息繼續存放到對應ThreadLocal變量中
        //注意:由於是存放到ThreadLocal變量中,所以也不會有並發問題
				cleanupTransactionInfo(txInfo);
			}
			//如果事務執行失敗,在前面就拋出異常了,所以走到下面代碼的,肯定是執行成功了,后面就會需要去提交事務
				//這里不會存在io.vavr.control.Try.class,也就不會走到這個分支了
			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}
			//在這里會進行事務的提交,我們進到這里面去看看
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

		else {
			......
	}
//TransactionAspectSupport中的方法		
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
      //通過事務管理器去提交事務,我們繼續看里面的代碼
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}
//AbstractPlatformTransactionManager中的方法

public final void commit(TransactionStatus status) throws TransactionException {
  //首先判斷當前事務是否已經完成,completed這個標志是事務完成后設置成true的,這里還是false
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus, false);
			return;
		}

		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus, true);
			return;
		}
		//在這里執行事務的提交
		processCommit(defStatus);
	}

	/**
	 * Process an actual commit.
	 * Rollback-only flags have already been checked and applied.
	 * @param status object representing the transaction
	 * @throws TransactionException in case of commit failure
	 */
	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
        //下面是個空方法,留給子類去繼承的
				prepareForCommit(status);
        //下面是做一些提交前的處理
        //主要是獲取通過TransactionSynchronizationManager.getSynchronizations()獲取所有TransactionSynchronization
        //TransactionSynchronization這個接口為第三方框架服務的,在我們事務提交的階段實行一些框架內部相關的工作

        //如果數據庫使用mybatis框架的話,這里會獲取到SqlSessionUtils$SqlSessionSynchronization,這個主要是mybatis使用的,對我們事務框架的邏輯沒有太大關系
        //如果我們數據庫交互用的是原生的JdbcTemplate這里就是空的,所以triggerBeforeCommit,triggerBeforeCompletion都不會執行什么操作  
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				//如果之前有保存點的話,這里就需要釋放掉,因為我們后面會對整個操作進行提交
        //我們沒有設置保存點,不會走到這里
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
				}
        //我們本地是開啟的一個新的事務,會走到這里面去
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
          //對於SmartTransactionObject,有可能會有回滾的情況,我們這里不是SmartTransactionObject,這里的返回值就是false
					unexpectedRollback = status.isGlobalRollbackOnly();
          //在這里執行真正的提交,比較簡單就不進去看了。就是通過調用底層jdbc的connection的commit方法手動提交事務
					doCommit(status);
				}
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
        //這本次也是空操作
				triggerAfterCommit(status);
			}
			finally {
        //這本次也是空操作
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
      //在這里會進行一些清理工作,之前threadlocal變量設置的資源修改回原來的,把重新還原成自動提交,connection還給連接池等等
			cleanupAfterCompletion(status);
		}
	}

4. 業務代碼在事務和非事務中的區別

上面說的都是springboot對於事務的處理,對於我們業務代碼來說,是否使用事務,內部處理也是有一些區別的。

  1. 主要是區別:
  • 如果是在事務中執行,我們獲取的真正執行數據庫連接connection是之前在事務創建時獲取的那個,也就是關閉自動提交,改為手動提交的那個。
  • 如果不是在事務中執行,就會直接數據庫連接池中直接獲取
  1. 我們看看這部分關鍵代碼

    我們執行數據庫操作獲取數據庫連接connection都是通過這句代碼的=DataSourceUtils.getConnection(obtainDataSource());

    所以我們只需要看看這句代碼的內部邏輯就會明白

    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    		Assert.notNull(dataSource, "No DataSource specified");
    //如果之前開啟了事務,這里的ConnectionHolder默認就不會為空,下面這個if條件也是成立的,就會從這里獲取到數據庫的連接
    		ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    		if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
    			conHolder.requested();
    			if (!conHolder.hasConnection()) {
    				logger.debug("Fetching resumed JDBC Connection from DataSource");
    				conHolder.setConnection(fetchConnection(dataSource));
    			}
    			return conHolder.getConnection();
    		}
    		// Else we either got no holder or an empty thread-bound holder here.
    
    		logger.debug("Fetching JDBC Connection from DataSource");
      	//如果沒在事務中,或者從事務中獲取到的是null,就會數據庫連接池中新獲取一個
    		Connection con = fetchConnection(dataSource);
    		//在這里會判斷如果當前已經在事務中,能走到這個if分支中,說明當前事務中沒有connection,那么就會把本次獲取的connection方式到事務管理器中,在同一個事務中,執行多次數據庫操作,最多只有一次會走到這里,之后的數據庫操作都會從上面的if分支中返回
    		if (TransactionSynchronizationManager.isSynchronizationActive()) {
    			try {
    				// Use same Connection for further JDBC actions within the transaction.
    				// Thread-bound object will get removed by synchronization at transaction completion.
    				ConnectionHolder holderToUse = conHolder;
    				if (holderToUse == null) {
    					holderToUse = new ConnectionHolder(con);
    				}
    				else {
    					holderToUse.setConnection(con);
    				}
    				holderToUse.requested();
    				TransactionSynchronizationManager.registerSynchronization(
    						new ConnectionSynchronization(holderToUse, dataSource));
    				holderToUse.setSynchronizedWithTransaction(true);
    				if (holderToUse != conHolder) {
    					TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
    				}
    			}
    			catch (RuntimeException ex) {
    				// Unexpected exception from external delegation call -> close Connection and rethrow.
    				releaseConnection(con, dataSource);
    				throw ex;
    			}
    		}
    
    		return con;
    	}
    

5. 總結

​ 從上面的流程可以看到其實事務大致流程如下:

  1. 通過動態生成代理類的方式,調用攔截器org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor的intercept方法。

  2. 在intercept方法中,獲取對應的攔截器列表,調用invoke方法處理。我們這里只會有一個org.springframework.transaction.interceptor.TransactionInterceptor。繼而調用到它的invokeWithinTransaction方法

  3. 在TransactionInterceptor的invokeWithinTransaction方法中會執行一系列事務的准備工作

    • 獲取事務屬性信息、事務管理器

    • 創建事務資源對象,獲取數據庫連接connection與當前事務進行綁定,根據當前事務定義的屬性,設置隔離級別,傳播行為,關閉自動提交,設置為手動提交。

    • 設置當前事務為開啟狀態,將當前事務資源與當前線程進行綁定

    • 通過invocation.proceedWithInvocation()調用,最終通過methodproxy.invoke最終調用到我們的業務代碼

    • 根據我們業務代碼執行是否拋出異常來進行后續處理

      1. 如果拋出異常,認為當前事務執行失敗,執行回滾等等回退操作,最終將異常繼續向外拋
      2. 如果沒有拋出異常,認為當前事務執行成功,執行事務的提交,同時清理資源。將事務隔離級別,傳播行為等等還原成之前的狀態。
    • 最終將結果返回給調用者

整個事務處理大致的流程就是上面這些了 。

同時對比Springboot中注解@Configuration源碼分析,你也會發現,springboot中動態生成類,基本都是一樣的。都是套的同一個模版。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM