一. 整合的步骤
- 建立springboot工程映入相关依赖
- 导入quartz的tables_mysql_innodb.sql文件到数据库中
- 建立中间数据控制表
- 建立Job工厂类
- 建立任务Trigger触发器监听类
- 建立业务控制处理类
- 项目重启重置任务处理
- 配置quartz参数
二. 详细描述
(1) 建立springboot工程映入相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
(2) 导入quartz的tables_mysql_innodb.sql文件到数据库中
(3) 建立中间数据控制表
中间数据控制表用于管理任务的增删改以及初始化
CREATE TABLE `sys_schedule_info` ( `id` varchar(100) NOT NULL, `task_description` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '任务描述', `task_name` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '任务名称', `task_group` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '任务组名称', `trigger_name` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '触发器名称', `trigger_group` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '触发器组', `trigger_cron_expression` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '表达式', `execute_class_name` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '目标执行类类名', `execute_method_name` varchar(500) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '执行类的具体执行方法', `target_table` text CHARACTER SET utf8 COLLATE utf8_general_ci COMMENT '数据目标所在表集合","分割用于统计', `is_start` tinyint(1) DEFAULT NULL COMMENT '是否启动', `status` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT '1' COMMENT '0删,1允正常', `updated_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人id', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
(4) 建立Job工厂类
import com.tools.SpringUtil; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; /** * @CreateTime: 2019-05-20 15:07 * @Description: 任务执行器 * @Author: WH */ public class SystemJobFactory implements Job { Logger log = LoggerFactory.getLogger(getClass()); @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { JobDetail jobDetail = jobExecutionContext.getJobDetail(); log.debug(jobDetail.getKey().getName()+"="+jobDetail.getKey().getGroup()); JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); //具体处理类组件名(处理类上加了@Component()注解,注意默认首字母数据库存的时候是小写) String className = jobDataMap.getString("className"); //具体处理类的方法名 String methodName = jobDataMap.getString("methodName"); //获取对应的Bean Object object = SpringUtil.getBean(className); try { //利用反射执行对应方法 Method method = object.getClass().getMethod(methodName); method.invoke(object); } catch (Exception e) { e.printStackTrace(); } } }
(5) 建立任务Trigger触发器监听类
import org.quartz.JobExecutionContext; import org.quartz.Trigger; import org.quartz.TriggerListener; /** * @CreateTime: 2019-05-20 16:13 * @Description: 任务监听 * @Author: WH */ public class SystemJobTriggerListener implements TriggerListener { @Override public String getName() { return this.getClass().getSimpleName(); } @Override public void triggerFired(Trigger trigger, JobExecutionContext jobExecutionContext) { } @Override public boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobExecutionContext) { return false; } @Override public void triggerMisfired(Trigger trigger) { } @Override public void triggerComplete(Trigger trigger, JobExecutionContext jobExecutionContext, Trigger.CompletedExecutionInstruction completedExecutionInstruction) { } }
(6) 建立业务控制处理类
private Boolean startTask(SysScheduleInfo sysScheduleInfo){ boolean ret = true; try { //创建触发器 Trigger trigger = TriggerBuilder.newTrigger().withIdentity(sysScheduleInfo.getTriggerName(),sysScheduleInfo.getTaskGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(sysScheduleInfo.getTriggerCronExpression()).withMisfireHandlingInstructionDoNothing()) .startNow() .build(); //创建任务 JobDetail jobDetail = JobBuilder.newJob(SystemJobFactory.class) .withIdentity(sysScheduleInfo.getTaskName(),sysScheduleInfo.getTaskGroup()) .usingJobData("className",sysScheduleInfo.getExecuteClassName()) .usingJobData("methodName",sysScheduleInfo.getExecuteMethodName()) .build(); //调度作业 scheduler.scheduleJob(jobDetail, trigger); //添加监听 ListenerManager listenerManager = scheduler.getListenerManager(); TriggerListener systemJobTriggerListener = listenerManager.getTriggerListener("systemJobTriggerListener"); if(systemJobTriggerListener==null){ listenerManager.addTriggerListener(new SystemJobTriggerListener(), EverythingMatcher.allTriggers()); } } catch (SchedulerException e) { ret = false; e.printStackTrace(); } return ret; } private Boolean deleteTask(SysScheduleInfo sysScheduleInfo){ boolean ret = true; try { //触发器标识 TriggerKey triggerKey = TriggerKey.triggerKey(sysScheduleInfo.getTriggerName(),sysScheduleInfo.getTriggerGroup()); //任务标识 JobKey jobKey = JobKey.jobKey(sysScheduleInfo.getTaskName(),sysScheduleInfo.getTaskGroup()); //停止任务 scheduler.pauseJob(jobKey); //停止触发器 scheduler.pauseTrigger(triggerKey); //移除触发器 scheduler.unscheduleJob(triggerKey); //删除任务 scheduler.deleteJob(jobKey); } catch (SchedulerException e) { ret = false; e.printStackTrace(); } return ret; }
(7) 项目重启重置任务处理
import com.base.taskmanager.bean.SysScheduleInfo; import com.base.taskmanager.bean.SystemJobFactory; import com.base.taskmanager.bean.SystemJobTriggerListener; import com.base.taskmanager.dao.SysScheduleInfoMapper; import org.quartz.*; import org.quartz.impl.matchers.EverythingMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; /** * @CreateTime: 2019-05-21 09:12 * @Description: 项目初始化-在springboot的最后一步执行 * @Author: WH */ @Component public class DataCenerInitialization implements CommandLineRunner { Logger log = LoggerFactory.getLogger(getClass()); @Autowired private SysScheduleInfoMapper sysScheduleInfoMapper; @Autowired private Scheduler scheduler; @Override public void run(String... args) throws Exception { //项目重新启动的时候从新加载任务数据 List<SysScheduleInfo> sysScheduleInfos = sysScheduleInfoMapper.selectEffectiveData();//状态1,并且允许启动的数据 //添加监听 ListenerManager listenerManager = scheduler.getListenerManager(); TriggerListener systemJobTriggerListener = listenerManager.getTriggerListener("systemJobTriggerListener"); if(systemJobTriggerListener==null){ listenerManager.addTriggerListener(new SystemJobTriggerListener(), EverythingMatcher.allTriggers()); } //终止之前的所有任务 int start = 0; for (SysScheduleInfo sysScheduleInfo : sysScheduleInfos) { //触发器标识 TriggerKey triggerKey = TriggerKey.triggerKey(sysScheduleInfo.getTriggerName(),sysScheduleInfo.getTriggerGroup()); //任务标识 JobKey jobKey = JobKey.jobKey(sysScheduleInfo.getTaskName(),sysScheduleInfo.getTaskGroup()); //停止任务 scheduler.pauseJob(jobKey); //停止触发器 scheduler.pauseTrigger(triggerKey); //移除触发器 scheduler.unscheduleJob(triggerKey); //删除任务 scheduler.deleteJob(jobKey); start++; } //从新加载当前任务 int end = 0; for (SysScheduleInfo sysScheduleInfo : sysScheduleInfos) { try { //创建触发器 Trigger trigger = TriggerBuilder.newTrigger().withIdentity(sysScheduleInfo.getTriggerName(),sysScheduleInfo.getTaskGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(sysScheduleInfo.getTriggerCronExpression()).withMisfireHandlingInstructionDoNothing()) .startAt(date) .build(); //创建任务 JobDetail jobDetail = JobBuilder.newJob(SystemJobFactory.class) .withIdentity(sysScheduleInfo.getTaskName(),sysScheduleInfo.getTaskGroup()) .usingJobData("className",sysScheduleInfo.getExecuteClassName()) .usingJobData("methodName",sysScheduleInfo.getExecuteMethodName()) .build(); //调度作业 scheduler.scheduleJob(jobDetail, trigger); end++; } catch (SchedulerException e) { e.printStackTrace(); } } } }
(8) 配置quartz参数
spring: profiles: dev quartz: job-store-type: jdbc #数据库方式 jdbc: initialize-schema: never #不初始化表结构 properties: org: quartz: scheduler: instanceId: AUTO #默认主机名和时间戳生成实例ID,可以是任何字符串,但对于所有调度程序来说,必须是唯一的 对应qrtz_scheduler_state INSTANCE_NAME字段 instanceName: clusteredScheduler #quartzScheduler jobStore: acquireTriggersWithinLock: true class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置 driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我们仅为数据库制作了特定于数据库的代理 useProperties: true #以指示JDBCJobStore将JobDataMaps中的所有值都作为字符串,因此可以作为名称 - 值对存储而不是在BLOB列中以其序列化形式存储更多复杂的对象。从长远来看,这是更安全的,因为您避免了将非String类序列化为BLOB的类版本问题。 tablePrefix: qrtz_ #数据库表前缀 misfireThreshold: 60000 #在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。默认值(如果您在配置中未输入此属性)为60000(60秒)。 clusterCheckinInterval: 5000 #设置此实例“检入”*与群集的其他实例的频率(以毫秒为单位)。影响检测失败实例的速度。 isClustered: false #打开群集功能,集群模式需要在多台服务器上做时间同步或者使用zookeeper去解决 threadPool: #连接池 class: org.quartz.simpl.SimpleThreadPool threadCount: 10 threadPriority: 5 threadsInheritContextClassLoaderOfInitializingThread: true startup-delay: 30 overwrite-existing-jobs: true
三. 目前的认识(个人理解)
因为是单服务器项目其实现在并不涉及集群部署,但是自己想尝试下动态配置quartz的感觉,所有就这几天复习了下基础,随便搭建了一个可以动态调度任务的简单服务出来;
在做的时候越遇到了一些坑,比如停止服务后重启服务时会重新执行停止期间的任务,动态调用直接使用quartz的sql表进行处理很难,需要建立中间表进行处理,对于整个quartz而已无非就是开启任务和关闭任务等;
关闭任务 = 移除任务 --> delete
暂停任务 = 移除任务 --> delete
开启任务 = 新建任务添加到调度中 --> add
停止服务器后,重启的时候直接移除调度队列中的所有任务,从新在数据库中间表把任务从新添加进去(初始化);
重启服务器后重启的问题,给服务增加延迟启动时间,在启动好在初始化数据库中间表中的任务,这样就不会启动过程中去执行停止服务期间遗漏的任务了;
多个CommandLineRunner可以被同时执行在同一个spring上下文中并且执行顺序是以order注解的参数顺序一致,用于初始化