1. The @Async annotation
Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
Can also be used at the type level, in which case all of the type's methods are
considered as asynchronous.
In short: the annotation marks a method as a candidate for asynchronous execution. Placed on a class, it makes every method of that class asynchronous.
<p>In terms of target method signatures, any parameter types are supported.
However, the return type is constrained to either {@code void} or
{@link java.util.concurrent.Future}. In the latter case, you may declare the
more specific {@link org.springframework.util.concurrent.ListenableFuture} or
{@link java.util.concurrent.CompletableFuture} types which allow for richer
interaction with the asynchronous task and for immediate composition with
further processing steps.
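In short: the target method may take parameters of any type, but it must return either void or a Future. When returning a Future, you can declare the more specific ListenableFuture or CompletableFuture, which offer richer interaction with the asynchronous task and let the result be composed directly with further processing steps.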
<p>A {@code Future} handle returned from the proxy will be an actual asynchronous
{@code Future} that can be used to track the result of the asynchronous method
execution. However, since the target method needs to implement the same signature,
it will have to return a temporary {@code Future} handle that just passes a value
through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
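In short: the Future handed back by the proxy is a real asynchronous Future that tracks the outcome of the asynchronous call. Because the target method has to share the same signature, it returns a temporary Future that merely passes the value through, for example Spring's AsyncResult, javax.ejb.AsyncResult, or java.util.concurrent.CompletableFuture#completedFuture(Object).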
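The pass-through idea can be illustrated without Spring. The following is a minimal JDK-only sketch, not Spring's implementation: the hypothetical `greet` stands in for the target method and returns an already-completed Future, while the hypothetical `greetViaProxy` stands in for the proxy, running the target on another thread and unwrapping its handle.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PassThroughFutureDemo {

    // Stand-in for the target method: it computes synchronously and wraps the
    // value in an already-completed Future only to satisfy the signature.
    static Future<String> greet(String name) {
        return CompletableFuture.completedFuture("hello " + name);
    }

    // Stand-in for the proxy (an assumption for illustration): it runs the
    // target on the executor and unwraps the pass-through handle, so the
    // caller receives a genuinely asynchronous Future.
    static Future<String> greetViaProxy(ExecutorService executor, String name) {
        Callable<String> task = () -> greet(name).get();
        return executor.submit(task);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> handle = greetViaProxy(executor, "async");
            System.out.println(handle.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller only ever sees the Future created by `executor.submit`; the completed Future built inside `greet` never leaves the worker thread.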
2. @EnableAsync
Enables Spring's asynchronous method execution capability, similar to functionality
found in Spring's {@code <task:*>} XML namespace.
In short: this annotation turns on Spring's asynchronous method execution, much like the <task:*> XML namespace does.
<p>To be used together with @{@link Configuration Configuration} classes as follows,
enabling annotation-driven async processing for an entire Spring application context:
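In short: use it together with a @Configuration class, as in the following snippet, to enable annotation-driven asynchronous processing for the whole Spring application context.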
<pre class="code">
@Configuration
@EnableAsync
public class AppConfig {
}</pre>
{@code MyAsyncBean} is a user-defined type with one or more methods annotated with
either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
annotation, or any custom annotation specified via the {@link #annotation} attribute.
The aspect is added transparently for any registered bean, for instance via this
configuration:
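In short: MyAsyncBean is a user-defined class with one or more methods carrying @Async, @Asynchronous, or a custom annotation named through the annotation attribute. The aspect is applied transparently to any bean registered in the Spring container, for example through this configuration: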
<pre class="code">
@Configuration
public class AnotherAppConfig {
    @Bean
    public MyAsyncBean asyncBean() {
        return new MyAsyncBean();
    }
}</pre>
<p>By default, Spring will be searching for an associated thread pool definition:
either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
will be used to process async method invocations. Besides, annotated methods having a
{@code void} return type cannot transmit any exception back to the caller. By default,
such uncaught exceptions are only logged.
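In short: by default Spring looks for an associated thread pool, either the unique TaskExecutor bean in the context or, failing that, an Executor bean named "taskExecutor". If neither is found, a SimpleAsyncTaskExecutor handles the asynchronous invocations. In addition, an annotated method that returns void cannot pass an exception back to its caller; by default such uncaught exceptions are only logged.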
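Why a void method cannot report its failure to the caller is easy to see with plain JDK executors. The sketch below is an illustration under assumptions, not Spring code: the hypothetical `runAsync` helper routes a failure to a handler callback, which plays roughly the role that an uncaught-exception handler plays for void @Async methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class VoidAsyncExceptionDemo {

    // Hypothetical helper: runs a fire-and-forget task and routes any failure
    // to the handler, because there is no Future to carry it back to the caller.
    static void runAsync(ExecutorService executor, Runnable task, Consumer<Throwable> handler) {
        executor.execute(() -> {
            try {
                task.run();
            } catch (Throwable ex) {
                handler.accept(ex);
            }
        });
    }

    // Submits a failing task and returns the message the handler observed.
    static String observedFailure() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            runAsync(executor,
                    () -> { throw new IllegalStateException("boom"); },
                    ex -> { seen.set(ex.getMessage()); done.countDown(); });
            // runAsync returned normally; the failure surfaces only in the handler.
            done.await(5, TimeUnit.SECONDS);
            return seen.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + observedFailure());
    }
}
```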
<p>To customize all this, implement {@link AsyncConfigurer} and provide:
<ul>
<li>your own {@link java.util.concurrent.Executor Executor} through the
{@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
<li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
getAsyncUncaughtExceptionHandler()}
method.</li>
</ul>
In short: to customize all of this, implement the AsyncConfigurer interface. Supply the Executor implementation through AsyncConfigurer#getAsyncExecutor and the AsyncUncaughtExceptionHandler implementation through AsyncConfigurer#getAsyncUncaughtExceptionHandler.
<p><b>NOTE: {@link AsyncConfigurer} configuration classes get initialized early
in the application context bootstrap. If you need any dependencies on other beans
there, make sure to declare them 'lazy' as far as possible in order to let them
go through other post-processors as well.</b>
<pre class="code">
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}</pre>
<p>If only one item needs to be customized, {@code null} can be returned to
keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
when possible.
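In short: when only one of the two needs customizing, the other method can return null to keep the default. Where possible, consider extending AsyncConfigurerSupport instead.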
<p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
if you want a fully managed bean. In such circumstances it is no longer necessary to
manually call the {@code executor.initialize()} method as this will be invoked
automatically when the bean is initialized.
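In short: the ThreadPoolTaskExecutor in the example above is not a fully Spring-managed bean. Adding @Bean to getAsyncExecutor() makes it one, and then executor.initialize() no longer has to be called by hand, because it is invoked automatically when the bean is initialized.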
<p>For reference, the example above can be compared to the following Spring XML
configuration:
In short: the same setup can be written as XML configuration:
<pre class="code">
<beans>
    <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
    <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
    <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
    <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
</beans>
</pre>
The above XML-based and JavaConfig-based examples are equivalent except for the
setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
the {@code <task:executor>} element does not expose such an attribute. This
demonstrates how the JavaConfig-based approach allows for maximum configurability
through direct access to actual componentry.
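In short: the XML and Java configurations are equivalent except for the thread name prefix of the Executor, which the <task:executor> element does not expose. This shows how Java-based configuration leaves the most room for customization.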
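For comparison, the same pool shape can be approximated with the plain JDK, where the thread name prefix is likewise only reachable in code. This is a rough sketch, not what ThreadPoolTaskExecutor does internally: the `newPool` helper, the 60-second keep-alive, and the counter-based naming scheme are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolDemo {

    // Builds a pool with core size 7, max size 42 and queue capacity 11,
    // whose worker threads are named <prefix><n>.
    static ThreadPoolExecutor newPool(String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory =
                task -> new Thread(task, threadNamePrefix + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                7,                              // core pool size
                42,                             // maximum pool size
                60, TimeUnit.SECONDS,           // keep-alive for idle extra threads (assumed value)
                new LinkedBlockingQueue<>(11),  // queue capacity
                factory);
    }

    // Runs one task and reports the name of the worker thread that executed it.
    static String firstWorkerName() throws Exception {
        ThreadPoolExecutor pool = newPool("MyExecutor-");
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstWorkerName());
    }
}
```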
<p>The {@link #mode} attribute controls how advice is applied: If the mode is
{@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
of the proxying. Please note that proxy mode allows for interception of calls through
the proxy only; local calls within the same class cannot get intercepted that way.
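In short: the mode attribute of @EnableAsync controls how the advice is applied. With AdviceMode#PROXY (the default), the other attributes govern the proxying behavior. Proxy mode can only intercept calls that go through the proxy; a local call between methods of the same class is not intercepted.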
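The self-invocation limitation is a general property of proxies and can be reproduced with a JDK dynamic proxy alone. In this minimal sketch (the `Service` and `ServiceImpl` types are hypothetical, and no Spring classes are involved), the handler records every call it sees; the nested call from `outer()` to `inner()` goes straight to `this` and is never recorded.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {

    interface Service {
        void outer();
        void inner();
    }

    static class ServiceImpl implements Service {
        @Override
        public void outer() {
            inner(); // local call on 'this', not on the proxy
        }

        @Override
        public void inner() {
        }
    }

    // Returns the names of the methods that the proxy actually intercepted.
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Service target = new ServiceImpl();
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName());
            return method.invoke(target, args);
        };
        Service proxy = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(), new Class<?>[] {Service.class}, handler);
        proxy.outer();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [outer]
    }
}
```

Only `outer` shows up, which mirrors why an @Async method called from another method of the same bean runs synchronously in proxy mode.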
<p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
this case the {@code spring-aspects} module JAR must be present on the classpath, with
compile-time weaving or load-time weaving applying the aspect to the affected classes.
There is no proxy involved in such a scenario; local calls will be intercepted as well.
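In short: with mode set to AdviceMode#ASPECTJ, the proxyTargetClass attribute is ignored. The spring-aspects JAR must be on the project's classpath so that the aspect is woven into the affected classes at compile time or load time. No proxy is involved, so local calls within the same class are intercepted too.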
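Next, let's see at which step the aspect is applied to the affected classes. Next to the Async class in the source tree sits AsyncAnnotationBeanPostProcessor; as its name suggests, it post-processes beans that carry @Async once they have been instantiated.
1. It has a setBeanFactory method, which creates an AsyncAnnotationAdvisor and stores it in the advisor field.
2. It also has a postProcessAfterInitialization() method, which runs after a bean is initialized, creates a proxy object for beans annotated with @Async, and sets the proxy's interceptor (DynamicAdvisedInterceptor).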
Bean post-processor that automatically applies asynchronous invocation behavior to any bean that carries the {@link Async} annotation at class or method-level by adding a corresponding {@link AsyncAnnotationAdvisor} to the
exposed proxy (either an existing AOP proxy or a newly generated proxy that implements all of the target's interfaces).
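In short: this bean post-processor automatically adds asynchronous invocation behavior to the proxy of any bean annotated with @Async at class or method level.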
<p>The {@link TaskExecutor} responsible for the asynchronous execution may be provided as well as the annotation type that indicates a method should be invoked asynchronously. If no annotation type is specified, this post-processor will detect both Spring's {@link Async @Async} annotation as well as the EJB 3.1 {@code javax.ejb.Asynchronous} annotation.
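In short: the TaskExecutor that runs the asynchronous methods can be supplied, and so can the annotation type that marks a method for asynchronous invocation. If no annotation type is specified, the post-processor detects both @Async and @Asynchronous.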
<p>For methods having a {@code void} return type, any exception thrown during the asynchronous method invocation cannot be accessed by the caller. An {@link AsyncUncaughtExceptionHandler} can be specified to handle these cases.
<p>Note: The underlying async advisor applies before existing advisors by default, in order to switch to async execution as early as possible in the invocation chain.
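In short: by default the underlying async advisor is applied ahead of any existing advisors, so that execution switches to asynchronous as early as possible in the invocation chain.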
setBeanFactory creates the advisor and stores it in the advisor field of AsyncAnnotationBeanPostProcessor:
// AsyncAnnotationBeanPostProcessor
@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
The AsyncAnnotationAdvisor constructor builds the advice, whose concrete type is AnnotationAsyncExecutionInterceptor, and the pointcut:
// AsyncAnnotationAdvisor
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    } catch (ClassNotFoundException ex) {
        // If EJB 3.1 API not present, simply ignore.
    }
    this.advice = buildAdvice(executor, exceptionHandler);
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

/**
 * Calculate a pointcut for the given async annotation types, if any.
 * @param asyncAnnotationTypes the async annotation types to introspect
 * @return the applicable Pointcut object, or {@code null} if none
 */
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        // Class-level match and method-level match for the same annotation type.
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        } else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}
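The pointcut built above is the union of a class-level match and a method-level match for each annotation type. The following reflection-only sketch shows that "class or method" rule in miniature. It is a simplification under assumptions: `@MyAsync` is a hypothetical marker standing in for @Async, and the check looks only at the class and the method themselves, whereas the real pointcut is built with checkInherited set to true and looks further up the type hierarchy.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationMatchDemo {

    // Hypothetical marker annotation standing in for @Async.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface MyAsync {
    }

    @MyAsync
    static class WholeClassAsync {
        public void work() {
        }
    }

    static class OneMethodAsync {
        @MyAsync
        public void slow() {
        }

        public void fast() {
        }
    }

    // A method matches if its class carries the annotation OR the method does.
    static boolean matches(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return type.isAnnotationPresent(MyAsync.class)
                    || method.isAnnotationPresent(MyAsync.class);
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No such method: " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(WholeClassAsync.class, "work")); // true
        System.out.println(matches(OneMethodAsync.class, "slow"));  // true
        System.out.println(matches(OneMethodAsync.class, "fast"));  // false
    }
}
```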
AbstractAdvisingBeanPostProcessor class hierarchy
AsyncAnnotationBeanPostProcessor runs the postProcessAfterInitialization method:
// AbstractAdvisingBeanPostProcessor (inherited by AsyncAnnotationBeanPostProcessor)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            } else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}
// ProxyFactory
/**
 * Create a new proxy according to the settings in this factory.
 * <p>Can be called repeatedly. Effect will vary if we've added
 * or removed interfaces. Can add and remove interceptors.
 * <p>Uses the given class loader (if necessary for proxy creation).
 * @param classLoader the class loader to create the proxy with
 * (or {@code null} for the low-level proxy facility's default)
 * @return the proxy object
 */
public Object getProxy(@Nullable ClassLoader classLoader) {
    return createAopProxy().getProxy(classLoader);
}
Obtain the AOP proxy factory and have it create the AopProxy, a CGLIB proxy in this case:
// ProxyCreatorSupport
/**
 * Subclasses should call this to get a new AOP proxy. They should <b>not</b>
 * create an AOP proxy with {@code this} as an argument.
 */
protected final synchronized AopProxy createAopProxy() {
    if (!this.active) {
        activate();
    }
    return getAopProxyFactory().createAopProxy(this);
}
Create the AopProxy; in this walkthrough the result is an ObjenesisCglibAopProxy:
// DefaultAopProxyFactory: creates a CGLIB proxy if one of the following is true for a given instance:
// optimize is true, proxyTargetClass is true, or no proxy interfaces have been specified
@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
    if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
        Class<?> targetClass = config.getTargetClass();
        if (targetClass == null) {
            throw new AopConfigException("TargetSource cannot determine target class: " +
                    "Either an interface or a target is required for proxy creation.");
        }
        if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
            return new JdkDynamicAopProxy(config);
        }
        return new ObjenesisCglibAopProxy(config);
    } else {
        return new JdkDynamicAopProxy(config);
    }
}
The core of proxy creation follows. It configures an Enhancer so that the generated class is a subclass of the proxied class (enhancer.setSuperclass(proxySuperClass)), then creates the proxy class and a proxy instance and sets the callbacks:
// CglibAopProxy
@Override
public Object getProxy(@Nullable ClassLoader classLoader) {
    if (logger.isTraceEnabled()) {
        logger.trace("Creating CGLIB proxy: " + this.advised.getTargetSource());
    }

    try {
        Class<?> rootClass = this.advised.getTargetClass();
        Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");

        Class<?> proxySuperClass = rootClass;
        if (ClassUtils.isCglibProxyClass(rootClass)) {
            proxySuperClass = rootClass.getSuperclass();
            Class<?>[] additionalInterfaces = rootClass.getInterfaces();
            for (Class<?> additionalInterface : additionalInterfaces) {
                this.advised.addInterface(additionalInterface);
            }
        }

        // Validate the class, writing log messages as necessary.
        validateClassIfNecessary(proxySuperClass, classLoader);

        // Configure CGLIB Enhancer...
        Enhancer enhancer = createEnhancer();
        if (classLoader != null) {
            enhancer.setClassLoader(classLoader);
            if (classLoader instanceof SmartClassLoader &&
                    ((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
                enhancer.setUseCache(false);
            }
        }
        enhancer.setSuperclass(proxySuperClass);
        enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
        enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
        enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));

        Callback[] callbacks = getCallbacks(rootClass);
        Class<?>[] types = new Class<?>[callbacks.length];
        for (int x = 0; x < types.length; x++) {
            types[x] = callbacks[x].getClass();
        }
        // fixedInterceptorMap only populated at this point, after getCallbacks call above
        enhancer.setCallbackFilter(new ProxyCallbackFilter(
                this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
        enhancer.setCallbackTypes(types);

        // Generate the proxy class and create a proxy instance.
        return createProxyClassAndInstance(enhancer, callbacks);
    } catch (CodeGenerationException | IllegalArgumentException ex) {
        ..
    } catch (Throwable ex) {
        ..
    }
}
Collect the callbacks for the proxy class. DynamicAdvisedInterceptor is the interceptor used for AOP calls and the entry point whenever an advised method is invoked on the proxy:
// CglibAopProxy
private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
    // Parameters used for optimization choices...
    boolean exposeProxy = this.advised.isExposeProxy();
    boolean isFrozen = this.advised.isFrozen();
    boolean isStatic = this.advised.getTargetSource().isStatic();

    // Choose an "aop" interceptor (used for AOP calls).
    // This is the interceptor used for AOP method calls and the entry point for calls on the proxy.
    Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);

    // Choose a "straight to target" interceptor. (used for calls that are
    // unadvised but can return this). May be required to expose the proxy.
    Callback targetInterceptor;
    if (exposeProxy) {
        targetInterceptor = (isStatic ?
                new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
                new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
    } else {
        targetInterceptor = (isStatic ?
                new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
                new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
    }

    // Choose a "direct to target" dispatcher (used for
    // unadvised calls to static targets that cannot return this).
    Callback targetDispatcher = (isStatic ?
            new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());

    Callback[] mainCallbacks = new Callback[] {
            aopInterceptor,  // for normal advice
            targetInterceptor,  // invoke target without considering advice, if optimized
            new SerializableNoOp(),  // no override for methods mapped to this
            targetDispatcher, this.advisedDispatcher,
            new EqualsInterceptor(this.advised),
            new HashCodeInterceptor(this.advised)
    };

    Callback[] callbacks;

    // If the target is a static one and the advice chain is frozen,
    // then we can make some optimizations by sending the AOP calls
    // direct to the target using the fixed chain for that method.
    if (isStatic && isFrozen) {
        Method[] methods = rootClass.getMethods();
        Callback[] fixedCallbacks = new Callback[methods.length];
        this.fixedInterceptorMap = new HashMap<>(methods.length);

        // TODO: small memory optimization here (can skip creation for methods with no advice)
        for (int x = 0; x < methods.length; x++) {
            List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
            fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
                    chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
            this.fixedInterceptorMap.put(methods[x].toString(), x);
        }

        // Now copy both the callbacks from mainCallbacks
        // and fixedCallbacks into the callbacks array.
        callbacks = new Callback[mainCallbacks.length + fixedCallbacks.length];
        System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
        System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
        this.fixedInterceptorOffset = mainCallbacks.length;
    } else {
        callbacks = mainCallbacks;
    }
    return callbacks;
}
Create the proxy class and an instance of it, then set the callbacks on the instance (setCallbacks):
// ObjenesisCglibAopProxy
@Override
@SuppressWarnings("unchecked")
protected Object createProxyClassAndInstance(Enhancer enhancer, Callback[] callbacks) {
    Class<?> proxyClass = enhancer.createClass();
    Object proxyInstance = null;

    if (objenesis.isWorthTrying()) {
        try {
            proxyInstance = objenesis.newInstance(proxyClass, enhancer.getUseCache());
        } catch (Throwable ex) {
            logger.debug("Unable to instantiate proxy using Objenesis, " +
                    "falling back to regular proxy construction", ex);
        }
    }

    if (proxyInstance == null) {
        // Regular instantiation via default constructor...
        try {
            Constructor<?> ctor = (this.constructorArgs != null ?
                    proxyClass.getDeclaredConstructor(this.constructorArgTypes) :
                    proxyClass.getDeclaredConstructor());
            ReflectionUtils.makeAccessible(ctor);
            proxyInstance = (this.constructorArgs != null ?
                    ctor.newInstance(this.constructorArgs) : ctor.newInstance());
        } catch (Throwable ex) {
            throw new AopConfigException("Unable to instantiate proxy using Objenesis, " +
                    "and regular proxy instantiation via default constructor fails as well", ex);
        }
    }

    ((Factory) proxyInstance).setCallbacks(callbacks);
    return proxyInstance;
}
When a method annotated with @Async is called, DynamicAdvisedInterceptor intercepts the call, which first enters the DynamicAdvisedInterceptor.intercept method:
// DynamicAdvisedInterceptor
@Override
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
    Object oldProxy = null;
    boolean setProxyContext = false;
    Object target = null;
    TargetSource targetSource = this.advised.getTargetSource();
    try {
        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }
        // Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
        target = targetSource.getTarget();
        Class<?> targetClass = (target != null ? target.getClass() : null);
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
        Object retVal;
        // Check whether we only have one InvokerInterceptor: that is,
        // no real advice, but just reflective invocation of the target.
        if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
            // We can skip creating a MethodInvocation: just invoke the target directly.
            // Note that the final invoker must be an InvokerInterceptor, so we know
            // it does nothing but a reflective operation on the target, and no hot
            // swapping or fancy proxying.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            retVal = methodProxy.invoke(target, argsToUse);
        } else {
            // We need to create a method invocation...
            // (the CGLIB method invocation is what is actually used here)
            retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
        }
        retVal = processReturnType(proxy, target, method, retVal);
        return retVal;
    } finally {
        if (target != null && !targetSource.isStatic()) {
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}
Before the @Async method itself runs, ReflectiveMethodInvocation.proceed() is called (CglibMethodInvocation extends ReflectiveMethodInvocation):
// ReflectiveMethodInvocation
@Override
@Nullable
public Object proceed() throws Throwable {
    // We start with an index of -1 and increment early.
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // Evaluate dynamic method matcher here: static part will already have
        // been evaluated and found to match.
        InterceptorAndDynamicMethodMatcher dm =
                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
        Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
        if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
            return dm.interceptor.invoke(this);
        } else {
            // Dynamic matching failed.
            // Skip this interceptor and invoke the next in the chain.
            return proceed();
        }
    } else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}
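The index-based walk through the interceptor chain in proceed() can be reduced to a few lines. The sketch below is a simplified model, not Spring's code: the `Interceptor` and `Invocation` types are hypothetical, dynamic method matchers are left out, and the target is modelled as a `Supplier`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class InterceptorChainDemo {

    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    static class Invocation {
        private final List<Interceptor> chain;
        private final Supplier<Object> target;
        private int currentIndex = -1; // start at -1 and increment early

        Invocation(List<Interceptor> chain, Supplier<Object> target) {
            this.chain = chain;
            this.target = target;
        }

        Object proceed() {
            if (currentIndex == chain.size() - 1) {
                // Every interceptor has run: invoke the target itself.
                return target.get();
            }
            // Hand control to the next interceptor, which may call proceed() again.
            return chain.get(++currentIndex).invoke(this);
        }
    }

    // Runs a two-interceptor chain and returns the order in which things happened.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Interceptor first = invocation -> {
            log.add("first:before");
            Object result = invocation.proceed();
            log.add("first:after");
            return result;
        };
        Interceptor second = invocation -> {
            log.add("second");
            return invocation.proceed();
        };
        Invocation invocation = new Invocation(Arrays.asList(first, second), () -> {
            log.add("target");
            return "done";
        });
        invocation.proceed();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [first:before, second, target, first:after]
    }
}
```

An async interceptor fits this model by not calling `proceed()` inline: it wraps that call in a task and submits it to an executor, which is why being early in the chain matters.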
Control then reaches the AsyncExecutionInterceptor, which performs the actual enhancement of the method:
// AsyncExecutionInterceptor
/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
The actual method execution is handed to the thread pool, and the caller receives either the Future for the asynchronous result or null:
// AsyncExecutionAspectSupport
/**
 * Delegate for actually executing the given task with the chosen executor.
 * @param task the task to execute
 * @param executor the chosen executor
 * @param returnType the declared return type (potentially a {@link Future} variant)
 * @return the execution result (potentially a corresponding {@link Future} handle)
 */
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    } else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    } else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    } else {
        executor.submit(task);
        return null;
    }
}
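The dispatch on the declared return type can be tried out with JDK types only. This is a pared-down sketch rather than the Spring method: the `submit` helper is hypothetical, it takes a plain `ExecutorService` instead of an AsyncTaskExecutor, and the ListenableFuture branch is omitted because that type is Spring-specific.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitByReturnTypeDemo {

    // Chooses how to hand the task to the executor based on the declared return type.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        // void (or any other type): fire and forget, nothing to hand back.
        executor.submit(task);
        return null;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object forVoid = submit(() -> "ignored", executor, void.class);
            Object forFuture = submit(() -> "plain", executor, Future.class);
            Object forCompletable = submit(() -> "composable", executor, CompletableFuture.class);
            System.out.println(forVoid);
            System.out.println(((Future<?>) forFuture).get());
            System.out.println(((CompletableFuture<?>) forCompletable).get());
        } finally {
            executor.shutdown();
        }
    }
}
```

The CompletableFuture check has to come first, because CompletableFuture also implements Future and would otherwise be caught by the more general branch.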