Spring Boot Quartz
主要內容
- Spring Scheduler 框架
- Quartz 框架,功能強大,配置靈活
- Quartz 集群
- mysql 持久化定時任務腳本(tables_mysql.sql)
介紹
在工程中時常會遇到一些需求,例如定時刷新一下配置、隔一段時間檢查下網絡狀態並發送郵件等諸如此類的定時任務。
定時任務本質就是一個異步的線程,線程可以查詢或修改並執行一系列的操作。由於本質是線程,在 Java 中可以自行編寫一個線程池對定時任務進行控制,但這樣效率太低了,且功能有限,屬於重復造輪子。
分布式任務調度應用場景
Quartz的集群功能通過故障轉移和負載平衡功能為您的調度程序帶來高可用性和可擴展性。
調度程序中會有很多定時任務需要執行,一台服務器已經不能滿足使用,需要解決定時任務單機單點故障問題。
用Quartz框架,在集群環境下,通過數據庫鎖機制來實現定時任務的執行;獨立的 Quartz 節點並不與另一其的節點或是管理節點通信。
Spring Scheduler 實現定時任務
1.定義 Task 類
/**
* Spring Scheduled示例
*/
@Component
public class ScheduledTask {
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private Integer count0 = 1;
private Integer count1 = 1;
private Integer count2 = 1;
@Scheduled(fixedRate = 5000)
public void reportCurrentTime() throws InterruptedException {
System.out.println(String.format("reportCurrentTime第%s次執行,當前時間為:%s", count0++, dateFormat.format(new Date())));
}
@Scheduled(fixedDelay = 5000)
public void reportCurrentTimeAfterSleep() throws InterruptedException {
System.out.println(String.format("reportCurrentTimeAfterSleep第%s次執行,當前時間為:%s", count1++, dateFormat.format(new Date())));
}
@Scheduled(cron = "0 0 1 * * *")
public void reportCurrentTimeCron() throws InterruptedException {
System.out.println(String.format("reportCurrentTimeCron第%s次執行,當前時間為:%s", count2++, dateFormat.format(new Date())));
}
}
2.啟動定時任務
在Spring Boot的主類中加入@EnableScheduling注解,啟用定時任務的配置
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableScheduling
public class ScheduledTaskTests {
@Test
public void test() {
log.info("啟動了ScheduledTask定時作業");
while (true) {
}
}
}
quartz實現分布式定時任務
quartz 是一個開源的分布式調度庫,它基於java實現。
> 它有着強大的調度功能,支持豐富多樣的調度方式,比如簡單調度,基於cron表達式的調度等等。
> 支持調度任務的多種持久化方式。比如支持內存存儲,數據庫存儲,Terracotta server 存儲。
> 支持分布式和集群能力。
> 采用JDBCJobStore方式存儲時,針對事務的處理方式支持全局事務(和業務服務共享同一個事務)和局部事務(quarzt 單獨管理自己的事務)
> 基於plugin機制以及listener機制支持靈活的擴展。
1.pom.xml配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- orm -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
2.spring-quartz.properties集群配置
#============================================================================
# 配置JobStore
#============================================================================
# JobDataMaps是否都為String類型,默認false
org.quartz.jobStore.useProperties=false
# 表的前綴,默認QRTZ_
org.quartz.jobStore.tablePrefix = QRTZ_
# 是否加入集群
org.quartz.jobStore.isClustered = true
# 調度實例失效的檢查時間間隔 ms
org.quartz.jobStore.clusterCheckinInterval = 5000
# 當設置為“true”時,此屬性告訴Quartz 在非托管JDBC連接上調用setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted = true
# 數據保存方式為數據庫持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
# 數據庫代理類,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以滿足大部分數據庫
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#============================================================================
# Scheduler 調度器屬性配置
#============================================================================
# 調度標識名 集群中每一個實例都必須使用相同的名稱
org.quartz.scheduler.instanceName = ClusterQuartz
# ID設置為自動獲取 每一個必須不同
org.quartz.scheduler.instanceId= AUTO
#============================================================================
# 配置ThreadPool
#============================================================================
# 線程池的實現類(一般使用SimpleThreadPool即可滿足幾乎所有用戶的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# 指定線程數,一般設置為1-100直接的整數,根據系統資源配置
org.quartz.threadPool.threadCount = 5
# 設置線程的優先級(可以是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(這是10)之間的任何int 。默認值為Thread.NORM_PRIORITY(5)。)
org.quartz.threadPool.threadPriority = 5
3.定義兩個job
- QuartzJob.java
//持久化
@PersistJobDataAfterExecution
//禁止並發執行(Quartz不要並發地執行同一個job定義(這里指一個job類的多個實例))
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String taskName = context.getJobDetail().getJobDataMap().getString("name");
log.info("---> Quartz job {}, {} <----", new Date(), taskName);
}
}
- QuartzJob2.java
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob2 extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String taskName = context.getJobDetail().getJobDataMap().getString("name");
log.info("---> Quartz job 2 {}, {} <----", new Date(), taskName);
}
}
4.初始化觸發器等信息,這里通過Listener初始化
@Slf4j
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
SchedulerConfig schedulerConfig;
public static AtomicInteger count = new AtomicInteger(0);
private static String TRIGGER_GROUP_NAME = "test_trriger";
private static String JOB_GROUP_NAME = "test_job";
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 防止重復執行
if (event.getApplicationContext().getParent() == null && count.incrementAndGet() <= 1) {
initMyJob();
}
}
public void initMyJob() {
Scheduler scheduler = null;
try {
scheduler = schedulerConfig.scheduler();
TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null == trigger) {
Class clazz = QuartzJob.class;
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, trigger);
log.info("Quartz 創建了job:...:{}", jobDetail.getKey());
} else {
log.info("job已存在:{}", trigger.getKey());
}
TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
if (null == trigger2) {
Class clazz = QuartzJob2.class;
JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/15 * * * * ?");
trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail2, trigger2);
log.info("Quartz 創建了job:...:{}", jobDetail2.getKey());
} else {
log.info("job已存在:{}", trigger2.getKey());
}
scheduler.start();
} catch (Exception e) {
log.info(e.getMessage());
}
}
}
5.啟動定時器
啟動兩個Application,分別是示例中的DemoQuartzApplication和DemoQuartzApplication2,會發現,兩個Job會分別在兩個應用執行。
當手動停止一個應用的時候,另一個應用會自動接管所有任務並繼續執行,如果任務太多,我們可以再開一台服務即可。實現了調度任務的高可用性和可擴展性
運行效果如圖: