【前情提要】由於項目需要,需要一個定時任務集群,故此有了這個spring-boot-starter-quartz集群的實踐。springboot的版本為:2.1.6.RELEASE;quartz的版本為:2.3.1.假如這里一共有兩個定時任務的節點,它們的代碼完全一樣。
壹.jar包依賴
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
這里選擇將定時任務的數據入庫,避免數據直接存在內存中,因應用重啟造成的數據丟失和做集群控制。
貳、項目配置
spring:
server:
port: 8080
servlet:
context-path: /lovin
datasource:
url: jdbc:mysql://127.0.0.1:3306/training?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
quartz:
job-store-type: jdbc #數據庫方式
jdbc:
initialize-schema: never #不初始化表結構
properties:
org:
quartz:
scheduler:
instanceId: AUTO #默認主機名和時間戳生成實例ID,可以是任何字符串,但對於所有調度程序來說,必須是唯一的 對應qrtz_scheduler_state INSTANCE_NAME字段
#instanceName: clusteredScheduler #quartzScheduler
jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX #持久化配置
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate #我們僅為數據庫制作了特定於數據庫的代理
useProperties: false #以指示JDBCJobStore將JobDataMaps中的所有值都作為字符串,因此可以作為名稱 - 值對存儲而不是在BLOB列中以其序列化形式存儲更多復雜的對象。從長遠來看,這是更安全的,因為您避免了將非String類序列化為BLOB的類版本問題。
tablePrefix: qrtz_ #數據庫表前綴
misfireThreshold: 60000 #在被認為“失火”之前,調度程序將“容忍”一個Triggers將其下一個啟動時間通過的毫秒數。默認值(如果您在配置中未輸入此屬性)為60000(60秒)。
clusterCheckinInterval: 5000 #設置此實例“檢入”*與群集的其他實例的頻率(以毫秒為單位)。影響檢測失敗實例的速度。
isClustered: true #打開群集功能
threadPool: #連接池
class: org.quartz.simpl.SimpleThreadPool
threadCount: 10
threadPriority: 5
threadsInheritContextClassLoaderOfInitializingThread: true
這里需要注意的是兩個節點的端口號應該不一致,避免沖突
叄、實現一個Job
@Slf4j
public class Job extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// 獲取參數
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// 業務邏輯 ...
log.info("------springbootquartzonejob執行"+jobDataMap.get("name").toString()+"###############"+jobExecutionContext.getTrigger());
}
其中的日志輸出是為了便於觀察任務執行情況
肆、封裝定時任務操作
@Service
public class QuartzService {
@Autowired
private Scheduler scheduler;
@PostConstruct
public void startScheduler() {
try {
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 增加一個job
*
* @param jobClass
* 任務實現類
* @param jobName
* 任務名稱
* @param jobGroupName
* 任務組名
* @param jobTime
* 時間表達式 (這是每隔多少秒為一次任務)
* @param jobTimes
* 運行的次數 (<0:表示不限次數)
* @param jobData
* 參數
*/
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
int jobTimes, Map jobData) {
try {
// 任務名稱和組構成任務key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
.build();
// 設置job參數
if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData);
}
// 使用simpleTrigger規則
Trigger trigger = null;
if (jobTimes < 0) {
trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
.startNow().build();
} else {
trigger = TriggerBuilder
.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
.startNow().build();
}
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 增加一個job
*
* @param jobClass
* 任務實現類
* @param jobName
* 任務名稱(建議唯一)
* @param jobGroupName
* 任務組名
* @param jobTime
* 時間表達式 (如:0/5 * * * * ? )
* @param jobData
* 參數
*/
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
try {
// 創建jobDetail實例,綁定Job實現類
// 指明job的名稱,所在組的名稱,以及綁定job類
// 任務名稱和組構成任務key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
.build();
// 設置job參數
if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData);
}
// 定義調度觸發規則
// 使用cornTrigger規則
// 觸發器key
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
// 把作業和觸發器注冊到任務調度中
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 修改 一個job的 時間表達式
*
* @param jobName
* @param jobGroupName
* @param jobTime
*/
public void updateJob(String jobName, String jobGroupName, String jobTime) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
// 重啟觸發器
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 刪除任務一個job
*
* @param jobName
* 任務名稱
* @param jobGroupName
* 任務組名
*/
public void deleteJob(String jobName, String jobGroupName) {
try {
scheduler.deleteJob(new JobKey(jobName, jobGroupName));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 暫停一個job
*
* @param jobName
* @param jobGroupName
*/
public void pauseJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 恢復一個job
*
* @param jobName
* @param jobGroupName
*/
public void resumeJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 立即執行一個job
*
* @param jobName
* @param jobGroupName
*/
public void runAJobNow(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* 獲取所有計划中的任務列表
*
* @return
*/
public List<Map<String, Object>> queryAllJob() {
List<Map<String, Object>> jobList = null;
try {
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
jobList = new ArrayList<Map<String, Object>>();
for (JobKey jobKey : jobKeys) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
Map<String, Object> map = new HashMap<>();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "觸發器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
}
} catch (SchedulerException e) {
e.printStackTrace();
}
return jobList;
}
/**
* 獲取所有正在運行的job
*
* @return
*/
public List<Map<String, Object>> queryRunJob() {
List<Map<String, Object>> jobList = null;
try {
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
Map<String, Object> map = new HashMap<String, Object>();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "觸發器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
} catch (SchedulerException e) {
e.printStackTrace();
}
return jobList;
}
陸、初始化任務
這里不准備給用戶用web界面來配置定時任務,故此采用CommandLineRunner來子啊應用初始化的時候來初始化任務。只需要實現CommandLineRunner的run()方法即可。
@Override
public void run(String... args) throws Exception {
HashMap<String,Object> map = new HashMap<>();
map.put("name",1);
quartzService.deleteJob("job", "test");
quartzService.addJob(Job.class, "job", "test", "0 * * * * ?", map);
map.put("name",2);
quartzService.deleteJob("job2", "test");
quartzService.addJob(Job.class, "job2", "test", "10 * * * * ?", map);
map.put("name",3);
quartzService.deleteJob("job3", "test2");
quartzService.addJob(Job.class, "job3", "test2", "15 * * * * ?", map);
}
柒、測試驗證
分別夏侯啟動兩個應用,然后觀察任務執行,以及在運行過程中殺死某個服務,來觀察定時任務的執行。