任務調度(02)Spring Schedule


任務調度(02)Spring Schedule

Spring 3.0 提供兩種任務調度方式:一是定時任務調度;二是異步任務調度。這兩種任務調度方式都是基於 JUC 實現的,是一種非常輕量級的任務調度方式。同時在 spring-context-support 中也整合了 Quartz,文本重點關注 Spring 提供了原生任務調度方式 - @EnableScheduling 和 @EnableAsync。

  1. 定時任務調度和異步任務調度的基本使用方法。
  2. 定時任務調度的源碼分析。ScheduledAnnotationBeanPostProcessor 會掃描標注有 @Scheduled 注解的方法,將其注冊到 ScheduledTaskRegistrar 中,真正調度任務的類是 ScheduledTaskRegistrar 中的TaskScheduler。TaskScheduler 實現類最底層是 ScheduledExecutorService,由 JUC 提供。
  3. 異步任務調度的源碼分析。異步執行和定時執行的最大區別是異步執行可能需要返回執行結果,所以需要對標注有 @Async 的類進行代理。異步任務有兩種執行模式:代理 或 Aspectj,無論那種方法都是以 Spring AOP 作為支撐。AsyncAnnotationBeanPostProcessor 首先創建 AsyncAnnotationAdvisor,如果類或方法上標注有 @Async 注解,則使用 proxyFactory 生成代理對象。

1. 基本用法

1.1 定時調度 - 基於xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <bean id="scheduledXmlTest" class="ScheduledXmlTest"/>

    <!--定時調度@Scheduled-->
    <task:scheduler id="taskScheduler" pool-size="2"/>
    <task:scheduled-tasks scheduler="taskScheduler">
        <task:scheduled ref="scheduledXmlTest" method="fixedDelay" fixed-delay="2000"/>
        <task:scheduled ref="scheduledXmlTest" method="fixedRate" fixed-rate="2000"/>
        <task:scheduled ref="scheduledXmlTest" method="cron" cron="0/2 * * * * *"/>
    </task:scheduled-tasks>

    <!--異步調度@Async-->
    <task:executor id="executor" pool-size="2"/>
    <!--啟動注解驅動-->
    <task:annotation-driven executor="executor" scheduler="taskScheduler"/>
</beans>

總結: Spring 任務調度總共有 4 個標簽:

  1. task:scheduler:配置調度器,實現了 org.springframework.scheduling.TaskScheduler 接口,默認是 org.springframework.scheduling.TaskScheduler。
  2. task:scheduled-tasks:配置任務,如果不指定 scheduler 屬性,那么所有的任務都會在一個線程上執行,可能會造成線程阻塞。
  3. task:executor:異步任務調度的執行線程,實現 Executor 接口。
  4. task:annotation-driven:開啟注解驅動,包括 @Scheduled 和 @Async。其中 executor 為異步調度的執行線程池,scheduler 為定時調度的執行線程池。同樣不配置的話,所有的 @Scheduled 任務都是一個線程上執行。

1.2 定時調度 - 注解驅動

@EnableScheduling
public class ScheduledTest {
    private Logger logger = LoggerFactory.getLogger(ScheduledTest.class);
    public static void main(String[] args) {
        SpringApplication.run(ScheduledTest.class);
    }

    @Scheduled(fixedDelay = 2000)
    public void fixedDelay() {
        logger.info("fixedDelay");
    }

    @Scheduled(fixedRate = 2000)
    public void fixedRate() {
        logger.info("fixedRate");
    }

    @Scheduled(cron = "0/2 * * * * *")
    public void cron() {
        logger.info("cron");
    }
}

總結: fixedDelay、fixedRate、cron 都是定時的任務調度,那它們有什么區別呢?

  • fixedDelay:上一個任務執行結束和下一個任務開始的時間間隔。總時間間隔=方法執行時間 + fixedDelay。
  • fixedRate:上一個任務執行開始和下一個任務開始的時間間隔。
  • cron:類似 Linux crontab,如果任務執行時間太長,就會忽略過期的任務。

1.3 異步調度 - 注解驅動

@Async 可以標注在類或方法上,如果標注在類上則表示這個類的所有方法都是異步執行的。

@EnableAsync
//@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
    private static final Logger logger = LoggerFactory.getLogger(AsyncTest.class);
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(AsyncTest.class);

        AsyncTest asyncTest = context.getBean(AsyncTest.class);
        asyncTest.async();
        logger.info("main method");
    }

    @Async
    public void async() {
        logger.info("async method");
    }
}

日志如下:

INFO 2516 --- [           main] AsyncTest                                : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest                                : async method

總結: 可以發現這兩條日志輸出的線程不一致,而且是 main 線程先輸出日志,也就是 @Async 注解后,方法是異步執行的。

注意: @Async 標注在類上,則這類的所有方法都是異常執行。

接下來進行源碼分析。Spring 任務調度的源碼位於 spring-context 工程的 scheduling 包中,其目錄如下:

scheduling
	|- annotation	# @Scheduled和@Async注解支持
	|- concurrent	# juc擴展
	|- config		# 傳統的xml解析支持
	|- support		# cron解析支持

2. @EnableScheduling 源碼分析

Spring 定時調度流程圖
sequenceDiagram @EnableScheduling ->> ScheduledAnnotationBeanPostProcessor : @Import alt 步驟1:postProcessAfterInitialization ScheduledAnnotationBeanPostProcessor ->> @Scheduled : selectMethods ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleCronTask ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleFixedDelayTask ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleFixedRateTask end alt 步驟2:onApplicationEvent ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : setTaskScheduler ScheduledAnnotationBeanPostProcessor ->> ScheduledTaskRegistrar : scheduleTasks end

總結: 上述的步驟,有兩個重要的入口:

  1. 實現 BeanPostProcessor#postProcessAfterInitialization 接口,在每個 bean 實例化完成后,掃描 @Scheduled 注解,注冊到 ScheduledTaskRegistrar 中。
  2. 監聽 Spring 的容器初始化事件 onApplicationEvent(ContextRefreshedEvent event) 事件,回調 finishRegistration 方法設置 ScheduledTaskRegistrar 的 TaskScheduler。在配置 ScheduledTaskRegistrar 后,調用其 scheduleTasks 方法觸發任務調度。
  3. 真正調度任務的類是 TaskScheduler 實現類中的 ScheduledExecutorService,由 JUC 提供。
  4. 查找 TaskScheduler 的實現類實例默認是通過類型查找,若有多個實現則會查找名字為 "taskScheduler" 的實現 Bean,若沒有找到則在 ScheduledTaskRegistrar 調度任務的時候會創建一個 newSingleThreadScheduledExecutor , 將 TaskScheduler 的實現類實例設置到 ScheduledTaskRegistrar 屬性中。

2.1 @Scheduled 注解掃描

@Scheduled 注解的處理分兩步:一是 MethodIntrospector.selectMethods 掃描該 bean 的所有 @@Scheduled和@Schedules注解的方法;二是 processScheduled 方法處理有 @Scheduled 注解的方法。

(1)@Scheduled 掃描

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
        bean instanceof ScheduledExecutorService) {
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        // 1. 掃描每個方法上的@Scheduled和@Schedules注解
        Map<Method, Set<Scheduled>> annotatedMethods = 	
            MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) 
                method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                        method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
    	       });
        // 2. 調用processScheduled處理所有的@Scheduled注解的方法
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
        } else {
            annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> 
                            processScheduled(scheduled, method, bean)));
        }
    }
    return bean;
}

總結: 核心步驟就兩步:第一步是 MethodIntrospector.selectMethods 掃描所有方法的 @Scheduled 和@Schedules注解。至於第二步,最核心的處理則委托給了 processScheduled 方法處理。

(2)processScheduled

processScheduled 分別處理 @Scheduled 中的三種情況:cron、fixedDelay、fixedRate 三種場景,將其分別包裝成 CronTask、FixedDelayTask、FixedRateTask 后,注冊到 ScheduledTaskRegistrar 上,當調用其 scheduleTasks 方法觸發任務調度。

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Runnable runnable = createRunnable(bean, method);
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
        // Determine initial delay
        long initialDelay = scheduled.initialDelay();
        ...

        // 1. Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (StringUtils.hasLength(cron)) {
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // 2. Check fixed delay
        long fixedDelay = scheduled.fixedDelay();
        if (fixedDelay >= 0) {
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }

        // 3. Check fixed rate
        long fixedRate = scheduled.fixedRate();
        if (fixedRate >= 0) {
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
      
        // 4. Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    } catch (IllegalArgumentException ex) {
        throw new IllegalStateException();
    }
}

總結: processScheduled 代碼有刪減,只保留了核心的邏輯。processScheduled 代碼很簡單,接下來繼續分析定時任務觸發流程。

2.2 定時任務觸發

ScheduledAnnotationBeanPostProcessor 會通過 onApplicationEvent(ContextRefreshedEvent event) 監聽容器初始化事件。一旦容器初始化完成,就會試圖從容器中獲取 TaskScheduler。最終調用 scheduleTasks 觸發定時任務調度。

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        finishRegistration();
    }
}

private void finishRegistration() {
    // 1. 手動配置 TaskScheduler
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
	
    // 2. 自定義配置ScheduledTaskRegistrar。獲取容器中的SchedulingConfigurer
    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
            ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    // 3. 如果沒有配置TaskScheduler,則從Spring容器中查找。查找規則如下:
    //    1) 查找TaskScheduler,如果有多個則查找名稱為"taskScheduler"
    //    2) 查找ScheduledExecutorService,如果有多個則查找名稱為"taskScheduler"
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        } catch (NoUniqueBeanDefinitionException ex) {
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            } catch (NoSuchBeanDefinitionException ex2) {
            }
        } catch (NoSuchBeanDefinitionException ex) {
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            } catch (NoUniqueBeanDefinitionException ex2) {
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                } catch (NoSuchBeanDefinitionException ex3) {
                }
            } catch (NoSuchBeanDefinitionException ex2) {
            }
        }
    }

    // 4. 觸發定時任務調度
    this.registrar.afterPropertiesSet();
}

總結: 一旦 Spring 容器初始化完成,就會觸發定時任務的執行。在 finishRegistration 方法中,主要是配置定時任務執行器,並啟動定時任務。

  1. TaskScheduler 配置:手動配置scheduler -> TaskScheduler -> TaskScheduler("taskScheduler") -> ScheduledExecutorService -> ScheduledExecutorService("taskScheduler")
  2. ScheduledTaskRegistrar 配置:自定義 SchedulingConfigurer。
  3. 觸發定時任務調度:調用 ScheduledTaskRegistrar#scheduleTasks。
  4. TaskScheduler 是定時任務的最終執行器,底層的實現類是 ScheduledExecutorService,由 JUC 提供。可以看到所有的 @Scheduled 注解的定時任務都會在同一個 TaskScheduler 中執行,不像 xml 一樣可以指定不同的 TaskScheduler。所以使用時要注意方法是否會阻塞,或將配置合適的 TaskScheduler 線程池大小。

2.3 TaskScheduler

TaskScheduler 是定時任務的最終執行器,底層的實現類是 ScheduledExecutorService,由 JUC 提供。

2.3.1 自定義 TaskScheduler

Spring Boot 自定義 TaskScheduler 如下:

@Bean("taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 核心線程數10:線程池創建時候初始化的線程數
    executor.setCorePoolSize(10);
    // 最大線程數20:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
    executor.setMaxPoolSize(20);
    // 緩沖隊列200:用來緩沖執行任務的隊列
    executor.setQueueCapacity(200);
    // 允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
    executor.setKeepAliveSeconds(60);
    // 線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
    executor.setThreadNamePrefix("taskExecutor-");
    // 線程池對拒絕任務的處理策略:這里采用了CallerRunsPolicy策略,
    // 當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;
    // 如果執行程序已關閉,則會丟棄該任務
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}

2.3.2 TaskScheduler接口

圖2 TaskScheduler接口

(1)schedule(Runnable task, Trigger trigger);

指定一個觸發器執行定時任務。可以使用 CronTrigger 來指定 Cron 表達式,執行定時任務

CronTrigger t = new CronTrigger("0 0 10,14,16 * * ?");
taskScheduler.schedule(this, t);

(2)schedule(Runnable task, Date startTime);

指定一個具體時間點執行定時任務,可以動態的指定時間,開啟任務。只執行一次。

(3)scheduleAtFixedRate(Runnable task, long period);

立即執行,循環任務,指定一個執行周期(毫秒計時)。

PS: 不管上一個周期是否執行完,到時間下個周期就開始執行。

(4)scheduleAtFixedRate(Runnable task, Date startTime, long period);

指定時間開始執行,循環任務,指定一個間隔周期(毫秒計時)。

PS: 不管上一個周期是否執行完,到時間下個周期就開始執行。

(5)scheduleWithFixedDelay(Runnable task, long delay);

立即執行,循環任務,指定一個間隔周期(毫秒計時)。

PS: 上一個周期執行完,等待delay時間,下個周期開始執行。

(6)scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

指定時間開始執行,循環任務,指定一個間隔周期(毫秒計時)

2.3.3 TaskScheduler實現類

圖2 TaskScheduler實現類

TaskScheduler 有三個實現類。

(1)ConcurrentTaskScheduler

ConcurrentTaskScheduler 如果不設置 ScheduledExecutorService,則默認通過 Executors.newSingleThreadScheduledExecutor() 創建單線程池執行。

private ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler();
taskScheduler.schedule(task, new Date());

(2)DefaultManagedTaskScheduler

以當前線程執行任務,這是 ConcurrentTaskScheduler 的子類,添加了 JNDI 的支持。和ConcurrentTaskScheduler 一樣的用法,需要使用 JNDI 可以單獨設置

(3)ThreadPoolTaskScheduler

TaskScheduler 接口的默認實現類,多線程定時任務執行。可以設置執行線程池數(默認一個線程)。

3. @EnableAsync 源碼分析

Spring 異步任務調度,使用了動態代理( JDK 或 CGLIB),依賴 Spring Aop,支持動態代理或 ASPECTJ 兩種模式。

3.1 @EnableAsync

@EnableAsync 是注解驅動的入口,用於配置任務調度的模式。

@EnableAsync(proxyTargetClass = false, mode = AdviceMode.PROXY)
public class AsyncTest {
}

總結: @EnableAsync 注解有兩個重要的屬性:

  • mode:表示使用動態代理(PROXY時使用 JDK 或 CGLIB 代理)還是 ASPECTJ 方式(AdviceMode.ASPECTJ)。默認使用 AdviceMode.PROXY,也就是動態代理。如果使用 AdviceMode.ASPECTJ 模式需要引入 spring-aspectj 包。
  • proxyTargetClass :表示使用 JDK 代理(false)還是CGLIB 代理(true)。默認使用 false,也就是 JDK 動態代理。
  • annotation:表示自定義注解,默認 @Async。

3.2 @Async

@Async 可以標注在類或方法上,如果在類上,則表示這個類的所有方法都異步執行。

@Async("myExecuter")
public void async() {
    logger.info("async method");
}

總結: 和 @Scheduled 不同,@Async 可以指定執行的線程池,默認是執行器是 SimpleAsyncTaskExecutor,即每次執行時都會創建一個新的線程執行任務。可以從日志中看出不配置執行線程池的線程名稱(線程名SimpleAsyncTaskExecutor,日志輸出時線程名稱有截斷):

INFO 2516 --- [           main] AsyncTest                   : main method
INFO 2516 --- [cTaskExecutor-1] AsyncTest                   : async method

3.3 執行流程

@EnableAsync 通過 @Import 向 Spring 容器中注入 AsyncConfigurationSelector。上文提到 @EnableAsync 有兩種執行模式:動態代理或 Aspectj,AsyncConfigurationSelector 會根據配置動態注入 ProxyAsyncConfiguration 或 AspectJAsyncConfiguration(需要引入 spring-aspectj 包)。

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
        "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
    public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

總結: Spring 默認為 PROXY 代理模式,即使用 ProxyAsyncConfiguration。下文的源碼分析也是從 ProxyAsyncConfiguration 展開。

3.3.1 初始化流程

圖2 ProxyAsyncConfiguration 初始化流程
sequenceDiagram participant ProxyAsyncConfiguration participant AsyncConfigurer participant AsyncAnnotationBeanPostProcessor participant AsyncAnnotationAdvisor participant AbstractAdvisingBeanPostProcessor participant ProxyFactory ProxyAsyncConfiguration ->> AsyncConfigurer : executor/exceptionHandler ProxyAsyncConfiguration ->> AsyncAnnotationBeanPostProcessor : @Bean AsyncAnnotationBeanPostProcessor ->> AsyncAnnotationAdvisor: setBeanFactory AsyncAnnotationBeanPostProcessor ->> AbstractAdvisingBeanPostProcessor : postProcessAfterInitialization AbstractAdvisingBeanPostProcessor ->> ProxyFactory : prepareProxyFactory AbstractAdvisingBeanPostProcessor ->> ProxyFactory : addAdvisor AbstractAdvisingBeanPostProcessor ->> ProxyFactory : getProxy ProxyFactory -->> AsyncAnnotationBeanPostProcessor : proxy

總結: Spring 異步任務調度最核心的步驟是生成切面 AsyncAnnotationAdvisor,至於其它的執行邏輯則是依靠 Spring AOP 實現的。主要的核心類如下:

  1. ProxyAsyncConfiguration:配置類,向 Spring 容器中注入 AsyncConfigurer 和 AsyncAnnotationBeanPostProcessor。
  2. AsyncConfigurer:可以自定義異步執行的線程線程池(executor)和異常(exceptionHandler)處理機制。使用時,只需要將實現了 AsyncConfigurer 接口的類注入到容器中即可。
  3. AsyncAnnotationBeanPostProcessor:最核心類,主要完成兩件事:一是 AsyncAnnotationBeanPostProcessor 初始化時,會執行 setBeanFactory 方法初始化 AsyncAnnotationAdvisor,二是在 bean 初始化完成后生成代理類。Spring 異步任務調度實際上是依賴 Spring AOP 機制。
  4. AsyncAnnotationAdvisor:最核心類,包含 切入點(Pointcut)通知(Advice)
  5. AbstractAdvisingBeanPostProcessor(Spring AOP):在 bean 實現完成后,根據 ProxyFactory 生成代理類。
  6. ProxyFactory(Spring AOP):代理工廠。有兩種代理方式:JdkDynamicAopProxy 或 ObjenesisCglibAopProxy。

3.3.2 AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor 主要的工作是初始化 AsyncAnnotationAdvisor,這是 Spring 異步執行的基石。AsyncAnnotationAdvisor 相當於 Spring AOP 的 @Aspect 注解,定義了切入點(Pointcut)和通知(Advice)。

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);

        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }
}

總結: Spring 異步任務調度除了 AsyncAnnotationAdvisor 不一樣,其余的執行流程和 Spring AOP 一模一樣。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        return bean;
    }
    ...
	// 生成動態代理
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }
    return bean;	// No proxy needed.
}

總結: 首先判斷是否需要進行動態代理,如果類或方法上標注有 @Async 注解,則使用 proxyFactory 生成代理對象。

protected boolean isEligible(Class<?> targetClass) {
    Boolean eligible = this.eligibleBeans.get(targetClass);
    if (eligible != null) {
        return eligible;
    }
    if (this.advisor == null) {
        return false;
    }
    // 最終使用 advisor.putcut 判斷是否需要進行代理
    eligible = AopUtils.canApply(this.advisor, targetClass);
    this.eligibleBeans.put(targetClass, eligible);
    return eligible;
}

3.3.3 AsyncAnnotationAdvisor

AsyncAnnotationAdvisor 主要是定義了 AOP 執行的通知(Advice)和切入點(Pointcut)。

(1)構造器

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
	private Advice advice;
	private Pointcut pointcut;
    
    public AsyncAnnotationAdvisor(
			Supplier<Executor> executor, Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		} catch (ClassNotFoundException ex) {
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
}

總結: 從上也可以看出 Spring 除了支持 @Async 注解外,還支持 @Asynchronous 注解。另外就是創建通知(advice)和切面(pointcut)。

其實我們也大致可以猜到,AsyncAnnotationAdvisor 實際上將標注有 @Async 注解的方法丟到線程池中異步執行。pointcut 根據注解判斷是否需要異步執行,advice 則是具體的執行邏輯。

(2)buildPointcut

buildPointcut 建立切入點,也就是判斷是否需要異步執行。

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        // 匹配類上的注解
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        // 匹配方法上的注解
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        } else {
            // union:表示所有的匹配規則只要有一個匹配上了就返回匹配成功
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

總結: buildPointcut 建立匹配規則,實際上就是只要類或方法上有一個匹配成功,就執行任務調度。

(3)buildAdvice

buildAdvice 方法包含異步執行的邏輯。

protected Advice buildAdvice(
    Supplier<Executor> executor, Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

總結: buildAdvice 方法很簡單,將異步執行的邏輯全部委托給了 AnnotationAsyncExecutionInterceptor 完成。

AnnotationAsyncExecutionInterceptor 繼承自 AsyncExecutionInterceptor(Spring AOP),具體的執行邏輯由 Spring AOP 完成。至此容器初始化時,代理已經生成完畢。

3.3.4 執行流程

Spring 異步任務執行的入口是 AnnotationAsyncExecutionInterceptor,繼承自 AsyncExecutionInterceptor。在 Spring AOP 執行時會回調其 invoke 方法。

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // `1. 獲取最原始的方法 userDeclaredMethod,去除代理,橋接
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // 2. 根據方法獲取任務執行的線程池,@Async可以指定要執行的線程池名稱
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(");
    }

	// 3. 任務封裝成 Callable
    Callable<Object> task = () -> {
        try {
            // 執行任務
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
	
	// 4. 提交任務 
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

總結: invoke 方法首先要獲取要異步執行的線程池,將任務封裝成一個 Callable 后,丟到該線程池中執行。

  1. 獲取任務執行的線程池:determineAsyncExecutor
  2. 任務執行並返回執行結果:doSubmit

(1)determineAsyncExecutor

determineAsyncExecutor 方法用於獲取任務將要在哪個線程池上執行,默認是 SimpleAsyncTaskExecutor。

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // 1. 緩存中獲取
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        // 2. @Async 注解指定執行線程池名稱
        String qualifier = getExecutorQualifier(method);
        // 3. 如果指定了線程池名稱,直接從 Spring 容器中獲取
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        // 4. 獲取默認的線程池
        } else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        // 5. 包裝成 AsyncListenableTaskExecutor
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

總結: determineAsyncExecutor 的邏輯很清晰,如果指定了線程池名稱,則直接從 Spring 容器中獲取,如果沒有指定,則獲取默認的線程池。最后將其包裝成 AsyncListenableTaskExecutor。

至於默認線程池的設置也很簡單,如果 Spring 容器中設置了實現 TaskExecutor 接口的 bean,則直接返回;如果有多個,則返回名稱為 ”taskExecutor“ 的 TaskExecutor ;如果容器中沒有,則返回默認的 SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor 非常簡單,每次執行時都會創建一個新的線程執行任務。代碼如下:

// AsyncExecutionInterceptor
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
    // 1. 從Spring容器獲取TaskExecutor類型的bean,如果有多個,則選擇名稱為"taskExecutor"
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    // 2. 如果沒有定義,則選擇默認的SimpleAsyncTaskExecutor。
    //    SimpleAsyncTaskExecutor很簡單,每次執行時,都會創建一個新的線程執行任務
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

(2)doSubmit

doSubmit 處理結果的異步返回。

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // 1. 異步執行。jdk CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    // 2. 結果回調。ListenableFuture
    } else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    // 3. 同步返回。jdk Future
    } else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    // 4. 不返回任務結果
    } else {
        executor.submit(task);
        return null;
    }
}

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM