org.quartz實現定時任務並自定義切換任務數據源
在工作中經常會需要使用到定時任務處理各種周期性的任務,org.quartz是處理此類定時任務的一個優秀框架。隨着項目一點點推進,此時我們並不滿足於任務僅僅是定時執行,我們還想要對任務進行更多的控制,隨時能對任務進行人為干預,就需要對quartz有更深入的了解。而隨着微服務的流行,項目中多數據源的情況也越來越常見,在定時任務中集成多數據源切換的功能也需要集成進來。
集成quartz實現定時任務
quartz中實現定時任務需要了解的基本概念
Job
通過實現Job
類,在實現方法中寫我們具體想要定時任務完成的工作,然后交給quartz
管理。
JobDetail
Job
只負責實現具體任務,所以還需要借助JobDetail
來存儲一些描述Job
的基本信息。
Quartz JobBuilder
為構造JobDetail
實體提供的builder-style API
。你可以這樣使用它來構建一個JobDetail
:
@Bean
public JobDetail jobDetail() {
return JobBuilder.newJob().ofType(SampleJob.class)
.storeDurably()
.withIdentity("Qrtz_Job_Detail")
.withDescription("Invoke Sample Job service...")
.build();
}
Spring JobDetailFactoryBean
在Spring
中配置JobDetail
的方式:
@Bean
public JobDetailFactoryBean jobDetail() {
JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
jobDetailFactory.setJobClass(SampleJob.class);
jobDetailFactory.setDescription("Invoke Sample Job service...");
jobDetailFactory.setDurability(true);
return jobDetailFactory;
}
Trigger
觸發器,代表一個調度參數的配置,什么時候去調度:
@Bean
public Trigger trigger(JobDetail job) {
return TriggerBuilder.newTrigger().forJob(job)
.withIdentity("Qrtz_Trigger")
.withDescription("Sample trigger")
.withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
.build();
}
Scheduler
調度器,通過Job
和Trigger
來注冊一個調度器:
@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(new ClassPathResource("quartz.properties").getInputStream());
Scheduler scheduler = factory.getScheduler();
scheduler.setJobFactory(springBeanJobFactory());
scheduler.scheduleJob(job, trigger);
scheduler.start();
return scheduler;
}
給系統添加一個Job
在quartz
中Job
就是我們需要去執行的任務,由Scheduler
調度器負責調度任務們依靠制定好的Trigger
來定時執行任務。
因此首先我們需要結合以上基礎給系統添加一個Job。
addJob
public void addJob(BaseJob job) throws SchedulerException {
/** 創建JobDetail實例,綁定Job實現類
* JobDetail 表示一個具體的可執行的調度程序,job是這個可執行調度程序所要執行的內容
* 另外JobDetail還包含了這個任務調度的方案和策略**/
// 指明job的名稱,所在組的名稱,以及綁定job類
JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
.withIdentity(job.getJobKey())
.withDescription(job.getDescription())
.usingJobData(job.getDataMap())
.build();
/**
* Trigger代表一個調度參數的配置,什么時候去調度
*/
//定義調度觸發規則, 使用cronTrigger規則
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(job.getJobName(),job.getJobGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
.startNow()
.build();
//將任務和觸發器注冊到任務調度中去
scheduler.scheduleJob(jobDetail,trigger);
//判斷調度器是否啟動
if(!scheduler.isStarted()){
scheduler.start();
}
log.info(String.format("定時任務:%s.%s-已添加到調度器!", job.getJobGroup(),job.getJobName()));
}
首先需要定義好我們的Job,之后通過Job初始化JobDetail
和Trigger
,最后將JobDetail
和Trigger
注冊到調度器中。
BaseJob
Job
的結構如下:
public abstract class BaseJob implements Job,Serializable {
private static final long serialVersionUID = 1L;
private static final String JOB_MAP_KEY = "self";
/**
* 任務名稱
*/
private String jobName;
/**
* 任務分組
*/
private String jobGroup;
/**
* 任務狀態 是否啟動任務
*/
private String jobStatus;
/**
* cron表達式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任務執行時調用哪個類的方法 包名+類名
*/
private Class beanClass = this.getClass();
/**
* 任務是否有狀態
*/
private String isConcurrent;
/**
* Spring bean
*/
private String springBean;
/**
* 任務調用的方法名
*/
private String methodName;
/**
* 該任務所使用的數據源
*/
private String dataSource = DataSourceEnum.DB1.getName();
/**
* 為了將執行后的任務持久化到數據庫中
*/
@JsonIgnore
private JobDataMap dataMap = new JobDataMap();
public JobKey getJobKey(){
return JobKey.jobKey(jobName, jobGroup);// 任務名稱和組構成任務key
}
...
}
可以看到Job
中定義了任務的一些基本信息,重點關注其中的dataSource
和dataMap
屬性。其中dataSource
是任務所使用的數據源,並給了一個默認值;由於任務在添加后會持久化到數據庫中,之后解析任務就會用到dataMap
。
SchedulerConfig
在添加Job
的時候,JobDetail
和Trigger
都是通過關鍵字new
生成的,而調度器Scheduler
則需要放在容器中維護。
@Configuration
@Order
public class SchedulerConfig {
@Autowired
private MyJobFactory myJobFactory;
@Value("${spring.profiles.active}")
private String profile;
/*
* 通過SchedulerFactoryBean獲取Scheduler的實例
*/
@Bean(name = "scheduler")
public Scheduler scheduler() throws Exception {
return schedulerFactoryBean().getScheduler();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setOverwriteExistingJobs(true);
// 延時啟動
factory.setStartupDelay(20);
// 加載quartz數據源配置
factory.setQuartzProperties(quartzProperties());
// 自定義Job Factory,用於Spring注入
factory.setJobFactory(myJobFactory);
/*********全局監聽器配置************/
JobListener myJobListener = new SchedulerListener();
factory.setGlobalJobListeners(myJobListener);//直接添加為全局監聽器
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
if (Util.PRODUCT.equals(profile)) {//正式環境
System.out.println("正式環境quartz配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
} else {
System.out.println("測試環境quartz配置");
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
}
//在quartz.properties中的屬性被讀取並注入后再初始化對象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/*
* quartz初始化監聽器
*/
@Bean
public QuartzInitializerListener executorListener() {
return new QuartzInitializerListener();
}
}
上述代碼中,將scheduler
加入到Spring
容器中。scheduler
是由SchedulerFactoryBean
進行維護的,在SchedulerFactoryBean
中對調度器工廠做了一些基本設置並從配置文件中加載了quartz數據源配置(配置文件的讀取會根據運行環境profile
來進行自動切換),配置了一個全局監聽器用以監聽任務的執行過程。
MyJobFactory
使用Spring提供的JobFactory
。
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
// 調用父類的方法
Object jobInstance = super.createJobInstance(bundle);
// 進行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
quartz.properties
quartz.properties
中是quartz連接數據庫的一些配置信息。
# \u56FA\u5B9A\u524D\u7F00org.quartz
# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
#
#
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
# \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
# threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
# \u5E76\u53D1\u4E2A\u6570
org.quartz.threadPool.threadCount = 5
# \u4F18\u5148\u7EA7
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
org.quartz.jobStore.misfireThreshold = 5000
# \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
#\u6301\u4E45\u5316
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = qzDS
org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
org.quartz.dataSource.qzDS.user=quartz
org.quartz.dataSource.qzDS.password=123456
org.quartz.dataSource.qzDS.maxConnections = 30
org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL
org.quartz.dataSource.qzDS.validateOnCheckout = true
org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40
#org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60
quartz
會根據這個配置文件將Job
持久化到數據庫中,也因此quartz
會需要初始化一些數據庫表,表結構文件在文末。
SchedulerListener
調度器監聽器用以監聽任務的執行狀態。
public class SchedulerListener implements JobListener {
private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);
public static final String LISTENER_NAME = "QuartSchedulerListener";
@Override
public String getName() {
return LISTENER_NAME; //must return a name
}
//任務被調度前
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
// 切換任務的數據源
DataSourceContextHolder.setDB(dataSource);
String jobName = context.getJobDetail().getKey().toString();
LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
}
//任務調度被拒了
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
LOG.error("job {} is jobExecutionVetoed", jobName);
//可以做一些日志記錄原因
}
//任務被調度后
@Override
public void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
// 清空存儲的數據源
String jobName = context.getJobDetail().getKey().toString();
DataSourceContextHolder.clearDB();
LOG.info("Job : {} is finished", jobName);
if (jobException != null && !jobException.getMessage().equals("")) {
LOG.error("Exception thrown by: " + jobName
+ " Exception: " + jobException.getMessage());
}
}
}
SchedulerListener
監聽任務被調度前、調度后和調度被拒絕時的狀態,在任務被調度之前和之后對任務所使用的數據源進行了處理。如果項目中不需要數據源切換的話,這個監聽器是不需要的,到此已經完成了quartz
的集成。
多數據源切換
通過自定義DynamicDataSource
來覆蓋Spring Boot中原有的數據源。
DataSourceConfig
通過讀取配置文件中不同的數據源,初始化項目中可能用到的數據源用以切換。
/**
* 多數據源配置類
*/
@Configuration
public class DataSourceConfig {
//數據源1
@Bean(name = "datasource1")
@ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中對應屬性的前綴
public DataSource dataSource1() {
return DataSourceBuilder.create().build();
}
//數據源2
@Bean(name = "datasource2")
@ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中對應屬性的前綴
public DataSource dataSource2() {
return DataSourceBuilder.create().build();
}
/**
* 動態數據源: 通過AOP在不同數據源之間動態切換
*
* @return
*/
@Primary
@Bean(name = "dynamicDataSource")
public DataSource dynamicDataSource() {
DynamicDataSource dynamicDataSource = new DynamicDataSource();
// 默認數據源
dynamicDataSource.setDefaultTargetDataSource(dataSource1());
// 配置多數據源
Map<Object, Object> dsMap = new HashMap();
dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());
dynamicDataSource.setTargetDataSources(dsMap);
return dynamicDataSource;
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
//設置數據源
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean.getObject();
}
/**
* 配置@Transactional注解事物
*
* @return
*/
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dynamicDataSource());
}
}
數據源配置
spring:
datasource:
db1:
driver-class-name: com.mysql.cj.jdbc.Driver
username: doctor
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
db2:
driver-class-name: com.mysql.cj.jdbc.Driver
username: quartz
password: 123456
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true
DataSourceContextHolder
由於quartz
在執行過程中是通過不同的線程來執行Job
的,因此此處通過ThreadLocal
來保存線程所使用的數據源情況。
/**
* 保存本地數據源
*/
public class DataSourceContextHolder {
private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
/**
* 默認數據源
*/
public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
/**
* ThreadLocal之后會進行講解
*/
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
// 設置數據源名
public static void setDB(String dbType) {
LOG.info("切換到{}數據源", dbType);
contextHolder.set(dbType);
}
// 獲取數據源名
public static String getDB() {
return (contextHolder.get());
}
// 清除數據源名
public static void clearDB() {
contextHolder.remove();
}
}
DynamicDataSource
獲取執行中所使用的數據源。由於數據源被保存在了DataSourceContextHolder
中的ThreadLocal
中,所以直接獲取就行了。
/**
* 獲取本地數據源
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);
@Override
protected Object determineCurrentLookupKey() {
LOG.info("數據源為{}", DataSourceContextHolder.getDB());
return DataSourceContextHolder.getDB();
}
}
至此就完成了集成quartz
及數據源切換的功能。然后就是具體的任務了。
執行任務
具體的任務需要繼承BaseJob
並在execute
方法中重寫具體需要執行的任務。
execute
@Slf4j
@Service
public class ReadNumJob extends BaseJob {
@Autowired
private RedisService redisService;
@Autowired
private JdbcTemplate jdbcTemplate;
private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);
@Override
public void execute(JobExecutionContext context) {
doSomething();
}
}
指定數據源
然后在添加任務時指定任務所使用的數據源
ReadNumJob job = new ReadNumJob();
job.setJobName("test");
job.setJobGroup("hys");
job.setDescription("test");
// 指定數據源
job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
job.setCronExpression(
"0 */1 * * * ?"
);
try {
jobAndTriggerService.addJob(job);
} catch (SchedulerException e) {
e.printStackTrace();
}
轉評贊就是最大的鼓勵