前提
最近的新項目和數據同步相關,有定時調度的需求。之前一直有使用過Quartz
、XXL-Job
、Easy Scheduler
等調度框架,后來越發覺得這些框架太重量級了,於是想到了Spring
內置的Scheduling
模塊。而原生的Scheduling
模塊只是內存態的調度模塊,不支持任務的持久化或者配置(配置任務通過@Scheduled
注解進行硬編碼,不能抽離到類之外),因此考慮理解Scheduling
模塊的底層原理,並且基於此造一個簡單的輪子,使之支持調度任務配置:通過配置文件或者JDBC
數據源。
Scheduling模塊
Scheduling
模塊是spring-context
依賴下的一個包org.springframework.scheduling
:
這個模塊的類並不多,有四個子包:
- 頂層包的定義了一些通用接口和異常。
org.springframework.scheduling.annotation
:定義了調度、異步任務相關的注解和解析類,常用的注解如@Async
、@EnableAsync
、@EnableScheduling
和@Scheduled
。org.springframework.scheduling.concurrent
:定義了調度任務執行器和相對應的FactoryBean
。org.springframework.scheduling.config
:定義了配置解析、任務具體實現類、調度任務XML
配置文件解析相關的解析類。org.springframework.scheduling.support
:定義了反射支持類、Cron
表達式解析器等工具類。
如果想單獨使用Scheduling
,只需要引入spring-context
這個依賴。但是現在流行使用SpringBoot
,引入spring-boot-starter-web
已經集成了spring-context
,可以直接使用Scheduling
模塊,筆者編寫本文的時候(2020-03-14
)SpringBoot
的最新版本為2.2.5.RELEASE
,可以選用此版本進行源碼分析或者生產應用:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
開啟Scheduling
模塊支持只需要在某一個配置類中添加@EnableScheduling
注解即可,一般為了明確模塊的引入,建議在啟動類中使用此注解,如:
@EnableScheduling
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Scheduling模塊的工作流程
這個圖描述了Scheduling
模塊的工作流程,這里分析一下非XML
配置下的流程(右邊的分支):
- 通過注解
@EnableScheduling
中的@Import
引入了SchedulingConfiguration
,而SchedulingConfiguration
中配置了一個類型為ScheduledAnnotationBeanPostProcessor
名稱為org.springframework.context.annotation.internalScheduledAnnotationProcessor
的Bean
,這里有個常見的技巧,Spring
內部加載的Bean
一般會定義名稱為internalXXX
,Bean
的role
會定義為ROLE_INFRASTRUCTURE = 2
。 Bean
后置處理器ScheduledAnnotationBeanPostProcessor
會解析和處理每一個符合特定類型的Bean
中的@Scheduled
注解(注意@Scheduled
只能使用在方法或者注解上),並且把解析完成的方法封裝為不同類型的Task
實例,緩存在ScheduledTaskRegistrar
中的。ScheduledAnnotationBeanPostProcessor
中的鈎子接口方法afterSingletonsInstantiated()
在所有單例初始化完成之后回調觸發,在此方法中設置了ScheduledTaskRegistrar
中的任務調度器(TaskScheduler
或者ScheduledExecutorService
類型)實例,並且調用ScheduledTaskRegistrar#afterPropertiesSet()
方法添加所有緩存的Task
實例到任務調度器中執行。
任務調度器
Scheduling
模塊支持TaskScheduler
或者ScheduledExecutorService
類型的任務調度器,而ScheduledExecutorService
其實是JDK
並發包java.util.concurrent
的接口,一般實現類就是調度線程池ScheduledThreadPoolExecutor
。實際上,ScheduledExecutorService
類型的實例最終會通過適配器模式轉變為ConcurrentTaskScheduler
,所以這里只需要分析TaskScheduler
類型的執行器。
ThreadPoolTaskScheduler
:基於線程池實現的任務執行器,這個是最常用的實現,底層依賴於ScheduledThreadPoolExecutor
實現。ConcurrentTaskScheduler
:TaskScheduler
接口和ScheduledExecutorService
接口的適配器,如果自定義一個ScheduledThreadPoolExecutor
類型的Bean
,那么任務執行器就會適配為ConcurrentTaskScheduler
。DefaultManagedTaskScheduler
:JDK7
引入的JSR-236
的支持,可以通過JNDI
配置此調度執行器,一般很少用到,底層也是依賴於ScheduledThreadPoolExecutor
實現。
也就是說,內置的三個調度器類型底層都依賴於JUC
調度線程池ScheduledThreadPoolExecutor
。這里分析一下頂層接口org.springframework.scheduling.TaskScheduler
提供的功能(筆者已經把功能一致的default
方法暫時移除):
// 省略一些功能一致的default方法
public interface TaskScheduler {
// 調度一個任務,通過觸發器實例指定觸發時間周期
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
// 指定起始時間調度一個任務 - 單次執行
ScheduledFuture<?> schedule(Runnable task, Date startTime);
// 指定固定頻率調度一個任務,period的單位是毫秒
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
// 指定起始時間和固定頻率調度一個任務,period的單位是毫秒
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
// 指定固定延遲間隔調度一個任務,delay的單位是毫秒
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
// 指定起始時間和固定延遲間隔調度一個任務,delay的單位是毫秒
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
}
Task的分類
Scheduling
模塊中支持不同類型的任務,主要包括下面的3種(解析的優先順序也是如下):
Cron
表達式任務,支持通過Cron
表達式配置執行的周期,對應的任務類型為org.springframework.scheduling.config.CronTask
。- 固定延遲間隔任務,也就是上一輪執行完畢后間隔固定周期再執行本輪,依次類推,對應的的任務類型為
org.springframework.scheduling.config.FixedDelayTask
。 - 固定頻率任務,基於固定的間隔時間執行,不會理會上一輪是否執行完畢本輪會照樣執行,對應的的任務類型為
org.springframework.scheduling.config.FixedRateTask
。
關於這幾類Task
,舉幾個簡單的例子。CronTask
是通過cron
表達式指定執行周期的,並且不支持延遲執行,可以使用特殊字符-
禁用任務執行:
// 注解聲明式使用 - 每五秒執行一次,不支持initialDelay
@Scheduled(cron = "*/5 * * * * ?")
public void processTask(){
}
// 注解聲明式使用 - 禁止任務執行
@Scheduled(cron = "-")
public void processTask(){
}
// 編程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
CronTask cronTask = new CronTask(() -> {
System.out.println(String.format("[%s] - CronTask觸發...", F.format(LocalDateTime.now())));
}, "*/5 * * * * ?");
taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次執行輸出結果
[2020-03-16 01:07:00] - CronTask觸發...
[2020-03-16 01:07:05] - CronTask觸發...
......
FixedDelayTask
需要配置延遲間隔值(fixedDelay
或者fixedDelayString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這里注意一點是fixedDelayString
和initialDelayString
都支持從EmbeddedValueResolver
(簡單理解為配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 注解聲明式使用 - 延遲一秒開始執行,延遲間隔為5秒
@Scheduled(fixedDelay = 5000, initialDelay = 1000)
public void process(){
}
// 注解聲明式使用 - spring-boot配置文件中process.task.fixedDelay=5000 process.task.initialDelay=1000
@Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 編程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {
System.out.println(String.format("[%s] - FixedDelayTask觸發...", F.format(LocalDateTime.now())));
}, 5000, 1000);
Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());
taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次執行輸出結果
[2020-03-16 01:06:12] - FixedDelayTask觸發...
[2020-03-16 01:06:17] - FixedDelayTask觸發...
......
FixedRateTask
需要配置固定間隔值(fixedRate
或者fixedRateString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這里注意一點是fixedRateString
和initialDelayString
都支持從EmbeddedValueResolver
(簡單理解為配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 注解聲明式使用 - 延遲一秒開始執行,每隔5秒執行一次
@Scheduled(fixedRate = 5000, initialDelay = 1000)
public void processTask(){
}
// 注解聲明式使用 - spring-boot配置文件中process.task.fixedRate=5000 process.task.initialDelay=1000
@Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")
public void process(){
}
// 編程式使用
public class Tasks {
static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.initialize();
FixedRateTask fixedRateTask = new FixedRateTask(() -> {
System.out.println(String.format("[%s] - FixedRateTask觸發...", F.format(LocalDateTime.now())));
}, 5000, 1000);
Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());
taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());
Thread.sleep(Integer.MAX_VALUE);
}
}
// 某次執行輸出結果
[2020-03-16 23:58:25] - FixedRateTask觸發...
[2020-03-16 23:58:30] - FixedRateTask觸發...
......
簡單分析核心流程的源代碼
在SpringBoot
注解體系下,Scheduling
模塊的所有邏輯基本在ScheduledAnnotationBeanPostProcessor
和ScheduledTaskRegistrar
中。一般來說,一個類實現的接口代表了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor
實現的接口:
ScheduledTaskHolder
接口:返回Set<ScheduledTask>
,表示持有的所有任務實例。MergedBeanDefinitionPostProcessor
接口:Bean
定義合並時回調,預留空實現,暫時不做任何處理。BeanPostProcessor
接口:也就是MergedBeanDefinitionPostProcessor
的父接口,Bean
實例初始化前后分別回調,其中,后回調的postProcessAfterInitialization()
方法就是用於解析@Scheduled
和裝載ScheduledTask
,需要重點關注此方法的邏輯。DestructionAwareBeanPostProcessor
接口:具體的Bean
實例銷毀的時候回調,用於Bean
實例銷毀的時候移除和取消對應的任務實例。Ordered
接口:用於Bean
加載時候的排序,主要是改變ScheduledAnnotationBeanPostProcessor
在BeanPostProcessor
執行鏈中的順序。EmbeddedValueResolverAware
接口:回調StringValueResolver
實例,用於解析帶占位符的環境變量屬性值。BeanNameAware
接口:回調BeanName
。BeanFactoryAware
接口:回調BeanFactory
實例,具體是DefaultListableBeanFactory
,也就是熟知的IOC
容器。ApplicationContextAware
接口:回調ApplicationContext
實例,也就是熟知的Spring
上下文,它是IOC
容器的門面,同時是事件廣播器、資源加載器的實現等等。SmartInitializingSingleton
接口:所有單例實例化完畢之后回調,作用是在持有的applicationContext
為NULL
的時候開始調度所有加載完成的任務,這個鈎子接口十分有用,筆者常用它做一些資源初始化工作。ApplicationListener
接口:監聽Spring
應用的事件,具體是ApplicationListener<ContextRefreshedEvent>
,監聽上下文刷新的事件,如果事件中攜帶的ApplicationContext
實例和ApplicationContextAware
回調的ApplicationContext
實例一致,那么在此監聽回調方法中開始調度所有加載完成的任務,也就是在ScheduledAnnotationBeanPostProcessor
這個類中,SmartInitializingSingleton
接口的實現和ApplicationListener
接口的實現邏輯是互斥的。DisposableBean
接口:當前Bean
實例銷毀時候回調,也就是ScheduledAnnotationBeanPostProcessor
自身被銷毀的時候回調,用於取消和清理所有的ScheduledTask
。
上面分析的鈎子接口在SpringBoot體系中可以按需使用,了解回調不同鈎子接口的回調時機,可以在特定時機完成達到理想的效果。
@Scheduled
注解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三種類型的Bean
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 獲取Bean的用戶態類型,例如Bean有可能被CGLIB增強,這個時候要取其父類
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// nonAnnotatedClasses存放着不存在@Scheduled注解的類型,緩存起來避免重復判斷它是否攜帶@Scheduled注解的方法
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
// 因為JDK8之后支持重復注解,因此獲取具體類型中Method -> @Scheduled的集合,也就是有可能一個方法使用多個@Scheduled注解,最終會封裝為多個Task
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
// 解析到類型中不存在@Scheduled注解的方法添加到nonAnnotatedClasses緩存
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Method -> @Scheduled的集合遍歷processScheduled()方法進行登記
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
processScheduled(Scheduled scheduled, Method method, Object bean)
就是具體的注解解析和Task
封裝的方法:
// Runnable適配器 - 用於反射調用具體的方法,觸發任務方法執行
public class ScheduledMethodRunnable implements Runnable {
private final Object target;
private final Method method;
public ScheduledMethodRunnable(Object target, Method method) {
this.target = target;
this.method = method;
}
....// 省略無關代碼
// 這個就是最終的任務方法執行的核心方法,抑制修飾符,然后反射調用
@Override
public void run() {
try {
ReflectionUtils.makeAccessible(this.method);
this.method.invoke(this.target);
}
catch (InvocationTargetException ex) {
ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new UndeclaredThrowableException(ex);
}
}
}
// 通過方法所在Bean實例和方法封裝Runnable適配器ScheduledMethodRunnable實例
protected Runnable createRunnable(Object target, Method method) {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
// 這個方法十分長,不過邏輯並不復雜,它只做了四件事
// 0. 解析@Scheduled中的initialDelay、initialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行
// 1. 優先解析@Scheduled中的cron屬性,封裝為CronTask,通過ScheduledTaskRegistrar進行緩存
// 2. 解析@Scheduled中的fixedDelay、fixedDelayString屬性,封裝為FixedDelayTask,通過ScheduledTaskRegistrar進行緩存
// 3. 解析@Scheduled中的fixedRate、fixedRateString屬性,封裝為FixedRateTask,通過ScheduledTaskRegistrar進行緩存
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
// 通過方法宿主Bean和目標方法封裝Runnable適配器ScheduledMethodRunnable實例
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
// 緩存已經裝載的任務
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
// 解析初始化延遲執行時間,initialDelayString支持占位符配置,如果initialDelayString配置了,會覆蓋initialDelay的值
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
// 解析時區zone的值,支持支持占位符配置,判斷cron是否存在,存在則裝載為CronTask
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
// 此方法雖然表面上是調度CronTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
// 返回的任務實例添加到宿主Bean的緩存中,然后最后會放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
// 修正小於0的初始化延遲執行時間值為0
if (initialDelay < 0) {
initialDelay = 0;
}
// 解析fixedDelay和fixedDelayString,如果同時配置,fixedDelayString最終解析出來的整數值會覆蓋fixedDelay,封裝為FixedDelayTask
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// 此方法雖然表面上是調度FixedDelayTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
// 返回的任務實例添加到宿主Bean的緩存中,然后最后會放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 解析fixedRate和fixedRateString,如果同時配置,fixedRateString最終解析出來的整數值會覆蓋fixedRate,封裝為FixedRateTask
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
// 此方法雖然表面上是調度FixedRateTask,實際上由於ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
// 返回的任務實例添加到宿主Bean的緩存中,然后最后會放入宿主Bean -> List<ScheduledTask>映射中
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
// 注冊所有任務實例,這個映射Key為宿主Bean實例,Value為List<ScheduledTask>,后面用於調度所有注冊完成的任務
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
總的來說,這個方法做了四件事:
- 解析
@Scheduled
中的initialDelay
、initialDelayString
屬性,適用於FixedDelayTask
或者FixedRateTask
的延遲執行。 - 優先解析
@Scheduled
中的cron
屬性,封裝為CronTask
,通過ScheduledTaskRegistrar
進行緩存。 - 解析
@Scheduled
中的fixedDelay
、fixedDelayString
屬性,封裝為FixedDelayTask
,通過ScheduledTaskRegistrar
進行緩存。 - 解析
@Scheduled
中的fixedRate
、fixedRateString
屬性,封裝為FixedRateTask
,通過ScheduledTaskRegistrar
進行緩存。
@Scheduled
修飾的某個方法如果同時配置了cron
、fixedDelay|fixedDelayString
和fixedRate|fixedRateString
屬性,意味着此方法同時封裝為三種任務CronTask
、FixedDelayTask
和FixedRateTask
。解析xxString
值的使用,用到了EmbeddedValueResolver
解析字符串的值,支持占位符,這樣可以直接獲取環境配置中的占位符屬性(基於SPEL
的特性,甚至可以支持嵌套占位符)。解析成功的所有任務實例存放在ScheduledAnnotationBeanPostProcessor
的一個映射scheduledTasks
中:
// 宿主Bean實例 -> 解析完成的任務實例Set
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
解析和緩存工作完成之后,接着分析最終激活所有調度任務的邏輯,見互斥方法afterSingletonsInstantiated()
和onApplicationEvent()
,兩者中一定只有一個方法能夠調用finishRegistration()
:
// 所有單例實例化完畢之后回調
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
this.nonAnnotatedClasses.clear();
if (this.applicationContext == null) {
// Not running in an ApplicationContext -> register tasks early...
finishRegistration();
}
}
// 上下文刷新完成之后回調
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
// Running in an ApplicationContext -> register tasks this late...
// giving other ContextRefreshedEvent listeners a chance to perform
// their work at the same time (e.g. Spring Batch's job registration).
finishRegistration();
}
}
//
private void finishRegistration() {
// 如果持有的scheduler對象不為null則設置ScheduledTaskRegistrar中的任務調度器
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// 這個判斷一般會成立,得到的BeanFactory就是DefaultListableBeanFactory
if (this.beanFactory instanceof ListableBeanFactory) {
// 獲取所有的調度配置器SchedulingConfigurer實例,並且都回調configureTasks()方法,這個很重要,它是用戶動態裝載調取任務的擴展鈎子接口
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
// SchedulingConfigurer實例列表排序
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// 下面這一大段邏輯都是為了從BeanFactory取出任務調度器實例,主要判斷TaskScheduler或者ScheduledExecutorService類型的Bean,包括嘗試通過類型或者名字獲取
// 獲取成功后設置到ScheduledTaskRegistrar中
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
logger.trace("Could not find unique TaskScheduler bean", ex);
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.trace("Could not find default TaskScheduler bean", ex);
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
logger.trace("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
// 調用ScheduledTaskRegistrar的afterPropertiesSet()方法,裝載所有的調度任務
this.registrar.afterPropertiesSet();
}
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
// 省略其他代碼.........
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
// 裝載所有調度任務
@SuppressWarnings("deprecation")
protected void scheduleTasks() {
// 這里注意一點,如果找不到任務調度器實例,那么會用單個線程調度所有任務
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// 調度所有裝載完畢的自定義觸發器的任務實例
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
// 調度所有裝載完畢的CronTask
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
// 調度所有裝載完畢的FixedRateTask
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
// 調度所有裝載完畢的FixedDelayTask
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
// 省略其他代碼.........
}
注意兩個個問題:
- 如果沒有配置
TaskScheduler
或者ScheduledExecutorService
類型的Bean
,那么調度模塊只會創建一個線程去調度所有裝載完畢的任務,如果任務比較多,執行密度比較大,很有可能會造成大量任務飢餓,表現為存在部分任務不會觸發調度的場景(這個是調度模塊生產中經常遇到的故障,需要重點排查是否沒有設置TaskScheduler
或者ScheduledExecutorService
)。 SchedulingConfigurer
是調度模塊提供給使用的進行擴展的鈎子接口,用於在激活所有調度任務之前回調ScheduledTaskRegistrar
實例,只要拿到ScheduledTaskRegistrar
實例,我們就可以使用它注冊和裝載新的Task
。
調度任務動態裝載
Scheduling
模塊本身已經支持基於NamespaceHandler
支持通過XML
文件配置調度任務,但是筆者一直認為XML
給人的感覺太"重",使用起來顯得太笨重,這里打算擴展出JSON
文件配置和基於JDBC
數據源配置(也就是持久化任務,這里選用MySQL
)。根據前文的源碼分析,需要用到SchedulingConfigurer
接口的實現,用於在所有調度任務觸發之前從外部添加自定義的調度任務。先定義調度任務的一些配置屬性類:
// 調度任務類型枚舉
@Getter
@RequiredArgsConstructor
public enum ScheduleTaskType {
CRON("CRON"),
FIXED_DELAY("FIXED_DELAY"),
FIXED_RATE("FIXED_RATE"),
;
private final String type;
}
// 調度任務配置,enable屬性為全局開關
@Data
public class ScheduleTaskProperties {
private Long version;
private Boolean enable;
private List<ScheduleTasks> tasks;
}
// 調度任務集合,筆者設計的時候采用一個宿主類中每個獨立方法都是一個任務實例的模式
@Data
public class ScheduleTasks {
// 這里故意叫Klass代表Class,避免關鍵字沖突
private String taskHostKlass;
private Boolean enable;
private List<ScheduleTaskMethod> taskMethods;
}
// 調度任務方法 - enable為任務開關,沒有配置會被ScheduleTaskProperties或者ScheduleTasks中的enable覆蓋
@Data
public class ScheduleTaskMethod {
private Boolean enable;
private String taskDescription;
private String taskMethod;
// 時區,cron的計算需要用到
private String timeZone;
private String cronExpression;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
}
設計的時候,考慮到多個任務執行方法可以放在同一個宿主類,這樣可以方便同一種類的任務進行統一管理,如:
public class TaskHostClass {
public void task1() {
}
public void task2() {
}
......
public void taskN() {
}
}
細節方面,intervalMilliseconds
和initialDelayMilliseconds
的單位設計為毫秒,使用字符串形式,方便可以基於StringValueResolver
解析配置文件中的屬性配置。添加一個抽象的SchedulingConfigurer
:
@Slf4j
public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware,
EmbeddedValueResolverAware {
@Getter
private StringValueResolver embeddedValueResolver;
private ConfigurableBeanFactory configurableBeanFactory;
private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList();
private final Set<String> tasksLoaded = Sets.newHashSet();
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
}
@Override
public void afterPropertiesSet() throws Exception {
internalTasks.clear();
internalTasks.addAll(loadTaskProperties());
}
@Override
public void setEmbeddedValueResolver(StringValueResolver resolver) {
embeddedValueResolver = resolver;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
for (InternalTaskProperties task : internalTasks) {
try {
synchronized (tasksLoaded) {
String key = task.taskHostKlass() + "#" + task.taskMethod();
// 避免重復加載
if (!tasksLoaded.contains(key)) {
if (task instanceof CronTaskProperties) {
loadCronTask((CronTaskProperties) task, taskRegistrar);
}
if (task instanceof FixedDelayTaskProperties) {
loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar);
}
if (task instanceof FixedRateTaskProperties) {
loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar);
}
tasksLoaded.add(key);
} else {
log.info("調度任務已經裝載,任務宿主類:{},任務執行方法:{}", task.taskHostKlass(), task.taskMethod());
}
}
} catch (Exception e) {
throw new IllegalStateException(String.format("加載調度任務異常,任務宿主類:%s,任務執行方法:%s",
task.taskHostKlass(), task.taskMethod()), e);
}
}
}
private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception {
Class<?> klass = ClassUtils.forName(taskHostKlass, null);
Object target = configurableBeanFactory.getBean(klass);
Method method = ReflectionUtils.findMethod(klass, taskMethod);
if (null == method) {
throw new IllegalArgumentException(String.format("找不到目標方法,任務宿主類:%s,任務執行方法:%s", taskHostKlass, taskMethod));
}
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression());
if (null != cronExpression) {
String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone());
TimeZone timeZone;
if (null != timeZoneString) {
timeZone = TimeZone.getTimeZone(timeZoneString);
} else {
timeZone = TimeZone.getDefault();
}
CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone));
taskRegistrar.addCronTask(cronTask);
log.info("裝載CronTask[{}#{}()]成功,cron表達式:{},任務描述:{}", cronExpression, pops.taskMethod(),
pops.cronExpression(), pops.taskDescription());
}
}
private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds);
taskRegistrar.addFixedDelayTask(fixedDelayTask);
log.info("裝載FixedDelayTask[{}#{}()]成功,固定延遲間隔:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),
pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription());
}
private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds);
taskRegistrar.addFixedRateTask(fixedRateTask);
log.info("裝載FixedRateTask[{}#{}()]成功,固定執行頻率:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),
pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription());
}
private long parseDelayAsLong(String value) {
if (null == value) {
return 0L;
}
if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
return Duration.parse(value).toMillis();
}
return Long.parseLong(value);
}
private boolean isP(char ch) {
return (ch == 'P' || ch == 'p');
}
/**
* 加載任務配置,預留給子類實現
*/
protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception;
interface InternalTaskProperties {
String taskHostKlass();
String taskMethod();
String taskDescription();
}
@Builder
protected static class CronTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String cronExpression;
private String taskDescription;
private String timeZone;
@Override
public String taskDescription() {
return taskDescription;
}
public String cronExpression() {
return cronExpression;
}
public String timeZone() {
return timeZone;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedDelayTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
@Builder
protected static class FixedRateTaskProperties implements InternalTaskProperties {
private String taskHostKlass;
private String taskMethod;
private String intervalMilliseconds;
private String initialDelayMilliseconds;
private String taskDescription;
@Override
public String taskDescription() {
return taskDescription;
}
public String initialDelayMilliseconds() {
return initialDelayMilliseconds;
}
public String intervalMilliseconds() {
return intervalMilliseconds;
}
@Override
public String taskHostKlass() {
return taskHostKlass;
}
@Override
public String taskMethod() {
return taskMethod;
}
}
}
loadTaskProperties()
方法用於加載任務配置,留給子類實現。
JSON配置
JSON
配置文件的格式如下(類路徑下的scheduling/tasks.json
文件):
{
"version": 1,
"tasks": [
{
"taskKlass": "club.throwable.schedule.Tasks",
"taskMethods": [
{
"taskType": "FIXED_DELAY",
"taskDescription": "processTask1任務",
"taskMethod": "processTask1",
"intervalMilliseconds": "5000"
}
]
}
]
}
每個層級都有一個enable
屬性,默認為true
,只有強制指定為false
的時候才不會裝載對應的任務調度方法。這里就是簡單繼承AbstractSchedulingConfigurer
,實現從類路徑加載配置的邏輯,定義JsonSchedulingConfigurer
:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {
// 這里把默認的任務配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通過配置項scheduling.json.config.location進行覆蓋
@Value("${scheduling.json.config.location:scheduling/tasks.json}")
private String location;
@Autowired
private ObjectMapper objectMapper;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
ClassPathResource resource = new ClassPathResource(location);
String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class);
if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) {
return Lists.newArrayList();
}
List<InternalTaskProperties> target = Lists.newArrayList();
for (ScheduleTasks tasks : properties.getTasks()) {
if (null != tasks) {
List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods();
if (null != taskMethods) {
for (ScheduleTaskMethod taskMethod : taskMethods) {
if (!Boolean.FALSE.equals(taskMethod.getEnable())) {
if (ScheduleTaskType.CRON == taskMethod.getTaskType()) {
target.add(CronTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.cronExpression(taskMethod.getCronExpression())
.timeZone(taskMethod.getTimeZone())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(taskMethod.getTaskMethod())
.intervalMilliseconds(taskMethod.getIntervalMilliseconds())
.initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
.taskDescription(taskMethod.getTaskDescription())
.taskHostKlass(tasks.getTaskKlass())
.build());
}
}
}
}
}
}
return target;
}
}
添加一個配置類和任務類:
@Configuration
public class SchedulingAutoConfiguration {
@Bean
public JsonSchedulingConfigurer jsonSchedulingConfigurer(){
return new JsonSchedulingConfigurer();
}
}
// club.throwable.schedule.Tasks
@Slf4j
@Component
public class Tasks {
public void processTask1() {
log.info("processTask1觸發..........");
}
}
啟動SpringBoot
應用,某次執行的部分日志如下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 裝載FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延遲間隔:5000 ms,初始延遲執行時間:0 ms,任務描述:processTask1任務
2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發..........
2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發..........
2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發..........
......
這里有些細節沒有完善,例如配置文件參數的一些非空判斷、配置值是否合法等等校驗邏輯沒有做,如果要設計成一個工業級的類庫,這些方面必須要考慮。
JDBC數據源配置
JDBC
數據源這里用MySQL
舉例說明,先建一個調度任務配置表schedule_task
:
CREATE TABLE `schedule_task`
(
id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '修改者',
creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '創建者',
deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '軟刪除標識',
task_host_class VARCHAR(256) NOT NULL COMMENT '任務宿主類全類名',
task_method VARCHAR(128) NOT NULL COMMENT '任務執行方法名',
task_type VARCHAR(16) NOT NULL COMMENT '任務類型',
task_description VARCHAR(64) NOT NULL COMMENT '任務描述',
cron_expression VARCHAR(128) COMMENT 'cron表達式',
time_zone VARCHAR(32) COMMENT '時區',
interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '執行間隔時間',
initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延遲執行時間',
UNIQUE uniq_class_method (task_host_class, task_method)
) COMMENT '調度任務配置表';
其實具體的做法和JSON
配置差不多,先引入spring-boot-starter-jdbc
,接着編寫MysqlSchedulingConfigurer
:
// DAO
@RequiredArgsConstructor
public class MysqlScheduleTaskDao {
private final JdbcTemplate jdbcTemplate;
private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> {
List<ScheduleTask> tasks = Lists.newArrayList();
while (r.next()) {
ScheduleTask task = new ScheduleTask();
tasks.add(task);
task.setId(r.getLong("id"));
task.setCronExpression(r.getString("cron_expression"));
task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds"));
task.setIntervalMilliseconds(r.getLong("interval_milliseconds"));
task.setTimeZone(r.getString("time_zone"));
task.setTaskDescription(r.getString("task_description"));
task.setTaskHostClass(r.getString("task_host_class"));
task.setTaskMethod(r.getString("task_method"));
task.setTaskType(r.getString("task_type"));
}
return tasks;
};
public List<ScheduleTask> selectAllTasks() {
return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI);
}
}
// MysqlSchedulingConfigurer
@RequiredArgsConstructor
public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {
private final MysqlScheduleTaskDao mysqlScheduleTaskDao;
@Override
protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
List<InternalTaskProperties> target = Lists.newArrayList();
List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks();
if (!tasks.isEmpty()) {
for (ScheduleTask task : tasks) {
ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType());
if (ScheduleTaskType.CRON == scheduleTaskType) {
target.add(CronTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.cronExpression(task.getCronExpression())
.timeZone(task.getTimeZone())
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) {
target.add(FixedDelayTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) {
target.add(FixedRateTaskProperties.builder()
.taskMethod(task.getTaskMethod())
.intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
.initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
.taskDescription(task.getTaskDescription())
.taskHostKlass(task.getTaskHostClass())
.build());
}
}
}
return target;
}
}
記得引入spring-boot-starter-jdbc
和mysql-connector-java
並且激活MysqlSchedulingConfigurer
配置。插入一條記錄:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '測試任務', NULL, NULL, 10000, 5000);
然后啟動服務,某次執行的輸出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發..........
2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發..........
....
混合配置
有些時候我們希望可以JSON
配置和JDBC
數據源配置進行混合配置,或者動態二選一以便靈活應對多環境的場景(例如要在開發環境使用JSON
配置而測試和生產環境使用JDBC
數據源配置,甚至可以將JDBC
數據源配置覆蓋JSON
配置,這樣能保證總是傾向於使用JDBC
數據源配置),這樣需要對前面兩小節的實現加多一層抽象。這里的設計可以參考SpringMVC
中的控制器參數解析器的設計,具體是HandlerMethodArgumentResolverComposite
,其實道理是相同的。
其他注意事項
在生產實踐中,暫時不考慮生成任務執行日志和細粒度的監控,着重做了兩件事:
- 並發控制,(多服務節點下)禁止任務並發執行。
- 跟蹤任務的日志軌跡。
解決並發執行問題
一般情況下,我們需要禁止任務並發執行,考慮引入Redisson
提供的分布式鎖:
// 引入依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>最新版本</version>
</dependency>
// 配置類
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedissonAutoConfiguration {
@Autowired
private RedisProperties redisProperties;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort()));
if (redisProperties.getDatabase() > 0) {
singleServerConfig.setDatabase(redisProperties.getDatabase());
}
if (null != redisProperties.getPassword()) {
singleServerConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
// 分布式鎖工廠
@Component
public class DistributedLockFactory {
private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:";
@Autowired
private RedissonClient redissonClient;
public DistributedLock provideDistributedLock(String lockKey) {
String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey;
return new RedissonDistributedLock(redissonClient, lockPath);
}
}
這里考慮到項目依賴了spring-boot-starter-redis
,直接復用了它的配置屬性類(RedissonDistributedLock
是RLock
的輕量級封裝,見附錄)。使用方式如下:
@Autowired
private DistributedLockFactory distributedLockFactory;
public void task1() {
DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);
// 等待時間為20秒,持有鎖的最大時間為60秒
boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS);
if (tryLock) {
try {
// 業務邏輯
}finally {
lock.unlock();
}
}
}
引入MDC跟蹤任務的Trace
MDC
其實是Mapped Diagnostic Context
的縮寫,也就是映射診斷上下文,一般用於日志框架里面同一個線程執行過程的跟蹤(例如一個線程跑過了多個方法,各個方法里面都打印了日志,那么通過MDC
可以對整個調用鏈通過一個唯一標識關聯起來),例如這里選用slf4j
提供的org.slf4j.MDC
:
@Component
public class MappedDiagnosticContextAssistant {
/**
* 在MDC中執行
*
* @param runnable runnable
*/
public void processInMappedDiagnosticContext(Runnable runnable) {
String uuid = UUID.randomUUID().toString();
MDC.put("TRACE_ID", uuid);
try {
runnable.run();
} finally {
MDC.remove("TRACE_ID");
}
}
}
任務執行的時候需要包裹成一個Runnale
實例:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("開始執行......");
// 業務邏輯
watch.stop();
log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());
});
}
結合前面一節提到的並發控制,那么最終執行的任務方法如下:
public void task1() {
mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
StopWatch watch = new StopWatch();
watch.start();
log.info("開始執行......");
scheduleTaskAssistant.executeInDistributedLock("任務分布式鎖KEY", () -> {
// 真實的業務邏輯
});
watch.stop();
log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());
});
}
這里的方法看起來比較別扭,其實可以直接在任務裝載的時候基於分布式鎖和MDC
進行封裝,方式類似於ScheduledMethodRunnable
,這里不做展開,因為要詳細展開篇幅可能比較大(ScheduleTaskAssistant
見附錄)。
小結
其實spring-context
整個調度模塊完全依賴於TaskScheduler
實現,更底層的是JUC
調度線程池ScheduledThreadPoolExecutor
。如果想要從底層原理理解整個調度模塊的運行原理,那么就一定要分析ScheduledThreadPoolExecutor
的實現。整篇文章大致介紹了spring-context
調度模塊的加載調度任務的流程,並且基於擴展接口SchedulingConfigurer
擴展出多種自定義配置調度任務的方式,但是考慮到需要在生產環境中運行,那么免不了需要考慮監控、並發控制、日志跟蹤等等的功能,但是這樣就會使得整個調度模塊變重,慢慢地就會發現,這個輪子越造越大,越有主流調度框架Quartz
或者Easy Scheduler
的影子。筆者認為,軟件工程,有些時候要權衡取舍,該拋棄的就應該果斷拋棄,否則總是負重而行,還能走多遠?
參考資料:
SpringBoot
源碼
附錄
ScheduleTaskAssistant
:
@RequiredArgsConstructor
@Component
public class ScheduleTaskAssistant {
/**
* 5秒
*/
public static final long DEFAULT_WAIT_TIME = 5L;
/**
* 30秒
*/
public static final long DEFAULT_LEAVE_TIME = 30L;
private final DistributedLockFactory distributedLockFactory;
/**
* 在分布式鎖中執行
*
* @param waitTime 鎖等着時間
* @param leaveTime 鎖持有時間
* @param timeUnit 時間單位
* @param lockKey 鎖的key
* @param task 任務對象
*/
public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) {
DistributedLock lock = distributedLockFactory.dl(lockKey);
boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit);
if (tryLock) {
try {
long waitTimeMillis = timeUnit.toMillis(waitTime);
long start = System.currentTimeMillis();
task.run();
long end = System.currentTimeMillis();
long cost = end - start;
// 預防鎖過早釋放
if (cost < waitTimeMillis) {
Sleeper.X.sleep(waitTimeMillis - cost);
}
} finally {
lock.unlock();
}
}
}
/**
* 在分布式鎖中執行 - 使用默認時間
*
* @param lockKey 鎖的key
* @param task 任務對象
*/
public void executeInDistributedLock(String lockKey, Runnable task) {
executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task);
}
}
RedissonDistributedLock
:
@Slf4j
public class RedissonDistributedLock implements DistributedLock {
private final RedissonClient redissonClient;
private final String lockPath;
private final RLock internalLock;
RedissonDistributedLock(RedissonClient redissonClient, String lockPath) {
this.redissonClient = redissonClient;
this.lockPath = lockPath;
this.internalLock = initInternalLock();
}
private RLock initInternalLock() {
return redissonClient.getLock(lockPath);
}
@Override
public boolean isLock() {
return internalLock.isLocked();
}
@Override
public boolean isHeldByCurrentThread() {
return internalLock.isHeldByCurrentThread();
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
internalLock.lock(leaseTime, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
try {
return internalLock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e);
}
}
@Override
public void unlock() {
try {
internalLock.unlock();
} catch (IllegalMonitorStateException ex) {
log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex);
}
}
}
(本文完 c-7-d e-a-20200324 真是有點滑稽,筆者發現任務持久化最好還是用現成的工業級調度器,於是基於Quartz做了輕量級封裝,寫了個后台管理界面,且聽下回分解)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):