前提
創業小團隊,無論選擇任何方案,都優先考慮節省成本。關於分布式定時調度框架,成熟的候選方案有XXL-JOB
、Easy Scheduler
、Light Task Scheduler
和Elastic Job
等等,其實這些之前都在生產環境使用過。但是想要搭建高可用的分布式調度平台,這些框架(無論是否去中心化)都需要額外的服務器資源去部署中心調度管理服務實例,甚至有時候還會依賴一些中間件如Zookeeper
。回想之前花過一段時間看Quartz
的源碼去分析它的線程模型,想到了它可以基於MySQL
,通過一個不是很推薦的X
鎖方案(SELECT FOR UPDATE
加鎖)實現服務集群中單個觸發器只有一個節點(加鎖成功的那個節點)能夠執行,這樣子,就能夠僅僅依賴於現有的MySQL
實例資源實現分布式調度任務管理。一般來說,有關系型數據保存需求的業務應用都會有自己的MySQL
實例,這樣子就能幾乎零成本引入一個分布式調度管理模塊。某個加班的周六下午敲定了初步方案之后,花了幾個小時把這個輪子造出來了,效果如下:
方案設計
先說說用到的所有依賴:
Uikit
:選用的前端的一個輕量級的UI
框架,主要是考慮到輕量、文檔和組件相對齊全。JQuery
:選用js
框架,原因只有一個:簡單。Freemarker
:模板引擎,主觀上比Jsp
和Thymeleaf
好用。Quartz
:工業級調度器。
項目的依賴如下:
<dependencies>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<exclusions>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
Uikit
和JQuery
可以直接使用現成的CDN
即可:
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/css/uikit.min.css"/>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit-icons.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
表設計
引入了Quartz
的依賴后,在它的org.quartz.impl.jdbcjobstore
包下可以看到一系列的DDL
,一般使用MySQL
的場景下關注tables_mysql.sql
和tables_mysql_innodb.sql
兩個文件即可,筆者所在團隊的開發規范MySQL
的引擎必須選擇innodb
,所以選用了后者。
應用中的定時任務信息應該單獨拎出來管理,方便提供統一的查詢和更變API
。值得注意的是,Quartz
內建的表使用了大量的外鍵,所以盡量通過Quartz
提供的API
去增刪改它內建表的內容,切勿手動操作,否則可能會引發各種意想不到的故障。
引入的兩個新的表包括調度任務表schedule_task
和調度任務參數表schedule_task_parameter
:
CREATE TABLE `schedule_task`
(
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
`creator` VARCHAR(16) NOT NULL DEFAULT 'admin' COMMENT '創建人',
`editor` VARCHAR(16) NOT NULL DEFAULT 'admin' COMMENT '修改人',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
`edit_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
`version` BIGINT NOT NULL DEFAULT 1 COMMENT '版本號',
`deleted` TINYINT NOT NULL DEFAULT 0 COMMENT '軟刪除標識',
`task_id` VARCHAR(64) NOT NULL COMMENT '任務標識',
`task_class` VARCHAR(256) NOT NULL COMMENT '任務類',
`task_type` VARCHAR(16) NOT NULL COMMENT '任務類型,CRON,SIMPLE',
`task_group` VARCHAR(32) NOT NULL DEFAULT 'DEFAULT' COMMENT '任務分組',
`task_expression` VARCHAR(256) NOT NULL COMMENT '任務表達式',
`task_description` VARCHAR(256) COMMENT '任務描述',
`task_status` TINYINT NOT NULL DEFAULT 0 COMMENT '任務狀態',
UNIQUE uniq_task_class_task_group (`task_class`, `task_group`),
UNIQUE uniq_task_id (`task_id`)
) COMMENT '調度任務';
CREATE TABLE `schedule_task_parameter`
(
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
`task_id` VARCHAR(64) NOT NULL COMMENT '任務標識',
`parameter_value` VARCHAR(1024) NOT NULL COMMENT '參數值',
UNIQUE uniq_task_id (`task_id`)
) COMMENT '調度任務參數';
參數統一用JSON
字符串存放,所以一個調度任務實體對應0或者1個調度任務參數實體。這里沒有考慮多個應用使用同一個數據源的問題,其實這個問題應該考慮基於不同的org.quartz.jobStore.tablePrefix
實現隔離,也就是不同的應用如果共庫,或者每個應用的Quartz
使用不同的表前綴區分,或者單獨抽離所有調度任務到同一個應用中。
Quartz的工作模式
Quartz
在設計調度模型的時候實際上是對觸發器Trigger
進行調度,一般在調度對應的任務Job
的時候,需要綁定觸發器和該被調度的任務實例,然后當觸發器到了觸發時間點的時候就會被激發,接着回調該觸發器關聯的Job
實例的execute()
方法。可以簡單理解為觸發器和Job
實例是多對多的關系。簡單來看就是這樣的:
為了實現這個多對多的關系,Quartz
為Job
(實際上是JobDetail
)和Trigger
分別定義了JobKey
和TriggerKey
用於作為兩者的唯一標識。
TriggerKey -> [name, group]
JobKey -> [name, group]
為了降低維護成本,筆者把這個多對多的綁定關系強制約束為一對一,並且把TriggerKey
和JobKey
同化如下:
JobKey,TriggerKey -> [jobClassName, ${spring.application.name} || applicationName]
實際上,調度相關的大部分工作都是委托給org.quartz.Scheduler
完成,舉下例子:
public interface Scheduler {
......省略無關的代碼......
// 添加調度任務 - 包括任務內容和觸發器
void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) throws SchedulerException;
// 移除觸發器
boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException;
// 移除任務內容
boolean deleteJob(JobKey jobKey) throws SchedulerException;
......省略無關的代碼......
}
筆者要做的,就是通過schedule_task
表管理服務的定時任務,通過org.quartz.Scheduler
提供的API
把任務的具體操作移交給Quartz
,並且添加一些擴展功能。這個模塊已經被封裝為一個輕量級的框架,命名為quartz-web-ui-kit
,下稱kit
。
kit核心邏輯分析
kit
的所有核心功能都封裝在模塊quartz-web-ui-kit-core
中,主要功能包括:
其中WebUI
部分是通過Freemarker
、JQuery
和Uikit
簡單編寫出來,主要包括三個頁面:
templates
- common/script.ftl 公共腳本
- task-add.ftl 添加新任務頁面
- task-edit.ftl 編輯任務頁面
- task-list.ftl 任務列表
調度任務管理的核心方法是QuartzWebUiKitService#refreshScheduleTask()
:
@Autowired
private Scheduler scheduler;
public void refreshScheduleTask(ScheduleTask task,
Trigger oldTrigger,
TriggerKey triggerKey,
Trigger newTrigger) throws Exception {
JobDataMap jobDataMap = prepareJobDataMap(task);
JobDetail jobDetail =
JobBuilder.newJob((Class<? extends Job>) Class.forName(task.getTaskClass()))
.withIdentity(task.getTaskClass(), task.getTaskGroup())
.usingJobData(jobDataMap)
.build();
// 總是覆蓋
if (ScheduleTaskStatus.ONLINE == ScheduleTaskStatus.fromType(task.getTaskStatus())) {
scheduler.scheduleJob(jobDetail, Collections.singleton(newTrigger), Boolean.TRUE);
} else {
if (null != oldTrigger) {
scheduler.unscheduleJob(triggerKey);
}
}
}
private JobDataMap prepareJobDataMap(ScheduleTask task) {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("scheduleTask", JsonUtils.X.format(task));
ScheduleTaskParameter taskParameter = scheduleTaskParameterDao.selectByTaskId(task.getTaskId());
if (null != taskParameter) {
Map<String, Object> parameterMap = JsonUtils.X.parse(taskParameter.getParameterValue(),
new TypeReference<Map<String, Object>>() {
});
jobDataMap.putAll(parameterMap);
}
return jobDataMap;
}
其實是任意任務觸發或者變動,都直接覆蓋對應的JobDetail
和Trigger
,這樣就能保證調度任務內容和觸發器都是全新的,下一輪調度就會生效。
任務類被抽象為AbstractScheduleTask
,這個類承載了任務執行和大量的擴展功能:
@DisallowConcurrentExecution
public abstract class AbstractScheduleTask implements Job {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Autowired(required = false)
private List<ScheduleTaskExecutionPostProcessor> processors;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
String scheduleTask = context.getMergedJobDataMap().getString("scheduleTask");
ScheduleTask task = JsonUtils.X.parse(scheduleTask, ScheduleTask.class);
ScheduleTaskInfo info = ScheduleTaskInfo.builder()
.taskId(task.getTaskId())
.taskClass(task.getTaskClass())
.taskDescription(task.getTaskDescription())
.taskExpression(task.getTaskExpression())
.taskGroup(task.getTaskGroup())
.taskType(task.getTaskType())
.build();
long start = System.currentTimeMillis();
info.setStart(start);
// 在MDC中添加traceId便於追蹤調用鏈
MappedDiagnosticContextAssistant.X.processInMappedDiagnosticContext(() -> {
try {
if (enableLogging()) {
logger.info("任務[{}]-[{}]-[{}]開始執行......", task.getTaskId(), task.getTaskClass(), task.getTaskDescription());
}
// 執行前的處理器回調
processBeforeTaskExecution(info);
// 子類實現的任務執行邏輯
executeInternal(context);
// 執行成功的處理器回調
processAfterTaskExecution(info, ScheduleTaskExecutionStatus.SUCCESS);
} catch (Exception e) {
info.setThrowable(e);
if (enableLogging()) {
logger.info("任務[{}]-[{}]-[{}]執行異常", task.getTaskId(), task.getTaskClass(),
task.getTaskDescription(), e);
}
// 執行異常的處理器回調
processAfterTaskExecution(info, ScheduleTaskExecutionStatus.FAIL);
} finally {
long end = System.currentTimeMillis();
long cost = end - start;
info.setEnd(end);
info.setCost(cost);
if (enableLogging() && null == info.getThrowable()) {
logger.info("任務[{}]-[{}]-[{}]執行完畢,耗時:{} ms......", task.getTaskId(), task.getTaskClass(),
task.getTaskDescription(), cost);
}
// 執行結束的處理器回調
processAfterTaskCompletion(info);
}
});
}
protected boolean enableLogging() {
return true;
}
/**
* 內部執行方法 - 子類實現
*
* @param context context
*/
protected abstract void executeInternal(JobExecutionContext context);
/**
* 拷貝任務信息
*/
private ScheduleTaskInfo copyScheduleTaskInfo(ScheduleTaskInfo info) {
return ScheduleTaskInfo.builder()
.cost(info.getCost())
.start(info.getStart())
.end(info.getEnd())
.throwable(info.getThrowable())
.taskId(info.getTaskId())
.taskClass(info.getTaskClass())
.taskDescription(info.getTaskDescription())
.taskExpression(info.getTaskExpression())
.taskGroup(info.getTaskGroup())
.taskType(info.getTaskType())
.build();
}
// 任務執行之前回調
void processBeforeTaskExecution(ScheduleTaskInfo info) {
if (null != processors) {
for (ScheduleTaskExecutionPostProcessor processor : processors) {
processor.beforeTaskExecution(copyScheduleTaskInfo(info));
}
}
}
// 任務執行完畢時回調
void processAfterTaskExecution(ScheduleTaskInfo info, ScheduleTaskExecutionStatus status) {
if (null != processors) {
for (ScheduleTaskExecutionPostProcessor processor : processors) {
processor.afterTaskExecution(copyScheduleTaskInfo(info), status);
}
}
}
// 任務完結時回調
void processAfterTaskCompletion(ScheduleTaskInfo info) {
if (null != processors) {
for (ScheduleTaskExecutionPostProcessor processor : processors) {
processor.afterTaskCompletion(copyScheduleTaskInfo(info));
}
}
}
}
需要執行的目標調度任務類只需要繼承AbstractScheduleTask
即可獲得這些功能。另外,調度任務后置處理器ScheduleTaskExecutionPostProcessor
參考了Spring
中的BeanPostProcessor
和TransactionSynchronization
的設計:
public interface ScheduleTaskExecutionPostProcessor {
default void beforeTaskExecution(ScheduleTaskInfo info) {
}
default void afterTaskExecution(ScheduleTaskInfo info, ScheduleTaskExecutionStatus status) {
}
default void afterTaskCompletion(ScheduleTaskInfo info) {
}
}
通過此后置處理器可以完成任務預警和任務執行日志持久化等各種功能。筆者通過ScheduleTaskExecutionPostProcessor
已經實現了內置的預警功能,抽象出一個預警策略接口AlarmStrategy
:
public interface AlarmStrategy {
void process(ScheduleTaskInfo scheduleTaskInfo);
}
// 默認啟用的實現是無預警策略
public class NoneAlarmStrategy implements AlarmStrategy {
@Override
public void process(ScheduleTaskInfo scheduleTaskInfo) {
}
}
通過覆蓋AlarmStrategy
的Bean
配置即可獲得自定義的預警策略,如:
@Slf4j
@Component
public class LoggingAlarmStrategy implements AlarmStrategy {
@Override
public void process(ScheduleTaskInfo scheduleTaskInfo) {
if (null != scheduleTaskInfo.getThrowable()) {
log.error("任務執行異常,任務內容:{}", JsonUtils.X.format(scheduleTaskInfo), scheduleTaskInfo.getThrowable());
}
}
}
筆者通過此接口的自定義現實,把所有的預警都打印到團隊內部的釘釘群中,打印了任務的執行時間、狀態以及耗時等等信息,一旦出現異常會及時@
所有人,便於及時監控任務的健康和后續的調優。
使用kit項目
quartz-web-ui-kit
的項目結構如下:
quartz-web-ui-kit
- quartz-web-ui-kit-core 核心包
- h2-example H2數據庫的演示例子
- mysql-5.x-example MySQL5.x版本的演示例子
- mysql-8.x-example MySQL8.x版本的演示例子
如果單純想體驗一下kit
的功能,那么直接下載此項目,啟動h2-example
模塊中的club.throwable.h2.example.H2App
,然后訪問http://localhost:8081/quartz/kit/task/list
即可。
基於MySQL
實例的應用,這里挑選目前用戶比較多的MySQL5.x
的例子簡單說明一下。因為輪子剛造好,沒有經過時間的考驗,暫時沒上交到Maven
的倉庫,這里需要進行手動編譯:
git clone https://github.com/zjcscut/quartz-web-ui-kit
cd quartz-web-ui-kit
mvn clean compile install
引入依賴(只需要引入quartz-web-ui-kit-core
,而且quartz-web-ui-kit-core
依賴於spring-boot-starter-web
、spring-boot-starter-web
、spring-boot-starter-jdbc
、spring-boot-starter-freemarker
和HikariCP
):
<dependency>
<groupId>club.throwable</groupId>
<artifactId>quartz-web-ui-kit-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 這個是必須,MySQL的驅動包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
添加一個配置實現QuartzWebUiKitConfiguration
:
@Configuration
public class QuartzWebUiKitConfiguration implements EnvironmentAware {
private Environment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Bean
public QuartzWebUiKitPropertiesProvider quartzWebUiKitPropertiesProvider() {
return () -> {
QuartzWebUiKitProperties properties = new QuartzWebUiKitProperties();
properties.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name"));
properties.setUrl(environment.getProperty("spring.datasource.url"));
properties.setUsername(environment.getProperty("spring.datasource.username"));
properties.setPassword(environment.getProperty("spring.datasource.password"));
return properties;
};
}
}
這里由於quartz-web-ui-kit-core
設計時候考慮到部分組件的加載順序,使用了ImportBeanDefinitionRegistrar
鈎子接口,所以無法通過@Value
或者@Autowired
實現屬性注入,因為這兩個注解的處理順序比較靠后,如果用過MyBatis
的MapperScannerConfigurer
就會理解這里的問題。quartz-web-ui-kit-core
依賴中已經整理好一份DDL
腳本:
scripts
- quartz-h2.sql
- quartz-web-ui-kit-h2-ddl.sql
- quartz-mysql-innodb.sql
- quartz-web-ui-kit-mysql-ddl.sql
需要提前在目標數據庫執行quartz-mysql-innodb.sql
和quartz-web-ui-kit-mysql-ddl.sql
。一份相對標准的配置文件application.properties
如下:
spring.application.name=mysql-5.x-example
server.port=8082
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# 這個local是本地提前建好的數據庫
spring.datasource.url=jdbc:mysql://localhost:3306/local?characterEncoding=utf8&useUnicode=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# freemarker配置
spring.freemarker.template-loader-path=classpath:/templates/
spring.freemarker.cache=false
spring.freemarker.charset=UTF-8
spring.freemarker.check-template-location=true
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=true
spring.freemarker.expose-session-attributes=true
spring.freemarker.request-context-attribute=request
spring.freemarker.suffix=.ftl
然后需要添加一個調度任務類,只需要繼承club.throwable.quartz.kit.support.AbstractScheduleTask
:
@Slf4j
public class CronTask extends AbstractScheduleTask {
@Override
protected void executeInternal(JobExecutionContext context) {
logger.info("CronTask觸發,TriggerKey:{}", context.getTrigger().getKey().toString());
}
}
接着啟動SpringBoot
的啟動類,然后訪問http://localhost:8082/quartz/kit/task/list
:
通過左側按鈕添加一個定時任務:
目前的任務表達式支持兩種類型:
CRON
表達式:格式是cron=你的CRON表達式
,如cron=*/20 * * * * ?
。- 簡單的周期性執行表達式:格式是
intervalInMilliseconds=毫秒值
,如intervalInMilliseconds=10000
,表示10000毫秒執行一次。
其他可選的參數有:
repeatCount
:表示簡單的周期性執行任務的重復次數,默認為Integer.MAX_VALUE
。startAt
:任務首次執行的時間戳。
關於任務表達式參數,沒有考慮十分嚴格的校驗,也沒有做字符串的trim
處理,需要輸入緊湊的符合約定格式的特定表達式,如:
cron=*/20 * * * * ?
intervalInMilliseconds=10000
intervalInMilliseconds=10000,repeatCount=10
調度任務還支持輸入用戶的自定義參數,目前簡單約定為JSON
字符串,這個字符串最后會通過Jackson
進行一次處理,再存放到任務的JobDataMap
中,實際上會被Quartz
持久化到數據庫中:
{"key":"value"}
這樣就能從JobExecutionContext#getMergedJobDataMap()
中獲得,例如:
@Slf4j
public class SimpleTask extends AbstractScheduleTask {
@Override
protected void executeInternal(JobExecutionContext context) {
JobDataMap jobDataMap = context.getMergedJobDataMap();
String value = jobDataMap.getString("key");
}
}
其他
關於kit
,有兩點設計是筆者基於團隊中維護的項目面對的場景做了特化處理:
AbstractScheduleTask
使用了@DisallowConcurrentExecution
注解,任務會禁用並發執行,也就是多節點的情況下,只會有一個服務節點在同一輪觸發時間下進行任務調度。CRON
類型的任務被禁用了Misfire
策略,也就是CRON
類型的任務如果錯失了觸發時機不會有任何操作(這一點可以了解一下Quartz
的Misfire
策略)。
如果不能忍受這兩點,切勿直接在生產中使用此工具包。
小結
本文簡單介紹了筆者通過Quartz
的加持造了一個輕量級分布式調度服務的輪子,起到了簡單易用和節省成本的效果。不足的是,因為考慮到目前團隊的項目中存在調度任務需求的服務都是內部的共享服務,筆者沒有花很大的精力去完善鑒權、監控等模塊,這里也是也是從目前遇到的業務場景考慮,如果引入過多的設計,就會演化成一個重量級的調度框架如Elastic-Job
,那樣會違背了節省部署成本的初衷。
quartz-web-ui-kit
項目Github
倉庫:https://github.com/zjcscut/quartz-web-ui-kit
(本文完 c-14-d e-a-20200410 最近太忙這個文章憋了很久......)
技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):