前言
現在大多數項目都使用了springboot,所以本文主要講springboot與quartz的完美整合,簡化配置、持久化數據並自定義quartz數據源。
正文
一、增加依賴
我們使用的spring-boot-starter-quartz,所以不用顯示指定版本號:
<!--quartz相關依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
二、yml配置信息
quartz: job-store-type: jdbc jdbc: initialize-schema: never properties: org: quartz: scheduler: #在集群中每個實例都必須有一個唯一的instanceId,但是應該有一個相同的instanceName instanceId: AUTO instanceName: um-scheduler skipUpdateCheck: true #是否跳過Quartz版本更新檢查。如果檢查並且找到更新,則會在Quartz的日志中報告它。生產部署要禁止 jobStore: acquireTriggersWithinLock: true #獲取trigger的時候是否上鎖,默認false采用樂觀鎖,但有可能出現ABA導致重復調度 #此存儲機制用於Quartz獨立於應用容器的事務管理,如果是Tomcat容器管理的數據源,那我們定義的事物也不會傳播給Quartz框架內部。 #通俗的講就是不管我們的Service服務本身業務代碼是否執行成功,只要代碼中調用了Quartz API的數據庫操作,那任務狀態就永久持久化了, #就算業務代碼拋出運行時異常任務狀態也不會回滾到之前的狀態。與之相反的是JobStoreCMT。 class: org.quartz.impl.jdbcjobstore.JobStoreTX driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #JDBC代理類 useProperties: true #讓JDBCJobStore將JobDataMaps中的所有值都作為字符串,因此可以作為鍵值對存儲而不是在BLOB列中以其序列化形式存儲,從而避免序列化的版本問題 tablePrefix: QRTZ_ #數據庫表前綴 misfireThreshold: 60_000 #超過這個時間還未觸發的trigger,就被認為發生了misfire,默認60s。job成功觸發叫fire,misfire就是未成功觸發。 isClustered: true #是否開啟群集,集群模式需要在多台服務器上做時間同步或者使用zookeeper去解決 clusterCheckinInterval: 20_000 #定義了Scheduler實例檢入到數據庫中的頻率(單位:毫秒)。 threadPool: class: org.quartz.simpl.SimpleThreadPool #SimpleThreadPool這個線程池只是簡單地在它的池中保持固定數量的線程,不增長也不縮小。但是它非常健壯且經過良好的測試,差不多每個Quartz用戶都使用這個池 threadCount: 10 #最大線程數,意味着最多有多少個job可以同時執行 threadPriority: 5 #線程優先級 threadsInheritContextClassLoaderOfInitializingThread: true #線程上下文類加載器是否繼承自初始線程的加載器 startup-delay: 60 #延時啟動,要有足夠長的時間讓你的應用先啟動完成后再讓Scheduler啟動(單位秒) overwrite-existing-jobs: true #是否每次系統運行都會清空數據庫中的Job信息,重新進行初始化
數據源配置:
spring: # 數據庫配置 datasource: type: com.alibaba.druid.pool.DruidDataSource dynamic: primary: master #設置默認的數據源或者數據源組,默認值即為master datasource: master: url: xxx driver-class-name: com.mysql.cj.jdbc.Driver username: root password: xxx quartz: url: xxx driver-class-name: com.mysql.cj.jdbc.Driver username: root
自定生成表結構,需要配置如下信息:
spring.quartz.jdbc.initialize-schema: always
spring.quartz.job-store-type: jdbc
項目啟動后生成的表信息:
三、定時任務邏輯封裝
1.QuartzConfig定時任務配置類。
mport org.quartz.spi.JobFactory; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.scheduling.quartz.SpringBeanJobFactory; import javax.sql.DataSource; import java.io.IOException; /** * @version v1.0 * @description: * @author: 47 on 2020/4/9 14:32 */ @Configuration public class QuartzConfig { public void setSchedulerFactoryBeanProperties(SchedulerFactoryBean schedulerFactoryBean, DataSource dataSource, JobFactory jobFactory) throws IOException { schedulerFactoryBean.setJobFactory(jobFactory); //設置數據源 schedulerFactoryBean.setDataSource(dataSource); } /** * 自定義JobFactory,以便Job類里可以使用Spring類注入 */ @Bean public JobFactory jobFactory() { return new QuartzSpringBeanJobFactory(); } private class QuartzSpringBeanJobFactory extends SpringBeanJobFactory { private AutowireCapableBeanFactory beanFactory; @Override public void setApplicationContext(ApplicationContext context) { beanFactory = context.getAutowireCapableBeanFactory(); } @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { final Object job = super.createJobInstance(bundle); beanFactory.autowireBean(job); return job; } } }
2.定義SchedulerJob計划任務類
/** * @version v1.0 * @description: * @author: 47 on 2020/4/9 14:32 */ public class SchedulerJob { private String name; private String group; private String cron; private String jobClass; private String desc; //間隔時長 private Long interval; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public String getCron() { return cron; } public void setCron(String cron) { this.cron = cron; } public String getJobClass() { return jobClass; } public void setJobClass(String jobClass) { this.jobClass = jobClass; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } public Long getInterval() { return interval; } public void setInterval(Long interval) { this.interval = interval; } }
3.定義SchedulerJobs (用來主要注入配置文件里多個定時任務)
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.List; /** * @version v1.0 * @description: * @author: 47 on 2020/4/9 14:26 */ @Component @ConfigurationProperties(prefix = "quartz") public class SchedulerJobs { private List<SchedulerJob> jobs; public List<SchedulerJob> getJobs() { return jobs; } public void setJobs(List<SchedulerJob> jobs) { this.jobs = jobs; } }
4.封裝定時任務的方法 SchedulerManager
import com.gamer.um.quartz.configure.SchedulerJob; import com.gamer.um.quartz.jobs.MonitorCronJob; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * @version v1.0 * @description: * @author: 47 on 2020/4/9 11:37 */ @Component public class SchedulerManager { private final static Logger logger = LoggerFactory.getLogger(SchedulerManager.class); @Autowired private Scheduler scheduler; /** * 激活任務 * @param schedulerJob */ public void activeJob(SchedulerJob schedulerJob){ JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup()); try { if (scheduler.checkExists(jobKey) && !MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())) { updateJob(schedulerJob); }else { createJob(schedulerJob); } } catch (SchedulerException e) { logger.error("activeJob {}", e); } } /** * 創建任務並加入調度 * @param schedulerJob */ public void createJob(SchedulerJob schedulerJob){ JobKey jobKey = JobKey.jobKey(schedulerJob.getName(), schedulerJob.getGroup()); try { if (scheduler.checkExists(jobKey)) { return; } Class<?> clazz = Class.forName(schedulerJob.getJobClass()); JobDetail jobDetail = getJobDetail(schedulerJob, (Class<Job>) clazz); Trigger cronTrigger = getCronTrigger(schedulerJob); //加入調度器 scheduler.scheduleJob(jobDetail, cronTrigger); } catch (ClassNotFoundException | SchedulerException e) { logger.error("createJob {}", e); } } /** * 更新任務觸發器 * @param schedulerJob */ public void updateJob(SchedulerJob schedulerJob){ TriggerKey triggerKey = TriggerKey.triggerKey(schedulerJob.getName(), schedulerJob.getGroup()); try { Trigger trigger = scheduler.getTrigger(triggerKey); if (trigger == null) { return; } JobKey jobKey = trigger.getJobKey(); //查詢cron String oldCron = ((CronTrigger)trigger).getCronExpression(); //沒有變化則返回 if (oldCron.equals(schedulerJob.getCron())){ return; } Trigger cronTrigger = getCronTrigger(schedulerJob); //加入調度器 scheduler.rescheduleJob(triggerKey, cronTrigger); } catch (SchedulerException e) { logger.error("updateJob {}", e); } } public void deleteJobs(List<JobKey> jobKeys) { try { scheduler.deleteJobs(jobKeys); } catch (SchedulerException e) { logger.error("deleteJobs {}", e); } } /** * 創建任務 * @param schedulerJob * @param clazz * @return */ private JobDetail getJobDetail(SchedulerJob schedulerJob, Class<Job> clazz) { return JobBuilder.newJob() .ofType(clazz) .withIdentity(schedulerJob.getName(), schedulerJob.getGroup()) .withDescription(schedulerJob.getDesc()) .build(); } /** * 創建觸發器 * @param schedulerJob * @return */ private Trigger getCronTrigger(SchedulerJob schedulerJob) { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(schedulerJob.getCron()); if (!MonitorCronJob.JOB_NAME.equals(schedulerJob.getName())){ //任務錯過執行策略,以錯過的第一個頻率時間立刻開始執行,重做錯過的所有頻率周期后,當下一次觸發頻率發生時間大於當前時間后,再按照正常的Cron頻率依次執行 cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires(); } return TriggerBuilder.newTrigger() .withIdentity(schedulerJob.getName(), schedulerJob.getGroup()) .withDescription(schedulerJob.getDesc()) .withSchedule(cronScheduleBuilder) .build(); } }
5.監控其他定時任務的總任務MonitorCronJob(用於監控cron的更新)
package com.gamer.um.quartz.jobs; import com.gamer.um.quartz.SchedulerManager; import com.gamer.um.quartz.configure.SchedulerJob; import com.gamer.um.quartz.configure.SchedulerJobs; import com.gamer.um.quartz.utils.LicUtil; import org.quartz.*; import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.context.refresh.ContextRefresher; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; /** * @version v1.0 * @description: * @author: 47 on 2020/4/9 15:07 */ public class MonitorCronJob implements Job { private final static Logger logger = LoggerFactory.getLogger(MonitorCronJob.class); public static final String JOB_NAME = "monitor_cron"; public static final String GROUP_NAME = "monitor"; public static final String CRON = "0 0/10 * * * ?"; public static final String DESC = "監控cron更新"; @Autowired private SchedulerManager schedulerManager; @Autowired private SchedulerJobs schedulerJobs; @Autowired private ContextRefresher contextRefresher; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { //重新加載配置 contextRefresher.refresh(); Set<JobKey> oldJobKeys = null; try { oldJobKeys = jobExecutionContext.getScheduler().getJobKeys(GroupMatcher.anyJobGroup()); } catch (SchedulerException e) { logger.error("MonitorCronJob {}", e); } List<String> newJobKeys = new ArrayList<>(); for (SchedulerJob job : schedulerJobs.getJobs()) { //過濾掉monitor_cron任務 if (job.getName().equals(JOB_NAME)) { continue; } newJobKeys.add(job.getName()); logger.info("job【{}】,cron【{}】", job.getName(), job.getCron()); schedulerManager.activeJob(job); } if (oldJobKeys == null) { return; } //刪除沒有配置的任務 List<JobKey> shouldDeleteJobKeys = oldJobKeys.stream() .filter(jobKey -> !JOB_NAME.equals(jobKey.getName()) && !newJobKeys.contains(jobKey.getName())) .collect(Collectors.toList()); logger.info("delete jobs {}", shouldDeleteJobKeys); schedulerManager.deleteJobs(shouldDeleteJobKeys); } }
6.配置一個定時任務(以后其他都不管,直接在配置文件里新加其他的定時任務即可)
#定時任務 quartz: jobs: - name: myName #(隨便取任務名) group: collect cron: 0 0/5 * * * ? * jobClass: com.gamer.me.quartz.jobs.MyJob #(自己的定時任務的執行類,也就是你寫業務代碼的類) desc: 我的任務
7.設置啟動項目就初始化定時任務(主要是上面的監控cron的類需要初始化)
import com.gamer.um.quartz.configure.SchedulerJob; import com.gamer.um.quartz.jobs.MonitorCronJob; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; /** * @version v1.0 * @description: * @author: 47 on 2020/4/9 17:12 */ @Component public class Initialization implements ApplicationRunner { @Autowired private SchedulerManager schedulerManager; @Override public void run(ApplicationArguments args) throws Exception { SchedulerJob schedulerJob = new SchedulerJob(); schedulerJob.setName(MonitorCronJob.JOB_NAME); schedulerJob.setGroup(MonitorCronJob.GROUP_NAME); schedulerJob.setCron(MonitorCronJob.CRON); schedulerJob.setDesc(MonitorCronJob.DESC); schedulerJob.setJobClass(MonitorCronJob.class.getName()); schedulerManager.activeJob(schedulerJob); } }
8.測試
public class MyJob implements Job { @Autowired private SchedulerManager schedulerManager;
測試時需要去過濾下當前Jobs里的job是當前執行的(下面會給出代碼)
.......你的業務代碼
}
@Override
public SchedulerJob getSchedulerJobDetail(Class jobClass) {
List<SchedulerJob> jobs = schedulerJobs.getJobs();
if (Objects.isNull(jobs)) {
return null;
}
SchedulerJob job = jobs.stream()
.filter(schedulerJob -> Objects.equals(schedulerJob.getJobClass(), jobClass.getName()))
.findFirst()
.orElse(null);
return job;
}