任務調度(02)Spring Schedule
Spring 3.0 提供兩種任務調度方式:一是定時任務調度;二是異步任務調度。這兩種任務調度方式都是基於 JUC 實現的,是一種非常輕量級的任務調度方式。同時在 spring-context-support 中也整合了 Quartz,文本重點關注 Spring 提供了原生任務調度方式 - @EnableScheduling 和 @EnableAsync。
- 定時任務調度和異步任務調度的基本使用方法。
- 定時任務調度的源碼分析。ScheduledAnnotationBeanPostProcessor 會掃描標注有 @Scheduled 注解的方法,將其注冊到 ScheduledTaskRegistrar 中,真正調度任務的類是 ScheduledTaskRegistrar 中的TaskScheduler。TaskScheduler 實現類最底層是 ScheduledExecutorService,由 JUC 提供。
- 異步任務調度的源碼分析。異步執行和定時執行的最大區別是異步執行可能需要返回執行結果,所以需要對標注有 @Async 的類進行代理。異步任務有兩種執行模式:代理 或 Aspectj,無論那種方法都是以 Spring AOP 作為支撐。AsyncAnnotationBeanPostProcessor 首先創建 AsyncAnnotationAdvisor,如果類或方法上標注有 @Async 注解,則使用 proxyFactory 生成代理對象。
1. 基本用法
1.1 定時調度 - 基於xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<bean id="scheduledXmlTest" class="ScheduledXmlTest"/>
<!--定時調度@Scheduled-->
<task:scheduler id="taskScheduler" pool-size="2"/>
<task:scheduled-tasks scheduler="taskScheduler">
<task:scheduled ref="scheduledXmlTest" method="fixedDelay" fixed-delay="2000"/>
<task:scheduled ref="scheduledXmlTest" method="fixedRate" fixed-rate="2000"/>
<task:scheduled ref="scheduledXmlTest" method="cron" cron="0/2 * * * * *"/>
</task:scheduled-tasks>
<!--異步調度@Async-->
<task:executor id="executor" pool-size="2"/>
<!--啟動注解驅動-->
<task:annotation-driven executor="executor" scheduler="taskScheduler"/>
</beans>
總結: Spring 任務調度總共有 4 個標簽:
task:scheduler
:配置調度器,實現了 org.springframework.scheduling.TaskScheduler 接口,默認是 org.springframework.scheduling.TaskScheduler。task:scheduled-tasks
:配置任務,如果不指定 scheduler 屬性,那么所有的任務都會在一個線程上執行,可能會造成線程阻塞。task:executor
:異步任務調度的執行線程,實現 Executor 接口。task:annotation-driven
:開啟注解驅動,包括 @Scheduled 和 @Async。其中 executor 為異步調度的執行線程池,scheduler 為定時調度的執行線程池。同樣不配置的話,所有的 @Scheduled 任務都是一個線程上執行。
1.2 定時調度 - 注解驅動
@EnableScheduling
public class ScheduledTest {
private Logger logger = LoggerFactory.getLogger(ScheduledTest.class);
public static void main(String[] args) {
SpringApplication.run(ScheduledTest.class);
}
@Scheduled(fixedDelay = 2000)
public void fixedDelay() {
logger.info("fixedDelay");
}
@Scheduled(fixedRate = 2000)
public void fixedRate() {
logger.info("fixedRate");
}
@Scheduled(cron = "0/2 * * * * *")
public void cron() {
logger.info("cron");
}
}
總結: fixedDelay、fixedRate、cron 都是定時的任務調度,那它們有什么區別呢?
fixedDelay
:上一個任務執行結束和下一個任務開始的時間間隔。總時間間隔=方法執行時間 + fixedDelay。fixedRate
:上一個任務執行開始和下一個任務開始的時間間隔。cron
:類似 Linux crontab,如果任務執行時間太長,就會忽略過期的任務。
1.3 異步調度 - 注解驅動
@Async 可以標注在類或方法上,如果標注在類上則表示這個類的所有方法都是異步執行的。
@EnableAsync
//@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
private static final Logger logger = LoggerFactory.getLogger(AsyncTest.class);
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(AsyncTest.class);
AsyncTest asyncTest = context.getBean(AsyncTest.class);
asyncTest.async();
logger.info("main method");
}
@Async
public void async() {
logger.info("async method");
}
}
日志如下:
INFO 2516 --- [ main] AsyncTest : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest : async method
總結: 可以發現這兩條日志輸出的線程不一致,而且是 main 線程先輸出日志,也就是 @Async 注解后,方法是異步執行的。
注意: @Async 標注在類上,則這類的所有方法都是異常執行。
接下來進行源碼分析。Spring 任務調度的源碼位於 spring-context
工程的 scheduling 包中,其目錄如下:
scheduling
|- annotation # @Scheduled和@Async注解支持
|- concurrent # juc擴展
|- config # 傳統的xml解析支持
|- support # cron解析支持
2. @EnableScheduling 源碼分析
總結: 上述的步驟,有兩個重要的入口:
- 實現
BeanPostProcessor#postProcessAfterInitialization
接口,在每個 bean 實例化完成后,掃描 @Scheduled 注解,注冊到 ScheduledTaskRegistrar 中。 - 監聽 Spring 的容器初始化事件
onApplicationEvent(ContextRefreshedEvent event)
事件,回調 finishRegistration 方法設置 ScheduledTaskRegistrar 的 TaskScheduler。在配置 ScheduledTaskRegistrar 后,調用其 scheduleTasks 方法觸發任務調度。 - 真正調度任務的類是 TaskScheduler 實現類中的 ScheduledExecutorService,由 JUC 提供。
- 查找 TaskScheduler 的實現類實例默認是通過類型查找,若有多個實現則會查找名字為 "taskScheduler" 的實現 Bean,若沒有找到則在 ScheduledTaskRegistrar 調度任務的時候會創建一個 newSingleThreadScheduledExecutor , 將 TaskScheduler 的實現類實例設置到 ScheduledTaskRegistrar 屬性中。
2.1 @Scheduled 注解掃描
@Scheduled 注解的處理分兩步:一是 MethodIntrospector.selectMethods 掃描該 bean 的所有 @@Scheduled和@Schedules注解的方法;二是 processScheduled 方法處理有 @Scheduled 注解的方法。
(1)@Scheduled 掃描
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
// 1. 掃描每個方法上的@Scheduled和@Schedules注解
Map<Method, Set<Scheduled>> annotatedMethods =
MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>)
method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
// 2. 調用processScheduled處理所有的@Scheduled注解的方法
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
} else {
annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled ->
processScheduled(scheduled, method, bean)));
}
}
return bean;
}
總結: 核心步驟就兩步:第一步是 MethodIntrospector.selectMethods
掃描所有方法的 @Scheduled 和@Schedules注解。至於第二步,最核心的處理則委托給了 processScheduled 方法處理。
(2)processScheduled
processScheduled 分別處理 @Scheduled 中的三種情況:cron、fixedDelay、fixedRate 三種場景,將其分別包裝成 CronTask、FixedDelayTask、FixedRateTask 后,注冊到 ScheduledTaskRegistrar 上,當調用其 scheduleTasks 方法觸發任務調度。
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
...
// 1. Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (StringUtils.hasLength(cron)) {
if (!Scheduled.CRON_DISABLED.equals(cron)) {
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// 2. Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
// 3. Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
// 4. Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
} catch (IllegalArgumentException ex) {
throw new IllegalStateException();
}
}
總結: processScheduled 代碼有刪減,只保留了核心的邏輯。processScheduled 代碼很簡單,接下來繼續分析定時任務觸發流程。
2.2 定時任務觸發
ScheduledAnnotationBeanPostProcessor 會通過 onApplicationEvent(ContextRefreshedEvent event)
監聽容器初始化事件。一旦容器初始化完成,就會試圖從容器中獲取 TaskScheduler。最終調用 scheduleTasks 觸發定時任務調度。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
private void finishRegistration() {
// 1. 手動配置 TaskScheduler
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// 2. 自定義配置ScheduledTaskRegistrar。獲取容器中的SchedulingConfigurer
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 3. 如果沒有配置TaskScheduler,則從Spring容器中查找。查找規則如下:
// 1) 查找TaskScheduler,如果有多個則查找名稱為"taskScheduler"
// 2) 查找ScheduledExecutorService,如果有多個則查找名稱為"taskScheduler"
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
} catch (NoUniqueBeanDefinitionException ex) {
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
} catch (NoSuchBeanDefinitionException ex2) {
}
} catch (NoSuchBeanDefinitionException ex) {
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
} catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
} catch (NoSuchBeanDefinitionException ex3) {
}
} catch (NoSuchBeanDefinitionException ex2) {
}
}
}
// 4. 觸發定時任務調度
this.registrar.afterPropertiesSet();
}
總結: 一旦 Spring 容器初始化完成,就會觸發定時任務的執行。在 finishRegistration 方法中,主要是配置定時任務執行器,並啟動定時任務。
- TaskScheduler 配置:
手動配置scheduler
->TaskScheduler
->TaskScheduler("taskScheduler")
->ScheduledExecutorService
->ScheduledExecutorService("taskScheduler")
。 - ScheduledTaskRegistrar 配置:自定義 SchedulingConfigurer。
- 觸發定時任務調度:調用 ScheduledTaskRegistrar#scheduleTasks。
- TaskScheduler 是定時任務的最終執行器,底層的實現類是 ScheduledExecutorService,由 JUC 提供。可以看到所有的 @Scheduled 注解的定時任務都會在同一個 TaskScheduler 中執行,不像 xml 一樣可以指定不同的 TaskScheduler。所以使用時要注意方法是否會阻塞,或將配置合適的 TaskScheduler 線程池大小。
2.3 TaskScheduler
TaskScheduler 是定時任務的最終執行器,底層的實現類是 ScheduledExecutorService,由 JUC 提供。
2.3.1 自定義 TaskScheduler
Spring Boot 自定義 TaskScheduler 如下:
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數10:線程池創建時候初始化的線程數
executor.setCorePoolSize(10);
// 最大線程數20:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
executor.setMaxPoolSize(20);
// 緩沖隊列200:用來緩沖執行任務的隊列
executor.setQueueCapacity(200);
// 允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
executor.setKeepAliveSeconds(60);
// 線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
executor.setThreadNamePrefix("taskExecutor-");
// 線程池對拒絕任務的處理策略:這里采用了CallerRunsPolicy策略,
// 當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;
// 如果執行程序已關閉,則會丟棄該任務
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
2.3.2 TaskScheduler接口

(1)schedule(Runnable task, Trigger trigger);
指定一個觸發器執行定時任務。可以使用 CronTrigger 來指定 Cron 表達式,執行定時任務
CronTrigger t = new CronTrigger("0 0 10,14,16 * * ?");
taskScheduler.schedule(this, t);
(2)schedule(Runnable task, Date startTime);
指定一個具體時間點執行定時任務,可以動態的指定時間,開啟任務。只執行一次。
(3)scheduleAtFixedRate(Runnable task, long period);
立即執行,循環任務,指定一個執行周期(毫秒計時)。
PS: 不管上一個周期是否執行完,到時間下個周期就開始執行。
(4)scheduleAtFixedRate(Runnable task, Date startTime, long period);
指定時間開始執行,循環任務,指定一個間隔周期(毫秒計時)。
PS: 不管上一個周期是否執行完,到時間下個周期就開始執行。
(5)scheduleWithFixedDelay(Runnable task, long delay);
立即執行,循環任務,指定一個間隔周期(毫秒計時)。
PS: 上一個周期執行完,等待delay時間,下個周期開始執行。
(6)scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
指定時間開始執行,循環任務,指定一個間隔周期(毫秒計時)
2.3.3 TaskScheduler實現類

TaskScheduler 有三個實現類。
(1)ConcurrentTaskScheduler
ConcurrentTaskScheduler 如果不設置 ScheduledExecutorService,則默認通過 Executors.newSingleThreadScheduledExecutor() 創建單線程池執行。
private ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler();
taskScheduler.schedule(task, new Date());
(2)DefaultManagedTaskScheduler
以當前線程執行任務,這是 ConcurrentTaskScheduler 的子類,添加了 JNDI 的支持。和ConcurrentTaskScheduler 一樣的用法,需要使用 JNDI 可以單獨設置
(3)ThreadPoolTaskScheduler
TaskScheduler 接口的默認實現類,多線程定時任務執行。可以設置執行線程池數(默認一個線程)。
3. @EnableAsync 源碼分析
Spring 異步任務調度,使用了動態代理( JDK 或 CGLIB),依賴 Spring Aop,支持動態代理或 ASPECTJ 兩種模式。
3.1 @EnableAsync
@EnableAsync 是注解驅動的入口,用於配置任務調度的模式。
@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
}
總結: @EnableAsync 注解有兩個重要的屬性:
mode
:表示使用動態代理(PROXY時使用 JDK 或 CGLIB 代理)還是 ASPECTJ 方式(AdviceMode.ASPECTJ)。默認使用 AdviceMode.PROXY,也就是動態代理。如果使用 AdviceMode.ASPECTJ 模式需要引入 spring-aspectj 包。proxyTargetClass
:表示使用 JDK 代理(false)還是CGLIB 代理(true)。默認使用 false,也就是 JDK 動態代理。annotation
:表示自定義注解,默認 @Async。
3.2 @Async
@Async 可以標注在類或方法上,如果在類上,則表示這個類的所有方法都異步執行。
@Async("myExecuter")
public void async() {
logger.info("async method");
}
總結: 和 @Scheduled 不同,@Async 可以指定執行的線程池,默認是執行器是 SimpleAsyncTaskExecutor,即每次執行時都會創建一個新的線程執行任務。可以從日志中看出不配置執行線程池的線程名稱(線程名SimpleAsyncTaskExecutor,日志輸出時線程名稱有截斷):
INFO 2516 --- [ main] AsyncTest : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest : async method
3.3 執行流程
@EnableAsync 通過 @Import 向 Spring 容器中注入 AsyncConfigurationSelector。上文提到 @EnableAsync 有兩種執行模式:動態代理或 Aspectj,AsyncConfigurationSelector 會根據配置動態注入 ProxyAsyncConfiguration 或 AspectJAsyncConfiguration(需要引入 spring-aspectj 包)。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
總結: Spring 默認為 PROXY 代理模式,即使用 ProxyAsyncConfiguration。下文的源碼分析也是從 ProxyAsyncConfiguration 展開。
3.3.1 初始化流程
總結: Spring 異步任務調度最核心的步驟是生成切面 AsyncAnnotationAdvisor,至於其它的執行邏輯則是依靠 Spring AOP 實現的。主要的核心類如下:
ProxyAsyncConfiguration
:配置類,向 Spring 容器中注入 AsyncConfigurer 和 AsyncAnnotationBeanPostProcessor。AsyncConfigurer
:可以自定義異步執行的線程線程池(executor)和異常(exceptionHandler)處理機制。使用時,只需要將實現了 AsyncConfigurer 接口的類注入到容器中即可。AsyncAnnotationBeanPostProcessor
:最核心類,主要完成兩件事:一是 AsyncAnnotationBeanPostProcessor 初始化時,會執行 setBeanFactory 方法初始化 AsyncAnnotationAdvisor,二是在 bean 初始化完成后生成代理類。Spring 異步任務調度實際上是依賴 Spring AOP 機制。AsyncAnnotationAdvisor
:最核心類,包含切入點(Pointcut)
和通知(Advice)
。AbstractAdvisingBeanPostProcessor(Spring AOP)
:在 bean 實現完成后,根據 ProxyFactory 生成代理類。ProxyFactory(Spring AOP)
:代理工廠。有兩種代理方式:JdkDynamicAopProxy 或 ObjenesisCglibAopProxy。
3.3.2 AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor 主要的工作是初始化 AsyncAnnotationAdvisor,這是 Spring 異步執行的基石。AsyncAnnotationAdvisor 相當於 Spring AOP 的 @Aspect 注解,定義了切入點(Pointcut)和通知(Advice)。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
總結: Spring 異步任務調度除了 AsyncAnnotationAdvisor 不一樣,其余的執行流程和 Spring AOP 一模一樣。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
return bean;
}
...
// 生成動態代理
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
return bean; // No proxy needed.
}
總結: 首先判斷是否需要進行動態代理,如果類或方法上標注有 @Async 注解,則使用 proxyFactory 生成代理對象。
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
// 最終使用 advisor.putcut 判斷是否需要進行代理
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
3.3.3 AsyncAnnotationAdvisor
AsyncAnnotationAdvisor 主要是定義了 AOP 執行的通知(Advice)和切入點(Pointcut)。
(1)構造器
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
public AsyncAnnotationAdvisor(
Supplier<Executor> executor, Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
} catch (ClassNotFoundException ex) {
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
}
總結: 從上也可以看出 Spring 除了支持 @Async 注解外,還支持 @Asynchronous 注解。另外就是創建通知(advice)和切面(pointcut)。
其實我們也大致可以猜到,AsyncAnnotationAdvisor 實際上將標注有 @Async 注解的方法丟到線程池中異步執行。pointcut 根據注解判斷是否需要異步執行,advice 則是具體的執行邏輯。
(2)buildPointcut
buildPointcut 建立切入點,也就是判斷是否需要異步執行。
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 匹配類上的注解
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// 匹配方法上的注解
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
} else {
// union:表示所有的匹配規則只要有一個匹配上了就返回匹配成功
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
總結: buildPointcut 建立匹配規則,實際上就是只要類或方法上有一個匹配成功,就執行任務調度。
(3)buildAdvice
buildAdvice 方法包含異步執行的邏輯。
protected Advice buildAdvice(
Supplier<Executor> executor, Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
總結: buildAdvice 方法很簡單,將異步執行的邏輯全部委托給了 AnnotationAsyncExecutionInterceptor 完成。
AnnotationAsyncExecutionInterceptor 繼承自 AsyncExecutionInterceptor(Spring AOP),具體的執行邏輯由 Spring AOP 完成。至此容器初始化時,代理已經生成完畢。
3.3.4 執行流程
Spring 異步任務執行的入口是 AnnotationAsyncExecutionInterceptor,繼承自 AsyncExecutionInterceptor。在 Spring AOP 執行時會回調其 invoke 方法。
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
// `1. 獲取最原始的方法 userDeclaredMethod,去除代理,橋接
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 2. 根據方法獲取任務執行的線程池,@Async可以指定要執行的線程池名稱
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(");
}
// 3. 任務封裝成 Callable
Callable<Object> task = () -> {
try {
// 執行任務
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
} catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 4. 提交任務
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
總結: invoke 方法首先要獲取要異步執行的線程池,將任務封裝成一個 Callable 后,丟到該線程池中執行。
- 獲取任務執行的線程池:determineAsyncExecutor
- 任務執行並返回執行結果:doSubmit
(1)determineAsyncExecutor
determineAsyncExecutor 方法用於獲取任務將要在哪個線程池上執行,默認是 SimpleAsyncTaskExecutor。
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 1. 緩存中獲取
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// 2. @Async 注解指定執行線程池名稱
String qualifier = getExecutorQualifier(method);
// 3. 如果指定了線程池名稱,直接從 Spring 容器中獲取
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
// 4. 獲取默認的線程池
} else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
// 5. 包裝成 AsyncListenableTaskExecutor
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
總結: determineAsyncExecutor 的邏輯很清晰,如果指定了線程池名稱,則直接從 Spring 容器中獲取,如果沒有指定,則獲取默認的線程池。最后將其包裝成 AsyncListenableTaskExecutor。
至於默認線程池的設置也很簡單,如果 Spring 容器中設置了實現 TaskExecutor 接口的 bean,則直接返回;如果有多個,則返回名稱為 ”taskExecutor“ 的 TaskExecutor ;如果容器中沒有,則返回默認的 SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor 非常簡單,每次執行時都會創建一個新的線程執行任務。代碼如下:
// AsyncExecutionInterceptor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
// 1. 從Spring容器獲取TaskExecutor類型的bean,如果有多個,則選擇名稱為"taskExecutor"
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
// 2. 如果沒有定義,則選擇默認的SimpleAsyncTaskExecutor。
// SimpleAsyncTaskExecutor很簡單,每次執行時,都會創建一個新的線程執行任務
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
(2)doSubmit
doSubmit 處理結果的異步返回。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 1. 異步執行。jdk CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
// 2. 結果回調。ListenableFuture
} else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
// 3. 同步返回。jdk Future
} else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
// 4. 不返回任務結果
} else {
executor.submit(task);
return null;
}
}
每天用心記錄一點點。內容也許不重要,但習慣很重要!