利用Spring的ThreadPoolTaskScheduler實現輕量級任務調度


在單體應用中需要一個使用簡單性能可靠的調度功能,要求可以通過Cron表達式配置觸發時間並且任務執行時間可以修改並且立即生效,可以在運行時動態增加、停止、重啟job等。

經過研究org.springframework.scheduling.annotation.SchedulingConfigurer滿足通過Cron表達式配置觸發時間、任務執行時間可以修改但不滿足修改cron表達式后立即生效,也不支持運行時動態增加、停止、重啟job。

進一步研究發現可以使用org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler滿足全部需要。示例代碼如下:

初始化ThreadPoolTaskScheduler:

@Configuration
public class AppConfig {
    @Bean
    @ConditionalOnBean
    public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("job-schedule-");
        //	 *Set whether to wait for scheduled tasks to complete on shutdown,
        //	  not interrupting running tasks and executing all tasks in the queue.
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        /**
         *  Set the maximum number of seconds that this executor is supposed to block
         *  on shutdown in order to wait for remaining tasks to complete their execution
         *  before the rest of the container continues to shut down. This is particularly
         *  useful if your remaining tasks are likely to need access to other resources
         *  that are also managed by the container.
         */
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}

業務對象SysJobSchedule:

public class SysJobSchedule {
    private String name;
    private String code;
    private String comment;
    private String cron;
    private String className;
    private Boolean deleteFlag = false;
    private Boolean stopFlag = false;
    .....
 }

核心SmartScheduleJob

@Service
@Slf4j
@EnableScheduling
public class SmartScheduleJob {
    private ConcurrentHashMap<String, ScheduledFuture> futureConcurrentHashMap = new ConcurrentHashMap<>();
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
      /**
     * 啟動調度
     * @param jobSchedule
     */
    public void startSchedule(SysJobSchedule jobSchedule) {
        if (jobSchedule != null) {
            log.info("啟動Job {}",jobSchedule.getClassName());
            stopSchedule(jobSchedule.getClassName());
            initSchedule(jobSchedule);
        }
    }

    /**
     * 關閉調度
     * @param className
     */
    public void stopSchedule((SysJobSchedule jobSchedule) {
        String className = jobSchedule.getgClassName();
        ScheduledFuture scheduledFuture = this.futureConcurrentHashMap.get(className);
        if (scheduledFuture != null) {
            log.info("==關閉Job:{}", className);
            futureConcurrentHashMap.remove(className);
            if (!scheduledFuture.isCancelled())
                scheduledFuture.cancel(true);
        }
    }


    private void initSchedule(SysJobSchedule jobSchedule) {
        if(jobSchedule.getStopFlag())
            return;
        ScheduledFuture future = this.threadPoolTaskScheduler.schedule(
                //1.添加任務內容(Runnable)
                () -> {
                    this.doTask(jobSchedule);
                },
                //2.設置執行周期(Trigger)
                triggerContext -> {                              
                    String cron = jobSchedule.getCron();                 
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
        //將ScheduledFuture添加到ConcurrentHashMap中
        futureConcurrentHashMap.put(jobSchedule.getClassName(), future);
    }

    private void doTask(SysJobSchedule jobSchedule) {
      doSomething。。。。
    }

}

ThreadPoolTaskScheduler為輕量級任務調度器適用於單體應用,不適合分布式集群部署,由於節點之間沒有共享信息,因而會出現多次調度的情況.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM