目標:將定時任務的配置持久化到數據庫,並且在動態調整數據庫裡保存的cron表達式之後,定時任務的執行計划可以隨之變化。
1、核心依賴
<!-- Quartz starter for Spring Boot. No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm against the project's pom). -->
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
2、數據庫表
-- ----------------------------
-- Job schedule table: job_info
-- One row per scheduled task: which bean method to call and on which cron schedule.
-- ----------------------------
CREATE TABLE `job_info` (
  `id` varchar(20) NOT NULL COMMENT 'ID',
  `job_name` varchar(50) NOT NULL COMMENT '名稱',
  `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表達式(秒分時日月周[年])',
  `bean_name` varchar(255) NOT NULL COMMENT 'spring bean名稱,被ioc容器管理的,需要執行調度任務的bean',
  `method_name` varchar(255) NOT NULL COMMENT 'bean里需要執行方法名',
  `params` varchar(255) NULL COMMENT '方法參數',
  `enable` tinyint(1) NOT NULL default 1 COMMENT '是否開啟,1開啟,0關閉',
  `remark` varchar(255) NULL COMMENT '說明',
  `create_time` datetime NULL DEFAULT NULL COMMENT '創建日期',
  `update_time` datetime NULL DEFAULT NULL COMMENT '更新日期',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COMMENT = '任務調度表';

-- Seed rows for manual testing. Both are inserted disabled (enable = false).
-- Columns are listed explicitly so the statements keep working if columns are added or reordered.
insert into job_info
  (`id`, `job_name`, `cron_expression`, `bean_name`, `method_name`, `params`, `enable`, `remark`, `create_time`, `update_time`)
values
  ('3', '用於測試定時任務觸發run', '0/30 * * * * ?', 'testJob', 'run', null, false, '用於測試定時任務工作情況run', now(), null);

insert into job_info
  (`id`, `job_name`, `cron_expression`, `bean_name`, `method_name`, `params`, `enable`, `remark`, `create_time`, `update_time`)
values
  ('4', '用於測試定時任務觸發call', '0/20 * * * * ?', 'testJob', 'call', null, false, '用於測試定時任務工作情況call', now(), null);

-- ----------------------------
-- Job execution log table: job_log
-- One row per execution; repeats the job definition columns and adds outcome, duration and detail.
-- ----------------------------
CREATE TABLE `job_log` (
  `id` varchar(20) NOT NULL COMMENT 'ID',
  `job_name` varchar(50) NOT NULL COMMENT '名稱',
  `cron_expression` varchar(255) NOT NULL COMMENT 'java cron表達式(秒分時日月周[年])',
  `bean_name` varchar(255) NOT NULL COMMENT 'spring bean名稱,被ioc容器管理的,需要執行調度任務的bean',
  `method_name` varchar(255) NOT NULL COMMENT 'bean里需要執行方法名',
  `params` varchar(255) NULL COMMENT '方法參數',
  `success` tinyint(1) NOT NULL default 1 COMMENT '是否成功,1成功,0失敗',
  `time` bigint(20) NOT NULL default 0 COMMENT '執行耗時,毫秒',
  `detail` text NULL COMMENT '詳細內容',
  `create_time` datetime NULL DEFAULT NULL COMMENT '創建日期',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_job_log_create_time` (`create_time`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COMMENT = '任務日志表';
從job_info表和job_log表構建兩個對應的實體類:JobInfo和JobLog
3、通過反射的方式調用定時任務,這樣就不用手動生成每個Quartz的Job
public class CallMethod { private Object target; private Method method; // 單個字符串參數,如有需要可以改成數組參數 private String params; public CallMethod(String beanName, String methodName, String params) throws NoSuchMethodException { this.target = SpringContextHolder.getBean(beanName); this.params = params; if (StringUtils.isNotBlank(params)) { this.method = target.getClass().getDeclaredMethod(methodName, String.class); } else { this.method = target.getClass().getDeclaredMethod(methodName); } } public void call() throws InvocationTargetException, IllegalAccessException { ReflectionUtils.makeAccessible(method); if (StringUtils.isNotBlank(params)) { method.invoke(target, params); } else { method.invoke(target); } } }
從JobExecutionContext上下文裡的JobDataMap中讀取任務信息(bean名稱、方法名和參數),然後通過反射調用對應bean上的定時方法,並把執行結果記錄到任務日志表
@Slf4j @DisallowConcurrentExecution public class QuartzSerialJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { JobInfo jobInfo = (JobInfo) context.getMergedJobDataMap().get(JobConstant.JOB_KEY); JobLogService jobLogService = SpringContextHolder.getBean(JobLogService.class); JobLog jobLog = new JobLog(); BeanUtils.copyProperties(jobInfo, jobLog); jobLog.setCreateTime(new Date()); long startTime = System.currentTimeMillis(); try { log.info("定時任務准備執行,任務名稱:{}", jobInfo.getJobName()); CallMethod callMethod = new CallMethod(jobInfo.getBeanName(), jobInfo.getMethodName(), jobInfo.getParams()); callMethod.call(); jobLog.setSuccess(true); long times = System.currentTimeMillis() - startTime; log.info("定時任務執行完畢,任務名稱:{} 總共耗時:{} 毫秒", jobLog.getJobName(), times); } catch (Exception e) { log.error("定時任務執行失敗,任務名稱:"+ jobInfo.getJobName(), e); jobLog.setSuccess(false); jobLog.setDetail(e.toString()); } finally { long times = System.currentTimeMillis() - startTime; jobLog.setTime(times); jobLogService.insert(jobLog); } } }
ps:@DisallowConcurrentExecution註解用於禁止同一個JobDetail的多個實例並發執行,也就是讓同一個定時任務的各次觸發串行執行
SpringContextHolder類的實現:
@Slf4j public class SpringContextHolder implements ApplicationContextAware, DisposableBean { private static ApplicationContext applicationContext = null; public static <T> T getBean(String name) { assertContextInjected(); return (T) applicationContext.getBean(name); } public static <T> T getBean(Class<T> requiredType) { assertContextInjected(); return applicationContext.getBean(requiredType); } /** * 檢查ApplicationContext不為空. */ private static void assertContextInjected() { if (applicationContext == null) { throw new IllegalStateException("applicaitonContext屬性未注入, 請在applicationContext" + ".xml中定義SpringContextHolder或在SpringBoot啟動類中注冊SpringContextHolder."); } } /** * 清除SpringContextHolder中的ApplicationContext為Null. */ private static void clearHolder() { log.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext); applicationContext = null; } @Override public void destroy(){ SpringContextHolder.clearHolder(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringContextHolder.applicationContext != null) { log.warn("SpringContextHolder中的ApplicationContext被覆蓋, 原有ApplicationContext為:" + SpringContextHolder.applicationContext); } SpringContextHolder.applicationContext = applicationContext; } }
4、實現定時任務增刪的基本操作
@Slf4j @Service("quartzJobService") public class QuartzJobService { private static final String JOB_NAME = "TASK_"; @Resource private Scheduler scheduler; /** * 創建串行定時任務 */ public void addSerialJob(JobInfo jobInfo){ if(!jobInfo.getEnable()) return; JobDetail jobDetail = JobBuilder.newJob(QuartzSerialJob.class) .storeDurably() .withIdentity(JOB_NAME + jobInfo.getId()) .build(); CronTrigger cronTrigger = TriggerBuilder.newTrigger() .withIdentity(JOB_NAME + jobInfo.getId()) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCronExpression())) .build(); cronTrigger.getJobDataMap().put(JobConstant.JOB_KEY,jobInfo); try { scheduler.scheduleJob(jobDetail,cronTrigger); } catch (SchedulerException e) { log.error("創建定時任務錯誤"); e.printStackTrace(); } } /** * 重啟job */ public void ResetSerialJob(JobInfo jobInfo){ deleteJob(jobInfo); addSerialJob(jobInfo); } /** * 刪除job */ public void deleteJob(JobInfo jobInfo){ try { JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId()); scheduler.pauseJob(jobKey); scheduler.deleteJob(jobKey); } catch (Exception e){ log.error("刪除定時任務失敗", e); throw new CommonException(StatusCode.ERROR,"刪除定時任務失敗"); } } /** * 恢復job * 慎用 */ public void resumeJob(JobInfo jobInfo){ try { TriggerKey triggerKey = TriggerKey.triggerKey(JOB_NAME + jobInfo.getId()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 如果不存在則創建一個定時任務 if(trigger == null) { addSerialJob(jobInfo); } JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId()); scheduler.resumeJob(jobKey); } catch (Exception e){ log.error("恢復定時任務失敗", e); throw new CommonException(StatusCode.ERROR,"恢復定時任務失敗"); } } /** * 暫停job * 慎用 */ public void pauseJob(JobInfo jobInfo){ try { JobKey jobKey = JobKey.jobKey(JOB_NAME + jobInfo.getId()); scheduler.pauseJob(jobKey); } catch (Exception e){ log.error("定時任務暫停失敗", e); throw new CommonException(StatusCode.ERROR,"定時任務暫停失敗"); } } }
5、程序啟動時,自動加載有效的定時任務
/**
 * Startup hook that queries job definitions with an {@code enable=true} filter object and
 * registers each returned job with {@code QuartzJobService}.
 */
@Slf4j
@Component
public class JobStarter implements ApplicationRunner {

    @Resource
    private JobInfoService jobInfoService;

    @Resource
    private QuartzJobService quartzJobService;

    /**
     * Loads the job definitions and schedules them one by one, logging each name and cron
     * expression.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Filter object handed to queryAll; presumably used as a query-by-example (verify in the DAO).
        JobInfo enabledFilter = new JobInfo();
        enabledFilter.setEnable(true);
        List<JobInfo> enabledJobs = jobInfoService.queryAll(enabledFilter);

        log.info("加載定時任務...");
        for (JobInfo job : enabledJobs) {
            log.info("定時任務:"+job.getJobName()+" cron:"+job.getCronExpression());
            quartzJobService.addSerialJob(job);
        }
        log.info("定時任務加載完成...");
    }
}
6、任務調度管理的service實現類
@Slf4j @Service("jobInfoService") public class JobInfoServiceImpl implements JobInfoService { @Resource private JobInfoDao jobInfoDao; @Resource private QuartzJobService quartzJobService; @Resource private IdWorker idWorker; @Override public void startJob(String id) { JobInfo jobInfo = jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); // 正常情況下數據庫里所有enable=true的定時任務在程序啟動后就開始自動執行, if(jobInfo.getEnable()){ // 執行重啟命令 quartzJobService.ResetSerialJob(jobInfo); return; } JobInfo updateJob = new JobInfo(); updateJob.setId(id); updateJob.setEnable(true); int update = this.jobInfoDao.update(updateJob); if(update < 1) throw new CommonException(StatusCode.ERROR,"更新任務失敗"); jobInfo.setEnable(true); quartzJobService.addSerialJob(jobInfo); } @Override public void resetJob(String id) { JobInfo jobInfo = jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); quartzJobService.ResetSerialJob(jobInfo); } @Override public void pauseJob(String id) { JobInfo jobInfo = jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); JobInfo updateJob = new JobInfo(); updateJob.setId(id); updateJob.setEnable(false); this.jobInfoDao.update(updateJob); quartzJobService.pauseJob(jobInfo); } @Override public void resumeJob(String id) { JobInfo jobInfo = jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); if(!jobInfo.getEnable()){ JobInfo updateJob = new JobInfo(); updateJob.setId(id); updateJob.setEnable(true); this.jobInfoDao.update(updateJob); } quartzJobService.resumeJob(jobInfo); } @Override public void deleteJob(String id) { JobInfo jobInfo = jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); if(jobInfo.getEnable()){ JobInfo updateJob = new JobInfo(); updateJob.setId(id); updateJob.setEnable(false); this.jobInfoDao.update(updateJob); } quartzJobService.deleteJob(jobInfo); } 
@Override @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE) public JobInfo updateState(JobInfo jobInfo) { JobInfo updateJob = new JobInfo(); updateJob.setId(jobInfo.getId()); updateJob.setEnable(jobInfo.getEnable()); updateJob.setUpdateTime(new Date()); int count = this.jobInfoDao.update(updateJob); if(count < 1) throw new CommonException(StatusCode.ERROR,"更新數據失敗"); JobInfo result = this.queryById(jobInfo.getId()); if(result.getEnable()){ this.quartzJobService.ResetSerialJob(result); }else { this.quartzJobService.deleteJob(jobInfo); } return result; } @Override public PageInfo<JobInfo> queryByPage(JobInfoQueryDto jobInfo, int pageNum, int pageSize) { PageHelper.startPage(pageNum, pageSize); return new PageInfo<>(this.jobInfoDao.queryByBlurry(jobInfo)); } @Override public JobInfo queryById(String id) { return this.jobInfoDao.queryById(id); } @Override public List<JobInfo> queryAll(JobInfo jobInfo) { return this.jobInfoDao.queryAll(jobInfo); } /** * 新增數據 * */ @Override public JobInfo insert(JobInfo jobInfo) { if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){ throw new CommonException(StatusCode.INVALID_DATA,"cron表達式格式錯誤"); } // 判斷bean是否存在 Object bean = SpringContextHolder.getBean(jobInfo.getBeanName()); log.info("新增定時任務,bean ->"+bean+" method->"+jobInfo.getMethodName()); jobInfo.setId(String.valueOf(idWorker.nextId())); jobInfo.setCreateTime(new Date()); this.jobInfoDao.insert(jobInfo); if(jobInfo.getEnable()){ this.quartzJobService.addSerialJob(jobInfo); } return jobInfo; } /** * 修改任務數據 * 修改后如果查詢到enable=true會重啟任務 * */ @Override @Transactional(rollbackFor = Exception.class,isolation = Isolation.SERIALIZABLE) public JobInfo update(JobInfo jobInfo) { if (!CronExpression.isValidExpression(jobInfo.getCronExpression())){ throw new CommonException(StatusCode.INVALID_DATA,"cron表達式格式錯誤"); } // 判斷bean是否存在 Object bean = SpringContextHolder.getBean(jobInfo.getBeanName()); log.info("更新定時任務,bean ->"+bean+" 
method->"+jobInfo.getMethodName()); jobInfo.setUpdateTime(new Date()); this.jobInfoDao.update(jobInfo); JobInfo result = this.queryById(jobInfo.getId()); if(result.getEnable()){ this.quartzJobService.ResetSerialJob(result); } return result; } /** * 通過主鍵刪除數據 * 刪除前會停止任務 */ @Override public boolean deleteById(String id) { JobInfo jobInfo = this.jobInfoDao.queryById(id); if(jobInfo==null) throw new CommonException(StatusCode.ERROR,"任務不存在"); this.quartzJobService.deleteJob(jobInfo); return this.jobInfoDao.deleteById(id) > 0; } }
@DisallowConcurrentExecution