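A monolithic application needed a scheduling facility that is simple to use and reliable. The requirements: trigger times are configured with Cron expressions; a job's schedule can be changed and the change takes effect immediately; and jobs can be added, stopped, and restarted at runtime.
After some investigation, org.springframework.scheduling.annotation.SchedulingConfigurer turned out to support configuring trigger times with Cron expressions and changing a job's schedule, but a modified cron expression does not take effect immediately, and jobs cannot be added, stopped, or restarted at runtime.
Further investigation showed that org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler meets all of the requirements. Sample code follows: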
Initializing ThreadPoolTaskScheduler:
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class AppConfig {

    // Register this scheduler only if no other ThreadPoolTaskScheduler bean is defined.
    @Bean
    @ConditionalOnMissingBean
    public ThreadPoolTaskScheduler getThreadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("job-schedule-");
        // Wait for scheduled tasks to complete on shutdown: running tasks are not
        // interrupted and all tasks in the queue are executed.
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        // Maximum number of seconds this executor blocks on shutdown while waiting for
        // remaining tasks to finish, before the rest of the container continues to shut
        // down. Useful when remaining tasks still need other container-managed resources.
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}
The business object SysJobSchedule:
public class SysJobSchedule {
    private String name;
    private String code;
    private String comment;
    private String cron;
    private String className;
    private Boolean deleteFlag = false;
    private Boolean stopFlag = false;
    .....
}
The core class SmartScheduleJob:
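import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@EnableScheduling
public class SmartScheduleJob {

    // Scheduled jobs keyed by class name, so each one can be cancelled or restarted later.
    private final ConcurrentHashMap<String, ScheduledFuture<?>> futureConcurrentHashMap = new ConcurrentHashMap<>();

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /**
     * Start a job. Any schedule already registered under the same class name is
     * stopped first, so calling this again after changing the cron restarts the job.
     *
     * @param jobSchedule the job to start
     */
    public void startSchedule(SysJobSchedule jobSchedule) {
        if (jobSchedule != null) {
            log.info("Starting job {}", jobSchedule.getClassName());
            stopSchedule(jobSchedule);
            initSchedule(jobSchedule);
        }
    }

    /**
     * Stop a job.
     *
     * @param jobSchedule the job to stop
     */
    public void stopSchedule(SysJobSchedule jobSchedule) {
        String className = jobSchedule.getClassName();
        // remove() returns the registered future, or null if there was none.
        ScheduledFuture<?> scheduledFuture = this.futureConcurrentHashMap.remove(className);
        if (scheduledFuture != null) {
            log.info("== Stopping job: {}", className);
            if (!scheduledFuture.isCancelled()) {
                scheduledFuture.cancel(true);
            }
        }
    }

    private void initSchedule(SysJobSchedule jobSchedule) {
        if (Boolean.TRUE.equals(jobSchedule.getStopFlag())) {
            return;
        }
        ScheduledFuture<?> future = this.threadPoolTaskScheduler.schedule(
                // 1. The task body (Runnable)
                () -> this.doTask(jobSchedule),
                // 2. The schedule (Trigger); assumes the Spring 5 Trigger API,
                //    where nextExecutionTime returns a Date
                triggerContext -> {
                    String cron = jobSchedule.getCron();
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
        // Keep the ScheduledFuture in the ConcurrentHashMap so the job can be stopped later
        futureConcurrentHashMap.put(jobSchedule.getClassName(), future);
    }

    private void doTask(SysJobSchedule jobSchedule) {
        // doSomething ....
    }
}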
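The cancel-then-reschedule idea behind startSchedule and stopSchedule does not depend on Spring. The following is a minimal sketch of the same pattern using only java.util.concurrent, under some loudly stated assumptions: JobRegistry and its method names are hypothetical, and a fixed delay stands in for the cron trigger because the JDK has no cron support. It is meant to illustrate the bookkeeping, not to replace the class above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SmartScheduleJob: a fixed delay replaces the cron trigger.
class JobRegistry {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /** Starts a job; a schedule already registered under the same key is cancelled first. */
    void start(String key, Runnable task, long delayMillis) {
        stop(key);
        futures.put(key, executor.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS));
    }

    /** Stops a job; returns true if a schedule was registered under the key. */
    boolean stop(String key) {
        ScheduledFuture<?> future = futures.remove(key);
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    /** Reports whether a non-cancelled schedule is registered under the key. */
    boolean isRunning(String key) {
        ScheduledFuture<?> future = futures.get(key);
        return future != null && !future.isCancelled();
    }

    /** Releases the worker threads; call this when the registry is no longer needed. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Calling start twice with the same key behaves like a restart with a new schedule, which is how a changed schedule can take effect right away. Note that stop followed by put is not atomic, so concurrent callers starting the same key would need extra synchronization.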
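ThreadPoolTaskScheduler is a lightweight task scheduler suited to monolithic applications. It is not suitable for distributed, clustered deployments: the nodes share no scheduling state, so the same job ends up being triggered once on every node.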
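The duplicate triggering can be simulated in a single JVM. In the rough sketch below, two independent executors stand in for two application nodes (the TwoNodeDemo class and the node names are hypothetical); each schedules the same one-shot job, and because neither knows about the other, the job runs twice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation: each executor plays the role of one node in a cluster.
class TwoNodeDemo {
    /** Schedules the same one-shot job on two independent schedulers and returns the total run count. */
    static int run() {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(2);
        Runnable job = () -> {
            runs.incrementAndGet();
            done.countDown();
        };

        ScheduledExecutorService nodeA = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService nodeB = Executors.newSingleThreadScheduledExecutor();
        try {
            // Both "nodes" hold the same schedule and share no state.
            nodeA.schedule(job, 10, TimeUnit.MILLISECONDS);
            nodeB.schedule(job, 10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            nodeA.shutdownNow();
            nodeB.shutdownNow();
        }
        return runs.get();
    }
}
```

Avoiding this in a real cluster requires some state shared between the nodes, which this scheduler does not provide on its own.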