使用ThreadPoolTaskScheduler動態修改調度時間


SchedulingConfigurer 接口只能統一修改調度時間;若要分開控制每個 job,有多少個 job 就要有多少個實現,比較麻煩。

配置線程池ThreadPoolTaskScheduler

/**
 * Spring configuration that exposes the {@link ThreadPoolTaskScheduler} used to
 * run the dynamically scheduled jobs.
 */
@Configuration
public class JobConfig {

    /**
     * Builds the scheduler bean registered under the name {@code taskExecutor}.
     *
     * @return a scheduler with 20 threads whose names start with {@code taskExecutor-}
     */
    @Bean("taskExecutor")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix("taskExecutor-");
        scheduler.setPoolSize(20);
        // On shutdown, let submitted tasks finish, waiting at most 300 seconds.
        scheduler.setAwaitTerminationSeconds(300);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        return scheduler;
    }
}

封裝實現

@Component
@Slf4j
public class Jobs {
    /**
     * key josId
     * value ScheduledFuture object
     */
    private final ConcurrentHashMap<Integer, ScheduledFuture> jobsMap = new ConcurrentHashMap<>(8);

    private final ApplicationContext applicationContext;
    private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private final EcoScheduleJobMapper scheduleJobMapper;

    public Jobs(ApplicationContext applicationContext, EcoScheduleJobMapper scheduleJobMapper, ThreadPoolTaskScheduler threadPoolTaskScheduler) {
        this.applicationContext = applicationContext;
        this.scheduleJobMapper = scheduleJobMapper;
        this.threadPoolTaskScheduler = threadPoolTaskScheduler;
        init();
    }

    public Map<Integer, ScheduledFuture> getJobsMap() {
        return jobsMap;
    }

    public void start(Integer jobId) {
        if (jobsMap.containsKey(jobId)) {
            log.debug("job:{} is running", jobId);
            // running...
            return;
        }

        executeJob(selectJobsDetail(jobId));
    }

    public void stop(Integer jobId) {
        ScheduledFuture scheduledFuture = jobsMap.get(jobId);
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            log.debug("job cancel:{}", scheduledFuture);
        } else
            log.debug("jobId:{} is not running ", jobId);
    }

    private EcoScheduleJob selectJobsDetail(Integer jobId) {
        return scheduleJobMapper.selectByPrimaryKey(jobId);
    }

    // 修改corn配置,先取消再重新啟動
    public void reset(Integer jobId) {
        ScheduledFuture scheduledFuture = jobsMap.get(jobId);
        EcoScheduleJob scheduleJob = selectJobsDetail(jobId);
        if (Objects.nonNull(scheduledFuture)) {
            scheduledFuture.cancel(true);
            log.debug("job:{} cancel success", scheduleJob);
        }
        if (Objects.nonNull(scheduleJob)) {
            executeJob(scheduleJob);
            log.debug("job:{} start success", scheduleJob);
        }
    }

    // 啟動初始化啟動所有job
    private void init() {
        // 從數據庫獲取任務信息
        List<EcoScheduleJob> jobs = scheduleJobMapper.selectJobs();
        if (CollectionUtils.isNotEmpty(jobs)) {
            jobs.forEach(this::executeJob);
        }
    }

    private void executeJob(EcoScheduleJob job) {
        Runnable runnable = null;
        try {
            Class<?> clazz = Class.forName(job.getClassName());
            Constructor<?>[] declaredConstructors = clazz.getDeclaredConstructors();
            if (!Arrays.isNullOrEmpty(declaredConstructors)) {
                // 只取一個構造器
                if (declaredConstructors.length > 1) {
                    log.warn("{} 存在多個構造器,只能有一個", clazz);
                    return;
                }
                Constructor<?> constructor = declaredConstructors[0];
                Class<?>[] parameterTypes = constructor.getParameterTypes();
                if (parameterTypes.length == 0) {
                    log.warn("{} 無參構造器", clazz);
                    return;
                }
                Class<?> parameterType = parameterTypes[0];
                Object bean = applicationContext.getBean(parameterType);
                runnable = (Runnable) constructor.newInstance(bean);
            }
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
            log.error("任務調度失敗:{}", e);
        }
        if (Objects.nonNull(runnable)) {
            ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(runnable, new CronTrigger(job.getCron()));
            jobsMap.put(job.getJobId(), schedule);
            log.debug("job:{} is running...", job);
        } else {
            log.error("任務調度失敗 job Class is null");
        }
    }

}

pojo

/**
 * Row of the {@code eco_schedule_job} table: one schedulable job definition.
 */
@Data
@Table(name = "`eco_schedule_job`")
public class EcoScheduleJob implements Serializable {

    /** Primary key. */
    @Id
    @Column(name = "`job_id`")
    private Integer jobId;

    /** Class name of the job. */
    @Column(name = "`class_name`")
    private String className;

    /** Cron expression. */
    @Column(name = "`cron`")
    private String cron;

    /** Status: 1 = disabled, 0 = enabled. */
    @Column(name = "`status`")
    private String status;
}

controller

/**
 * HTTP endpoints under {@code /job} that delegate to {@link ScheduleJobService}.
 */
@RestController
@RequestMapping("/job")
public class ScheduleJobController {

    @Reference
    private ScheduleJobService scheduleJobService;

    /**
     * Returns whatever {@link ScheduleJobService#status()} reports.
     *
     * @return the service's status result
     */
    @GetMapping("/status.do")
    public Object status() {
        return scheduleJobService.status();
    }

    /**
     * Asks the service to start a job.
     *
     * @param jobId id of the job to start
     * @return {@code Response.OK()}
     */
    @GetMapping("/start.do")
    public Object start(@RequestParam("jobId") Integer jobId) {
        scheduleJobService.start(jobId);
        return Response.OK();
    }

    /**
     * Asks the service to stop a job.
     *
     * @param jobId id of the job to stop
     * @return {@code Response.OK()}
     */
    @GetMapping("/stop.do")
    public Object stop(@RequestParam("jobId") Integer jobId) {
        scheduleJobService.stop(jobId);
        return Response.OK();
    }

    /**
     * Asks the service to change a job's cron expression.
     *
     * @param jobId id of the job to update
     * @param cron  new cron expression
     * @return {@code Response.OK()}
     */
    @GetMapping("/update.do")
    public Object update(@RequestParam("jobId") Integer jobId, @RequestParam("cron") String cron) {
        scheduleJobService.update(jobId, cron);
        return Response.OK();
    }
}

service

/**
 * {@link ScheduleJobService} implementation that persists cron changes through
 * {@link EcoScheduleJobMapper} and delegates scheduling to {@link Jobs}.
 */
@Service
public class ScheduleJobServiceImpl implements ScheduleJobService {

    private final EcoScheduleJobMapper ecoScheduleJobMapper;

    private final Jobs jobs;

    /**
     * @param ecoScheduleJobMapper data access for job definitions
     * @param jobs                 registry that schedules and cancels jobs
     */
    public ScheduleJobServiceImpl(EcoScheduleJobMapper ecoScheduleJobMapper , Jobs jobs) {
        this.ecoScheduleJobMapper = ecoScheduleJobMapper;
        this.jobs = jobs;
    }

    /**
     * Stores the new cron expression for the job, then reschedules it.
     *
     * @param jobId id of the job to update
     * @param cron  new cron expression
     */
    @Override
    public void update(Integer jobId, String cron) {
        // Only jobId and cron are set; the selective update is given no other columns to write.
        EcoScheduleJob scheduleJob = new EcoScheduleJob();
        scheduleJob.setJobId(jobId);
        scheduleJob.setCron(cron);
        ecoScheduleJobMapper.updateByPrimaryKeySelective(scheduleJob);

        jobs.reset(jobId);
    }

    /**
     * Stops the job with the given id.
     *
     * @param jobId id of the job to stop
     */
    @Override
    public void stop(Integer jobId) {
        jobs.stop(jobId);
    }

    /**
     * Starts the job with the given id.
     *
     * @param jobId id of the job to start
     */
    @Override
    public void start(Integer jobId) {
        jobs.start(jobId);
    }

    /**
     * Lists the definitions of all jobs currently present in the scheduler's job map.
     *
     * @return the matching job definitions; an empty list when the map holds no jobs
     */
    @Override
    public List<EcoScheduleJob> status() {
        Set<Integer> scheduledJobIds = jobs.getJobsMap().keySet();
        if (scheduledJobIds.isEmpty()) {
            // Do not hand the mapper an empty id string; presumably it would build an
            // empty IN clause - TODO confirm against the mapper implementation.
            return java.util.Collections.emptyList();
        }
        String joinedIds = StringUtils.join(scheduledJobIds, ",");
        return ecoScheduleJobMapper.selectByIds(joinedIds);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM