項目開發中,經常會遇到定時任務的場景,Spring提供了@Scheduled注解
,方便進行定時任務的開發
概述
要使用@Scheduled
注解,首先需要在啟動類添加@EnableScheduling
,啟用Spring的計划任務執行功能,這樣可以在容器中的任何Spring管理的bean上檢測@Scheduled
注解,執行計划任務
注解定義
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Repeatable(Schedules.class) public @interface Scheduled { String cron() default ""; String zone() default ""; long fixedDelay() default -1; String fixedDelayString() default ""; long fixedRate() default -1; String fixedRateString() default ""; long initialDelay() default -1; String initialDelayString() default ""; }
參數說明
源碼解析
配置了@Scheduled
注解的方法,Spring的處理是通過注冊ScheduledAnnotationBeanPostProcessor來執行,將不同配置參數的任務分配給不同的handler處理,核心代碼如下
- org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
- org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#processScheduled
/** * Process the given {@code @Scheduled} method declaration on the given bean. * @param scheduled the @Scheduled annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance * @see #createRunnable(Object, Method) */ protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // Check fixed rate long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
- org.springframework.scheduling.config.ScheduledTaskRegistrar#scheduleTasks
/** * Schedule all registered tasks against the underlying * {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}. */ proected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
使用詳解
定時任務同步/異步執行
定時任務執行默認是單線程模式,會創建一個本地線程池,線程池大小為1。當項目中有多個定時任務時,任務之間會相互等待,同步執行
源碼:
// org.springframework.scheduling.config.ScheduledTaskRegistrar#scheduleTasks if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } // java.util.concurrent.Executors#newSingleThreadScheduledExecutor() public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
代碼示例:
@Slf4j @Component public class RunIntervalTestScheduler { @Scheduled(cron = "0/1 * * * * ?") public void singleThreadTest1() { log.info("singleThreadTest1"); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); } @Scheduled(cron = "0/1 * * * * ?") public void singleThreadTest2() { log.info("singleThreadTest2"); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); } @Scheduled(cron = "0/1 * * * * ?") public void singleThreadTest3() { log.info("singleThreadTest3"); LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); } }
執行結果:
可以看到,默認情況下,三個任務串行執行,都使用pool-1-thread-1
同一個線程池,並且線程只有一個
可以通過實現SchedulingConfigurer
接口,手動創建線程池,配置期望的線程數量
示例代碼:
@Configuration public class ScheduledConfig implements SchedulingConfigurer { /** * 任務執行線程池大小 */ private static final int TASK_POOL_SIZE = 50; /** * 線程名 */ private static final String TASK_THREAD_PREFIX = "test-task-"; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { ThreadPoolTaskScheduler taskPool = new ThreadPoolTaskScheduler(); taskPool.setPoolSize(TASK_POOL_SIZE); taskPool.setThreadNamePrefix(TASK_THREAD_PREFIX); taskPool.initialize(); scheduledTaskRegistrar.setTaskScheduler(taskPool); } }
任務執行結果:
此時任務的執行已經異步化,從自定義線程池中分配線程執行任務,在實際應用中需要考慮實際任務數量,創建相應大小的線程池
fixedRate/fixedDelay區別
- fixedRate是配置上一次任務執行開始到下一次執行開始的間隔時間,不會等待上一次任務執行完成就會調度下一次任務,將其放入等待隊列中
代碼示例:
@Slf4j @Component public class RunIntervalTestScheduler { @Scheduled(initialDelay = 1000, fixedRate = 1000) public void fixedRate() throws Exception { log.info("fixedRate run"); TimeUnit.SECONDS.sleep(3); } }
執行結果:
任務配置的fixedRate為1s,執行日志打印的時間間隔都是3s左右,也就是上一次執行完成后,緊接着就執行下一次任務
- fixedDelay是配置的上一次任務執行結束到下一次執行開始的間隔時間,也就是說會等待上一次任務執行結束后,延遲間隔時間,再執行下一次任務
代碼示例:
@Slf4j @Component public class RunIntervalTestScheduler { @Scheduled(initialDelay = 1000, fixedDelay = 1000) public void fixedDelay() throws Exception { log.info("fixedDelay run"); TimeUnit.SECONDS.sleep(3); } }
執行結果:
任務配置的fixedDelay為1s,執行日志打印的時間間隔都是4s左右,也就是上一次執行完成后,延遲1s后執行下一次任務
- cron表達式如果配置為類似每秒執行、每分鍾執行(例:0/1 * * * * ?, 每秒執行),調度跟fixedDelay是一致的,也是在上一次任務執行結束后,等待間隔時間
代碼示例:
@Slf4j @Component public class RunIntervalTestScheduler { @Scheduled(cron = "0/1 * * * * ?") public void cronRun() throws Exception{ log.info("cron run"); TimeUnit.SECONDS.sleep(3); } }
執行結果:
執行日志打印的時間間隔都是4s左右,也就是上一次執行完成后,延遲1s后執行下一次任務
- cron表達式如果配置為固定時間執行(例:1 * * * * ?, 秒數為1時執行),若上一次任務沒有執行完,則不會調度本次任務,跳過本次執行,等待下一次執行周期
代碼示例:
@Slf4j @Component public class RunIntervalTestScheduler { @Scheduled(cron = "1 * * * * ?") public void cronRun() throws Exception{ log.info("cron run"); TimeUnit.SECONDS.sleep(70); } }
執行結果:
上一次任務未執行完畢,則跳過了本次執行
https://blog.csdn.net/hu_dongyang/article/details/114270232