假設場景:單體應用的定時任務,假設我們已經有了一個搭建好的springboot應用,但是需要添加一個定時執行的部分(比如筆者遇到的是定時去請求一個接口數據來更新某個表),這樣作為開發人員,筆者選擇了最簡單的方法,也就是springboot自帶的定時器。
1、使用@Scheduled
demo的結構如下:
啟動器:
package com.wh.timerdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
//開啟定時任務
public class TimerdemoApplication {
public static void main(String[] args) {
SpringApplication.run(TimerdemoApplication.class, args);
}
}
定時器工具類:
package com.wh.timerdemo.util;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* scheduler定時器執行任務的類
*/
public class TimerUtil {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
/**
* 每2s執行一次
*/
fixedRate = 5000) (
public void taskOne(){
System.out.println("定時任務1執行!!!執行時間:" + dateFormat.format(new Date()));
}
/**
* 每天凌晨3:15執行一次
*/
cron = "0 15 03 ? * *")//cron的格式會在后面貼出 (
public void taskTwo(){
System.out.println("定時任務2執行!!!執行時間:" + dateFormat.format(new Date()));
}
}
輸出結果:
最簡單的定時任務就實現啦~
假設場景:分布式應用的定時任務。當這個項目做了一半、第一版即將發布線上時,我司的上雲行動也進行到了白熱化階段,於是筆者就遇到了這樣一個問題:多個實例的定時任務是會同時執行的,這樣不僅會消耗資源,而且可能還會引起數據庫鎖。這時我就想到了quartz。但是要注意,使用quartz的前提是集群的時間要設置統一。
2、使用分布式定時任務框架quartz
首先quartz本身是支持分布式的,通過表來管理各節點之間的關系。
1、去quartz官網下載最新的包 http://www.quartz-scheduler.org/
2、下載之后解壓,進入如下目錄,創建數據庫表
quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables並選擇對應的數據庫SQL(筆者使用的是MySQL數據庫)
3、在pom文件中引入依賴
<!--quartz依賴-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
4、創建 quartz.proiperties
配置文件
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemons=true
#線程數量
org.quartz.threadPool.threadCount:20
#線程優先級
org.quartz.threadPool.threadPriority:5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
#特別注意:此處是quartz的數據源,報錯就debug跟蹤一下查看dbName
org.quartz.jobStore.dataSource = springTxDataSource.schedulerFactoryBean
#加入集群
org.quartz.jobStore.isClustered=true
#容許的最大作業延
org.quartz.jobStore.misfireThreshold=25000
#調度實例失效的檢查時間間隔
org.quartz.jobStore.clusterCheckinInterval: 5000
5、quartz的初始化配置,讀取配置文件
package com.wh.timerdemo.config;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
public class QuartzConfig {
// 配置文件路徑
private static final String QUARTZ_CONFIG = "/quartz.properties";
// 按照自己注入的數據源自行修改
"writeDataSource") (
private DataSource dataSource;
private AutoWiredSpringBeanToJobFactory autoWiredSpringBeanToJobFactory;
/**
* 從quartz.properties文件中讀取Quartz配置屬性
* @return
* @throws IOException
*/
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/**
* JobFactory與schedulerFactoryBean中的JobFactory相互依賴,注意bean的名稱
* 在這里為JobFactory注入了Spring上下文
*
* @param applicationContext
* @return
*/
public JobFactory buttonJobFactory(ApplicationContext applicationContext) {
AutoWiredSpringBeanToJobFactory jobFactory = new AutoWiredSpringBeanToJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(autoWiredSpringBeanToJobFactory);
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true); // 設置自行啟動
// 延時啟動,應用啟動1秒后
factory.setStartupDelay(1);
factory.setQuartzProperties(quartzProperties());
factory.setDataSource(dataSource);// 使用應用的dataSource替換quartz的dataSource
return factory;
}
}
6、將任務工廠注入到Spring
package com.wh.timerdemo.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
/**
* 為JobFactory注入SpringBean,否則Job無法使用Spring創建的bean
*/
public class AutoWiredSpringBeanToJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
7、創建任務調度管理,任務的增刪改查,起動停止等。
package com.wh.timerdemo.config;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.List;
/**
* quartz的調度器 包含了任務的增刪改查 可以配置在頁面上調度任務 這里就省略了
*/
public class QuartzManager {
private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();
private Scheduler scheduler = null;
/**
* @Description: 添加一個定時任務
*
* @param jobName 任務名
* @param jobGroupName 任務組名
* @param triggerName 觸發器名
* @param triggerGroupName 觸發器組名
* @param jobClass 任務
* @param cron 時間設置,參考quartz說明文檔
*/
"unchecked", "rawtypes" }) ({
public static void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, String cron) {
try {
// 任務名,任務組,任務執行類
Scheduler scheduler = schedulerFactory.getScheduler();
JobDetail jobDetail= JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
// 觸發器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 觸發器名,觸發器組
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 觸發器時間設定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 創建Trigger對象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
// 調度容器設置JobDetail和Trigger
scheduler.scheduleJob(jobDetail, trigger);
// 啟動
if (!scheduler.isShutdown()) {
scheduler.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 修改一個任務的觸發時間
*
* @param jobName
* @param jobGroupName
* @param triggerName 觸發器名
* @param triggerGroupName 觸發器組名
* @param cron 時間設置,參考quartz說明文檔
*/
public static void modifyJobTime(String jobName,String jobGroupName, String triggerName, String triggerGroupName, String cron) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(cron)) {
System.out.println("任務:"+jobName+"被修改");
/** 方式一 :調用 rescheduleJob 開始 */
/* // 觸發器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 觸發器名,觸發器組
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 觸發器時間設定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 創建Trigger對象
trigger = (CronTrigger) triggerBuilder.build();
// 方式一 :修改一個任務的觸發時間
scheduler.rescheduleJob(triggerKey, trigger);*/
/** 方式一 :調用 rescheduleJob 結束 */
/** 方式二:先刪除,然后在創建一個新的Job */
JobDetail jobDetail = scheduler.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
Class<? extends Job> jobClass = jobDetail.getJobClass();
removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass,cron);
/** 方式二 :先刪除,然后在創建一個新的Job */
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 移除一個任務
*
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
*/
public static void removeJob(String jobName, String jobGroupName,String triggerName, String triggerGroupName) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
scheduler.pauseTrigger(triggerKey);// 停止觸發器
scheduler.unscheduleJob(triggerKey);// 移除觸發器
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 刪除任務
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:啟動所有定時任務
*/
public static void startJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:關閉所有定時任務
*/
public static void shutdownJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
if (!scheduler.isShutdown()) {
scheduler.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 獲取當前正在執行的任務
* @return
*/
public static boolean getCurrentJobs(String name){
try {
Scheduler scheduler = schedulerFactory.getScheduler();
List<JobExecutionContext> jobContexts = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext context : jobContexts) {
if (name.equals(context.getTrigger().getJobKey().getName())) {
return true;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
public Scheduler getScheduler() {
return scheduler;
}
public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
}
8、創建一個執行的Job,這里包含定時任務執行的邏輯
package com.wh.timerdemo.task;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @DisallowConcurrentExecution : 此標記用在實現Job的類上面,意思是不允許並發執行.
* 注org.quartz.threadPool.threadCount的數量有多個的情況,@DisallowConcurrentExecution才生效
*/
public class ButtonTimerJob implements Job {
private static final Logger logger = LoggerFactory.getLogger(ButtonTimerJob.class);
/**
* 核心方法,Quartz Job真正的執行邏輯。
* @throws JobExecutionException execute()方法只允許拋出JobExecutionException異常
*/
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("--------------定時任務執行邏輯---------------------");
}
}
9、創建啟動Job類:負責任務的創建啟動和配置cron等
package com.wh.timerdemo.task;
import com.wh.timerdemo.config.QuartzManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
/**
* 定時任務的啟動類
*/
public class StartJob implements ApplicationListener<ContextRefreshedEvent> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void run() {
logger.info(">> 啟動定時任務...");
// QuartzManager.startJobs();
QuartzManager.addJob(
"SpecialPeriodJob",
"SpecialPeriodJobGroup",
"SpecialPeriodTrigger",
"SpecialPeriodTriggerGroup",
ButtonTimerJob.class,
"0/30 * * * * ?");
}
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
System.out.println("啟動定時任務......");
run();
}
}
啟動springboot,此時就不需要 @EnableScheduling
注解啦。
執行結果:
雖然IDEA控制台打印的信息顯示我們不是集群啟動,但是上線后查看日志,定時任務確實實現了三個實例但是只有一個會運行。
需要注意的是:任務第一次啟動后就會存入數據庫,再次啟動的時候任務已經存在,就不需要再添加一個任務了,直接執行啟動任務即可。由於quartz的特性,即使集群中有一個服務掛掉了,其他的定時任務仍會接替執行。
4、擴展
附錄1:cron語法——引用自https://www.cnblogs.com/linjiqin/archive/2013/07/08/3178452.html
0 0 10,14,16 * * ? 每天上午10點,下午2點,4點
0 0/30 9-17 * * ? 朝九晚五工作時間內每半小時
0 0 12 ? * WED 表示每個星期三中午12點
"0 0 12 * * ?" 每天中午12點觸發
"0 15 10 ? * *" 每天上午10:15觸發
"0 15 10 * * ?" 每天上午10:15觸發
"0 15 10 * * ? *" 每天上午10:15觸發
"0 15 10 * * ? 2005" 2005年的每天上午10:15觸發
"0 * 14 * * ?" 在每天下午2點到下午2:59期間的每1分鍾觸發
"0 0/5 14 * * ?" 在每天下午2點到下午2:55期間的每5分鍾觸發
"0 0/5 14,18 * * ?" 在每天下午2點到2:55期間和下午6點到6:55期間的每5分鍾觸發
"0 0-5 14 * * ?" 在每天下午2點到下午2:05期間的每1分鍾觸發
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44觸發
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15觸發
"0 15 10 15 * ?" 每月15日上午10:15觸發
"0 15 10 L * ?" 每月最后一日的上午10:15觸發
"0 15 10 ? * 6L" 每月的最后一個星期五上午10:15觸發
"0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一個星期五上午10:15觸發
"0 15 10 ? * 6#3" 每月的第三個星期五上午10:15觸發
附錄2:quartz各張表的作用——引用自https://blog.csdn.net/yhhyhhyhhyhh/article/details/84235374
qrtz_blob_triggers : 以Blob 類型存儲的觸發器。
qrtz_calendars:存放日歷信息, quartz可配置一個日歷來指定一個時間范圍。
qrtz_cron_triggers:存放cron類型的觸發器。
qrtz_fired_triggers:存放已觸發的觸發器。
qrtz_job_details:存放一個jobDetail信息。
qrtz_locks: 存儲程序的悲觀鎖的信息(假如使用了悲觀鎖)。
qrtz_paused_trigger_graps:存放暫停掉的觸發器。
qrtz_scheduler_state:調度器狀態。
qrtz_simple_triggers:簡單觸發器的信息。
qrtz_trigger_listeners:觸發器監聽器。
qrtz_triggers:觸發器的基本信息。
cron方式需要用到的4張數據表:
qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details
附錄3:quartz的工作原理——引用自https://blog.51cto.com/simplelife/2314620?source=drh
Quartz實際並不關心你是在相同還是不同的機器上運行節點。當集群放置在不同的機器上時,稱之為水平集群。節點跑在同一台機器上時,稱之為垂直集群。對於垂直集群,存在着單點故障的問題。這對高可用性的應用來說是無法接受的,因為一旦機器崩潰了,所有的節點也就被終止了。對於水平集群,存在着時間同步問題。
節點用時間戳來通知其他實例它自己的最后檢入時間。假如節點的時鍾被設置為將來的時間,那么運行中的Scheduler將再也意識不到那個結點已經宕掉了。另一方面,如果某個節點的時鍾被設置為過去的時間,也許另一節點就會認定那個節點已宕掉並試圖接過它的Job重運行。最簡單的同步計算機時鍾的方式是使用某一個Internet時間服務器(Internet Time Server ITS)。
節點爭搶Job問題:
因為Quartz使用了一個隨機的負載均衡算法, Job以隨機的方式由不同的實例執行。Quartz官網上提到當前,還不存在一個方法來指派(釘住) 一個 Job 到集群中特定的節點。
可以看出采用了Quartz集群采用了悲觀鎖的方式對triggers表進行行加鎖, 以保證任務同步的正確性。