Quartz實現動態定時任務


目標:定時任務持久化到數據庫,動態調整數據庫里保存的cron表達式使定時任務可以跟隨變化。

1、核心依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

2、數據庫表

-- ----------------------------
-- 任務調度表  job_info
-- ----------------------------
CREATE TABLE `job_info`
(
    `id`              varchar(20)  NOT NULL COMMENT 'ID',
    `job_name`        varchar(50)  NOT NULL COMMENT '名稱',
    `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表達式(秒分時日月周[年])',
    `bean_name`       varchar(255) NOT NULL COMMENT 'spring bean名稱,被ioc容器管理的,需要執行調度任務的bean',
    `method_name`     varchar(255) NOT NULL COMMENT 'bean里需要執行方法名',
    `params`          varchar(255) NULL COMMENT '方法參數',
    `enable`          tinyint(1)   NOT NULL default 1 COMMENT '是否開啟,1開啟,0關閉',
    `remark`          varchar(255) NULL COMMENT '說明',
    `create_time`     datetime     NULL     DEFAULT NULL COMMENT '創建日期',
    `update_time`     datetime     NULL     DEFAULT NULL COMMENT '更新日期',
    PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB
  CHARACTER SET = utf8mb4 COMMENT = '任務調度表';

insert into job_info
values ('3', '用於測試定時任務觸發run', '0/30 * * * * ?', 'testJob', 'run', null, false, '用於測試定時任務工作情況run', now(), null);
insert into job_info
values ('4', '用於測試定時任務觸發call', '0/20 * * * * ?', 'testJob', 'call', null, false, '用於測試定時任務工作情況call', now(), null);


-- ----------------------------
-- 任務日志表  job_log
-- ----------------------------
CREATE TABLE `job_log`
(
    `id`              varchar(20)  NOT NULL COMMENT 'ID',
    `job_name`        varchar(50)  NOT NULL COMMENT '名稱',
    `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表達式(秒分時日月周[年])',
    `bean_name`       varchar(255) NOT NULL COMMENT 'spring bean名稱,被ioc容器管理的,需要執行調度任務的bean',
    `method_name`     varchar(255) NOT NULL COMMENT 'bean里需要執行方法名',
    `params`          varchar(255) NULL COMMENT '方法參數',
    `success`         tinyint(1)   NOT NULL default 1 COMMENT '是否成功,1成功,0失敗',
    `time`            bigint(20)   NOT NULL default 0 COMMENT '執行耗時,毫秒',
    `detail`          text         NULL COMMENT '詳細內容',
    `create_time`     datetime     NULL     DEFAULT NULL COMMENT '創建日期',
    PRIMARY KEY (`id`) USING BTREE,
    KEY `idx_job_log_create_time` (`create_time`) USING BTREE
) ENGINE = InnoDB
  CHARACTER SET = utf8mb4 COMMENT = '任務日志表';

從job_info表和job_log表構建兩個對應的實體類:JobInfo和JobLog

 

3、通過反射的方式調用定時任務,這樣就不用手動生成每個Quartz的Job

public class CallMethod {
    private Object target;
    private Method method;
    // 單個字符串參數,如有需要可以改成數組參數
    private String params;

    public CallMethod(String beanName, String methodName, String params) throws NoSuchMethodException {
        this.target = SpringContextHolder.getBean(beanName);
        this.params = params;

        if (StringUtils.isNotBlank(params)) {
            this.method = target.getClass().getDeclaredMethod(methodName, String.class);
        } else {
            this.method = target.getClass().getDeclaredMethod(methodName);
        }
    }

    public void call() throws InvocationTargetException, IllegalAccessException {
        ReflectionUtils.makeAccessible(method);
        if (StringUtils.isNotBlank(params)) {
            method.invoke(target, params);
        } else {
            method.invoke(target);
        }
    }
}

讀取JobExecutionContext上下文里JobDataMap,讀取bean或者類,然后通過反射調用定時的方法

@Slf4j
@DisallowConcurrentExecution
public class QuartzSerialJob extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobInfo jobInfo = (JobInfo) context.getMergedJobDataMap().get(JobConstant.JOB_KEY);
        JobLogService jobLogService = SpringContextHolder.getBean(JobLogService.class);

        JobLog jobLog = new JobLog();
        BeanUtils.copyProperties(jobInfo, jobLog);
        jobLog.setCreateTime(new Date());
        long startTime = System.currentTimeMillis();
        try {
            log.info("定時任務准備執行,任務名稱:{}", jobInfo.getJobName());
            CallMethod callMethod = new CallMethod(jobInfo.getBeanName(), jobInfo.getMethodName(), jobInfo.getParams());
            callMethod.call();
            jobLog.setSuccess(true);
            long times = System.currentTimeMillis() - startTime;
            log.info("定時任務執行完畢,任務名稱:{} 總共耗時:{} 毫秒", jobLog.getJobName(), times);
        } catch (Exception e) {
            log.error("定時任務執行失敗,任務名稱:"+ jobInfo.getJobName(), e);
            jobLog.setSuccess(false);
            jobLog.setDetail(e.toString());
        } finally {
            long times = System.currentTimeMillis() - startTime;
            jobLog.setTime(times);
            jobLogService.insert(jobLog);
        }
    }
}
ps:@DisallowConcurrentExecution注解是為了串行執行同樣的定時任務


SpringContextHolder方法的實現:
@Slf4j
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

    private static ApplicationContext applicationContext = null;

   
    public static <T> T getBean(String name) {
        assertContextInjected();
        return (T) applicationContext.getBean(name);
    }

   
    public static <T> T getBean(Class<T> requiredType) {
        assertContextInjected();
        return applicationContext.getBean(requiredType);
    }

    /**
     * 檢查ApplicationContext不為空.
     */
    private static void assertContextInjected() {
        if (applicationContext == null) {
            throw new IllegalStateException("applicaitonContext屬性未注入, 請在applicationContext" +
                    ".xml中定義SpringContextHolder或在SpringBoot啟動類中注冊SpringContextHolder.");
        }
    }

    /**
     * 清除SpringContextHolder中的ApplicationContext為Null.
     */
    private static void clearHolder() {
        log.debug("清除SpringContextHolder中的ApplicationContext:"
                + applicationContext);
        applicationContext = null;
    }

    @Override
    public void destroy(){
        SpringContextHolder.clearHolder();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringContextHolder.applicationContext != null) {
            log.warn("SpringContextHolder中的ApplicationContext被覆蓋, 原有ApplicationContext為:" + SpringContextHolder.applicationContext);
        }
        SpringContextHolder.applicationContext = applicationContext;
    }
}
 
        

4、實現定時任務增刪的基本操作

@Slf4j
@Service("quartzJobService")
public class QuartzJobService {
    private static final String JOB_NAME = "TASK_";

    @Resource
    private Scheduler scheduler;

    /**
     * 創建串行定時任務
     */
    public void addSerialJob(JobInfo jobInfo){
        if(!jobInfo.getEnable()) return;
        JobDetail jobDetail = JobBuilder.newJob(QuartzSerialJob.class)
                .storeDurably()
                .withIdentity(JOB_NAME + jobInfo.getId())
                .build();

        CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                .withIdentity(JOB_NAME + jobInfo.getId())
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCronExpression()))
                .build();
        cronTrigger.getJobDataMap().put(JobConstant.JOB_KEY,jobInfo);

        try {
            scheduler.scheduleJob(jobDetail,cronTrigger);
        } catch (SchedulerException e) {
            log.error("創建定時任務錯誤");
            e.printStackTrace();
        }
    }

    /**
     * 重啟job
     */
    public void ResetSerialJob(JobInfo jobInfo){
        deleteJob(jobInfo);
        addSerialJob(jobInfo);
    }

    /**
     * 刪除job
     */
    public void deleteJob(JobInfo jobInfo){
        try {
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.pauseJob(jobKey);
            scheduler.deleteJob(jobKey);
        } catch (Exception e){
            log.error("刪除定時任務失敗", e);
            throw new CommonException(StatusCode.ERROR,"刪除定時任務失敗");
        }
    }

    /**
     * 恢復job
     * 慎用
     */
    public void resumeJob(JobInfo jobInfo){
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + jobInfo.getId());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            // 如果不存在則創建一個定時任務
            if(trigger == null) {
                addSerialJob(jobInfo);
            }
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.resumeJob(jobKey);
        } catch (Exception e){
            log.error("恢復定時任務失敗", e);
            throw new CommonException(StatusCode.ERROR,"恢復定時任務失敗");
        }
    }

    /**
     * 暫停job
     * 慎用
     */
    public void pauseJob(JobInfo jobInfo){
        try {
            JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId());
            scheduler.pauseJob(jobKey);
        } catch (Exception e){
            log.error("定時任務暫停失敗", e);
            throw new CommonException(StatusCode.ERROR,"定時任務暫停失敗");
        }
    }
}

 

5、程序啟動時,自動加載有效的定時任務

@Slf4j
@Component
public class JobStarter implements ApplicationRunner {
    @Resource
    private JobInfoService jobInfoService;

    @Resource
    private QuartzJobService quartzJobService;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        JobInfo jobInfo = new JobInfo();
        jobInfo.setEnable(true);
        List<JobInfo> jobInfoList = jobInfoService.queryAll(jobInfo);
        log.info("加載定時任務...");
        jobInfoList.forEach(job ->{
            log.info("定時任務:"+job.getJobName()+"  cron:"+job.getCronExpression());
            quartzJobService.addSerialJob(job);
        });
        log.info("定時任務加載完成...");
    }
}

 

6、任務調度管理的service實現類

@Slf4j
@Service("jobInfoService")
public class JobInfoServiceImpl implements JobInfoService {
    @Resource
    private JobInfoDao jobInfoDao;

    @Resource
    private QuartzJobService quartzJobService;

    @Resource
    private IdWorker idWorker;

    @Override
    public void startJob(String id) {
        JobInfo jobInfo = jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");
        // 正常情況下數據庫里所有enable=true的定時任務在程序啟動后就開始自動執行,
        if(jobInfo.getEnable()){
            // 執行重啟命令
            quartzJobService.ResetSerialJob(jobInfo);
            return;
        }

        JobInfo updateJob = new JobInfo();
        updateJob.setId(id);
        updateJob.setEnable(true);
        int update = this.jobInfoDao.update(updateJob);
        if(update < 1) throw new CommonException(StatusCode.ERROR,"更新任務失敗");
        jobInfo.setEnable(true);
        quartzJobService.addSerialJob(jobInfo);
    }

    @Override
    public void resetJob(String id) {
        JobInfo jobInfo = jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");
        quartzJobService.ResetSerialJob(jobInfo);
    }

    @Override
    public void pauseJob(String id) {
        JobInfo jobInfo = jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");

        JobInfo updateJob = new JobInfo();
        updateJob.setId(id);
        updateJob.setEnable(false);
        this.jobInfoDao.update(updateJob);
        quartzJobService.pauseJob(jobInfo);
    }

    @Override
    public void resumeJob(String id) {
        JobInfo jobInfo = jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");
        if(!jobInfo.getEnable()){
            JobInfo updateJob = new JobInfo();
            updateJob.setId(id);
            updateJob.setEnable(true);
            this.jobInfoDao.update(updateJob);
        }
        quartzJobService.resumeJob(jobInfo);
    }

    @Override
    public void deleteJob(String id) {
        JobInfo jobInfo = jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");
        if(jobInfo.getEnable()){
            JobInfo updateJob = new JobInfo();
            updateJob.setId(id);
            updateJob.setEnable(false);
            this.jobInfoDao.update(updateJob);
        }
        quartzJobService.deleteJob(jobInfo);
    }

    @Override
    @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
    public JobInfo updateState(JobInfo jobInfo) {
        JobInfo updateJob = new JobInfo();
        updateJob.setId(jobInfo.getId());
        updateJob.setEnable(jobInfo.getEnable());
        updateJob.setUpdateTime(new Date());

        int count = this.jobInfoDao.update(updateJob);
        if(count < 1) throw new CommonException(StatusCode.ERROR,"更新數據失敗");

        JobInfo result = this.queryById(jobInfo.getId());
        if(result.getEnable()){
            this.quartzJobService.ResetSerialJob(result);
        }else {
            this.quartzJobService.deleteJob(jobInfo);
        }
        return result;
    }

    @Override
    public PageInfo<JobInfo> queryByPage(JobInfoQueryDto jobInfo, int pageNum, int pageSize) {
        PageHelper.startPage(pageNum, pageSize);
        return new PageInfo<>(this.jobInfoDao.queryByBlurry(jobInfo));
    }


    @Override
    public JobInfo queryById(String id) {
        return this.jobInfoDao.queryById(id);
    }


    @Override
    public List<JobInfo> queryAll(JobInfo jobInfo) {
        return this.jobInfoDao.queryAll(jobInfo);
    }

    /**
     * 新增數據
     *
     */
    @Override
    public JobInfo insert(JobInfo jobInfo) {
        if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
            throw new CommonException(StatusCode.INVALID_DATA,"cron表達式格式錯誤");
        }
        // 判斷bean是否存在
        Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
        log.info("新增定時任務,bean ->"+bean+" method->"+jobInfo.getMethodName());

        jobInfo.setId(String.valueOf(idWorker.nextId()));
        jobInfo.setCreateTime(new Date());
        this.jobInfoDao.insert(jobInfo);
        if(jobInfo.getEnable()){
            this.quartzJobService.addSerialJob(jobInfo);
        }
        return jobInfo;
    }

    /**
     * 修改任務數據
     * 修改后如果查詢到enable=true會重啟任務
     *
     */
    @Override
    @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE)
    public JobInfo update(JobInfo jobInfo) {
if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){
            throw new CommonException(StatusCode.INVALID_DATA,"cron表達式格式錯誤");
        }
        // 判斷bean是否存在
        Object bean = SpringContextHolder.getBean(jobInfo.getBeanName());
        log.info("更新定時任務,bean ->"+bean+" method->"+jobInfo.getMethodName());

        jobInfo.setUpdateTime(new Date());
        this.jobInfoDao.update(jobInfo);
        JobInfo result = this.queryById(jobInfo.getId());
        if(result.getEnable()){
            this.quartzJobService.ResetSerialJob(result);
        }
        return result;
    }

    /**
     * 通過主鍵刪除數據
     * 刪除前會停止任務
     */
    @Override
    public boolean deleteById(String id) {

        JobInfo jobInfo = this.jobInfoDao.queryById(id);
        if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在");
        this.quartzJobService.deleteJob(jobInfo);
        return this.jobInfoDao.deleteById(id) > 0;
    }
}

 

 
@DisallowConcurrentExecution


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM