基於Quartz編寫一個可復用的分布式調度任務管理WebUI組件


前提

創業小團隊,無論選擇任何方案,都優先考慮節省成本。關於分布式定時調度框架,成熟的候選方案有XXL-JOBEasy SchedulerLight Task SchedulerElastic Job等等,其實這些之前都在生產環境使用過。但是想要搭建高可用的分布式調度平台,這些框架(無論是否去中心化)都需要額外的服務器資源去部署中心調度管理服務實例,甚至有時候還會依賴一些中間件如Zookeeper。回想之前花過一段時間看Quartz的源碼去分析它的線程模型,想到了它可以基於MySQL,通過一個不是很推薦的X鎖方案(SELECT FOR UPDATE加鎖)實現服務集群中單個觸發器只有一個節點(加鎖成功的那個節點)能夠執行,這樣子,就能夠僅僅依賴於現有的MySQL實例資源實現分布式調度任務管理。一般來說,有關系型數據保存需求的業務應用都會有自己的MySQL實例,這樣子就能幾乎零成本引入一個分布式調度管理模塊。某個加班的周六下午敲定了初步方案之后,花了幾個小時把這個輪子造出來了,效果如下:

方案設計

先說說用到的所有依賴:

  • Uikit:選用的前端的一個輕量級的UI框架,主要是考慮到輕量、文檔和組件相對齊全。
  • JQuery:選用js框架,原因只有一個:簡單。
  • Freemarker:模板引擎,主觀上比JspThymeleaf好用。
  • Quartz:工業級調度器。

項目的依賴如下:

<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP-java7</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

UikitJQuery可以直接使用現成的CDN即可:

<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/css/uikit.min.css"/>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/uikit@3.2.2/dist/js/uikit-icons.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>

表設計

引入了Quartz的依賴后,在它的org.quartz.impl.jdbcjobstore包下可以看到一系列的DDL,一般使用MySQL的場景下關注tables_mysql.sqltables_mysql_innodb.sql兩個文件即可,筆者所在團隊的開發規范MySQL的引擎必須選擇innodb,所以選用了后者。

應用中的定時任務信息應該單獨拎出來管理,方便提供統一的查詢和更變API。值得注意的是,Quartz內建的表使用了大量的外鍵,所以盡量通過Quartz提供的API去增刪改它內建表的內容,切勿手動操作,否則可能會引發各種意想不到的故障。

引入的兩個新的表包括調度任務表schedule_task和調度任務參數表schedule_task_parameter

CREATE TABLE `schedule_task`
(
    `id`               BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
    `creator`          VARCHAR(16)     NOT NULL DEFAULT 'admin' COMMENT '創建人',
    `editor`           VARCHAR(16)     NOT NULL DEFAULT 'admin' COMMENT '修改人',
    `create_time`      DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
    `edit_time`        DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
    `version`          BIGINT          NOT NULL DEFAULT 1 COMMENT '版本號',
    `deleted`          TINYINT         NOT NULL DEFAULT 0 COMMENT '軟刪除標識',
    `task_id`          VARCHAR(64)     NOT NULL COMMENT '任務標識',
    `task_class`       VARCHAR(256)    NOT NULL COMMENT '任務類',
    `task_type`        VARCHAR(16)     NOT NULL COMMENT '任務類型,CRON,SIMPLE',
    `task_group`       VARCHAR(32)     NOT NULL DEFAULT 'DEFAULT' COMMENT '任務分組',
    `task_expression`  VARCHAR(256)    NOT NULL COMMENT '任務表達式',
    `task_description` VARCHAR(256) COMMENT '任務描述',
    `task_status`      TINYINT         NOT NULL DEFAULT 0 COMMENT '任務狀態',
    UNIQUE uniq_task_class_task_group (`task_class`, `task_group`),
    UNIQUE uniq_task_id (`task_id`)
) COMMENT '調度任務';

CREATE TABLE `schedule_task_parameter`
(
    `id`              BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
    `task_id`         VARCHAR(64)     NOT NULL COMMENT '任務標識',
    `parameter_value` VARCHAR(1024)   NOT NULL COMMENT '參數值',
    UNIQUE uniq_task_id (`task_id`)
) COMMENT '調度任務參數';

參數統一用JSON字符串存放,所以一個調度任務實體對應0或者1個調度任務參數實體。這里沒有考慮多個應用使用同一個數據源的問題,其實這個問題應該考慮基於不同的org.quartz.jobStore.tablePrefix實現隔離,也就是不同的應用如果共庫,或者每個應用的Quartz使用不同的表前綴區分,或者單獨抽離所有調度任務到同一個應用中。

Quartz的工作模式

Quartz在設計調度模型的時候實際上是對觸發器Trigger進行調度,一般在調度對應的任務Job的時候,需要綁定觸發器和該被調度的任務實例,然后當觸發器到了觸發時間點的時候就會被激發,接着回調該觸發器關聯的Job實例的execute()方法。可以簡單理解為觸發器和Job實例是多對多的關系。簡單來看就是這樣的:

為了實現這個多對多的關系,QuartzJob(實際上是JobDetail)和Trigger分別定義了JobKeyTriggerKey用於作為兩者的唯一標識。

TriggerKey -> [name, group]
JobKey -> [name, group]

為了降低維護成本,筆者把這個多對多的綁定關系強制約束為一對一,並且把TriggerKeyJobKey同化如下:

JobKey,TriggerKey -> [jobClassName, ${spring.application.name} || applicationName]

實際上,調度相關的大部分工作都是委托給org.quartz.Scheduler完成,舉下例子:

public interface Scheduler {
    ......省略無關的代碼......
    // 添加調度任務 - 包括任務內容和觸發器
    void scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) throws SchedulerException;

    // 移除觸發器
    boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException;
    
    // 移除任務內容
    boolean deleteJob(JobKey jobKey) throws SchedulerException;
    ......省略無關的代碼......
}

筆者要做的,就是通過schedule_task表管理服務的定時任務,通過org.quartz.Scheduler提供的API把任務的具體操作移交給Quartz,並且添加一些擴展功能。這個模塊已經被封裝為一個輕量級的框架,命名為quartz-web-ui-kit,下稱kit

kit核心邏輯分析

kit的所有核心功能都封裝在模塊quartz-web-ui-kit-core中,主要功能包括:

其中WebUI部分是通過FreemarkerJQueryUikit簡單編寫出來,主要包括三個頁面:

templates
  - common/script.ftl 公共腳本
  - task-add.ftl  添加新任務頁面
  - task-edit.ftl 編輯任務頁面
  - task-list.ftl 任務列表

調度任務管理的核心方法是QuartzWebUiKitService#refreshScheduleTask()


@Autowired
private Scheduler scheduler;

public void refreshScheduleTask(ScheduleTask task,
                                Trigger oldTrigger,
                                TriggerKey triggerKey,
                                Trigger newTrigger) throws Exception {
    JobDataMap jobDataMap = prepareJobDataMap(task);
    JobDetail jobDetail =
            JobBuilder.newJob((Class<? extends Job>) Class.forName(task.getTaskClass()))
                    .withIdentity(task.getTaskClass(), task.getTaskGroup())
                    .usingJobData(jobDataMap)
                    .build();
    // 總是覆蓋
    if (ScheduleTaskStatus.ONLINE == ScheduleTaskStatus.fromType(task.getTaskStatus())) {
        scheduler.scheduleJob(jobDetail, Collections.singleton(newTrigger), Boolean.TRUE);
    } else {
        if (null != oldTrigger) {
            scheduler.unscheduleJob(triggerKey);
        }
    }
}

private JobDataMap prepareJobDataMap(ScheduleTask task) {
    JobDataMap jobDataMap = new JobDataMap();
    jobDataMap.put("scheduleTask", JsonUtils.X.format(task));
    ScheduleTaskParameter taskParameter = scheduleTaskParameterDao.selectByTaskId(task.getTaskId());
    if (null != taskParameter) {
        Map<String, Object> parameterMap = JsonUtils.X.parse(taskParameter.getParameterValue(),
                new TypeReference<Map<String, Object>>() {
                });
        jobDataMap.putAll(parameterMap);
    }
    return jobDataMap;
}

其實是任意任務觸發或者變動,都直接覆蓋對應的JobDetailTrigger,這樣就能保證調度任務內容和觸發器都是全新的,下一輪調度就會生效。

任務類被抽象為AbstractScheduleTask,這個類承載了任務執行和大量的擴展功能:

@DisallowConcurrentExecution
public abstract class AbstractScheduleTask implements Job {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired(required = false)
    private List<ScheduleTaskExecutionPostProcessor> processors;

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        String scheduleTask = context.getMergedJobDataMap().getString("scheduleTask");
        ScheduleTask task = JsonUtils.X.parse(scheduleTask, ScheduleTask.class);
        ScheduleTaskInfo info = ScheduleTaskInfo.builder()
                .taskId(task.getTaskId())
                .taskClass(task.getTaskClass())
                .taskDescription(task.getTaskDescription())
                .taskExpression(task.getTaskExpression())
                .taskGroup(task.getTaskGroup())
                .taskType(task.getTaskType())
                .build();
        long start = System.currentTimeMillis();
        info.setStart(start);
        // 在MDC中添加traceId便於追蹤調用鏈
        MappedDiagnosticContextAssistant.X.processInMappedDiagnosticContext(() -> {
            try {
                if (enableLogging()) {
                    logger.info("任務[{}]-[{}]-[{}]開始執行......", task.getTaskId(), task.getTaskClass(), task.getTaskDescription());
                }
                // 執行前的處理器回調
                processBeforeTaskExecution(info);
                // 子類實現的任務執行邏輯
                executeInternal(context);
                // 執行成功的處理器回調
                processAfterTaskExecution(info, ScheduleTaskExecutionStatus.SUCCESS);
            } catch (Exception e) {
                info.setThrowable(e);
                if (enableLogging()) {
                    logger.info("任務[{}]-[{}]-[{}]執行異常", task.getTaskId(), task.getTaskClass(),
                            task.getTaskDescription(), e);
                }
                // 執行異常的處理器回調
                processAfterTaskExecution(info, ScheduleTaskExecutionStatus.FAIL);
            } finally {
                long end = System.currentTimeMillis();
                long cost = end - start;
                info.setEnd(end);
                info.setCost(cost);
                if (enableLogging() && null == info.getThrowable()) {
                    logger.info("任務[{}]-[{}]-[{}]執行完畢,耗時:{} ms......", task.getTaskId(), task.getTaskClass(),
                            task.getTaskDescription(), cost);
                }
                // 執行結束的處理器回調
                processAfterTaskCompletion(info);
            }
        });
    }

    protected boolean enableLogging() {
        return true;
    }

    /**
     * 內部執行方法 - 子類實現
     *
     * @param context context
     */
    protected abstract void executeInternal(JobExecutionContext context);

    /**
     * 拷貝任務信息
     */
    private ScheduleTaskInfo copyScheduleTaskInfo(ScheduleTaskInfo info) {
        return ScheduleTaskInfo.builder()
                .cost(info.getCost())
                .start(info.getStart())
                .end(info.getEnd())
                .throwable(info.getThrowable())
                .taskId(info.getTaskId())
                .taskClass(info.getTaskClass())
                .taskDescription(info.getTaskDescription())
                .taskExpression(info.getTaskExpression())
                .taskGroup(info.getTaskGroup())
                .taskType(info.getTaskType())
                .build();
    }
    
    // 任務執行之前回調
    void processBeforeTaskExecution(ScheduleTaskInfo info) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.beforeTaskExecution(copyScheduleTaskInfo(info));
            }
        }
    }
    
    // 任務執行完畢時回調
    void processAfterTaskExecution(ScheduleTaskInfo info, ScheduleTaskExecutionStatus status) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.afterTaskExecution(copyScheduleTaskInfo(info), status);
            }
        }
    }
    
    // 任務完結時回調
    void processAfterTaskCompletion(ScheduleTaskInfo info) {
        if (null != processors) {
            for (ScheduleTaskExecutionPostProcessor processor : processors) {
                processor.afterTaskCompletion(copyScheduleTaskInfo(info));
            }
        }
    }
}

需要執行的目標調度任務類只需要繼承AbstractScheduleTask即可獲得這些功能。另外,調度任務后置處理器ScheduleTaskExecutionPostProcessor參考了Spring中的BeanPostProcessorTransactionSynchronization的設計:

public interface ScheduleTaskExecutionPostProcessor {
    
    default void beforeTaskExecution(ScheduleTaskInfo info) {

    }

    default void afterTaskExecution(ScheduleTaskInfo info, ScheduleTaskExecutionStatus status) {

    }

    default void afterTaskCompletion(ScheduleTaskInfo info) {

    }
}

通過此后置處理器可以完成任務預警和任務執行日志持久化等各種功能。筆者通過ScheduleTaskExecutionPostProcessor已經實現了內置的預警功能,抽象出一個預警策略接口AlarmStrategy

public interface AlarmStrategy {

    void process(ScheduleTaskInfo scheduleTaskInfo);
}

// 默認啟用的實現是無預警策略
public class NoneAlarmStrategy implements AlarmStrategy {

    @Override
    public void process(ScheduleTaskInfo scheduleTaskInfo) {

    }
}

通過覆蓋AlarmStrategyBean配置即可獲得自定義的預警策略,如:

@Slf4j
@Component
public class LoggingAlarmStrategy implements AlarmStrategy {

    @Override
    public void process(ScheduleTaskInfo scheduleTaskInfo) {
        if (null != scheduleTaskInfo.getThrowable()) {
            log.error("任務執行異常,任務內容:{}", JsonUtils.X.format(scheduleTaskInfo), scheduleTaskInfo.getThrowable());
        }
    }
}

筆者通過此接口的自定義現實,把所有的預警都打印到團隊內部的釘釘群中,打印了任務的執行時間、狀態以及耗時等等信息,一旦出現異常會及時@所有人,便於及時監控任務的健康和后續的調優。

使用kit項目

quartz-web-ui-kit的項目結構如下:

quartz-web-ui-kit
  - quartz-web-ui-kit-core 核心包
  - h2-example H2數據庫的演示例子
  - mysql-5.x-example MySQL5.x版本的演示例子
  - mysql-8.x-example MySQL8.x版本的演示例子

如果單純想體驗一下kit的功能,那么直接下載此項目,啟動h2-example模塊中的club.throwable.h2.example.H2App,然后訪問http://localhost:8081/quartz/kit/task/list即可。

基於MySQL實例的應用,這里挑選目前用戶比較多的MySQL5.x的例子簡單說明一下。因為輪子剛造好,沒有經過時間的考驗,暫時沒上交到Maven的倉庫,這里需要進行手動編譯:

git clone https://github.com/zjcscut/quartz-web-ui-kit
cd quartz-web-ui-kit
mvn clean compile install

引入依賴(只需要引入quartz-web-ui-kit-core,而且quartz-web-ui-kit-core依賴於spring-boot-starter-webspring-boot-starter-webspring-boot-starter-jdbcspring-boot-starter-freemarkerHikariCP):

<dependency>
    <groupId>club.throwable</groupId>
    <artifactId>quartz-web-ui-kit-core</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<!-- 這個是必須,MySQL的驅動包 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
</dependency>

添加一個配置實現QuartzWebUiKitConfiguration

@Configuration
public class QuartzWebUiKitConfiguration implements EnvironmentAware {

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    @Bean
    public QuartzWebUiKitPropertiesProvider quartzWebUiKitPropertiesProvider() {
        return () -> {
            QuartzWebUiKitProperties properties = new QuartzWebUiKitProperties();
            properties.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name"));
            properties.setUrl(environment.getProperty("spring.datasource.url"));
            properties.setUsername(environment.getProperty("spring.datasource.username"));
            properties.setPassword(environment.getProperty("spring.datasource.password"));
            return properties;
        };
    }
}

這里由於quartz-web-ui-kit-core設計時候考慮到部分組件的加載順序,使用了ImportBeanDefinitionRegistrar鈎子接口,所以無法通過@Value或者@Autowired實現屬性注入,因為這兩個注解的處理順序比較靠后,如果用過MyBatisMapperScannerConfigurer就會理解這里的問題。quartz-web-ui-kit-core依賴中已經整理好一份DDL腳本:

scripts
  - quartz-h2.sql
  - quartz-web-ui-kit-h2-ddl.sql
  - quartz-mysql-innodb.sql
  - quartz-web-ui-kit-mysql-ddl.sql

需要提前在目標數據庫執行quartz-mysql-innodb.sqlquartz-web-ui-kit-mysql-ddl.sql。一份相對標准的配置文件application.properties如下:

spring.application.name=mysql-5.x-example
server.port=8082
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# 這個local是本地提前建好的數據庫
spring.datasource.url=jdbc:mysql://localhost:3306/local?characterEncoding=utf8&useUnicode=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=root
# freemarker配置
spring.freemarker.template-loader-path=classpath:/templates/
spring.freemarker.cache=false
spring.freemarker.charset=UTF-8
spring.freemarker.check-template-location=true
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=true
spring.freemarker.expose-session-attributes=true
spring.freemarker.request-context-attribute=request
spring.freemarker.suffix=.ftl

然后需要添加一個調度任務類,只需要繼承club.throwable.quartz.kit.support.AbstractScheduleTask

@Slf4j
public class CronTask extends AbstractScheduleTask {

    @Override
    protected void executeInternal(JobExecutionContext context) {
        logger.info("CronTask觸發,TriggerKey:{}", context.getTrigger().getKey().toString());
    }
}

接着啟動SpringBoot的啟動類,然后訪問http://localhost:8082/quartz/kit/task/list

通過左側按鈕添加一個定時任務:

目前的任務表達式支持兩種類型:

  • CRON表達式:格式是cron=你的CRON表達式,如cron=*/20 * * * * ?
  • 簡單的周期性執行表達式:格式是intervalInMilliseconds=毫秒值,如intervalInMilliseconds=10000,表示10000毫秒執行一次。

其他可選的參數有:

  • repeatCount:表示簡單的周期性執行任務的重復次數,默認為Integer.MAX_VALUE
  • startAt:任務首次執行的時間戳。

關於任務表達式參數,沒有考慮十分嚴格的校驗,也沒有做字符串的trim處理,需要輸入緊湊的符合約定格式的特定表達式,如:

cron=*/20 * * * * ?

intervalInMilliseconds=10000

intervalInMilliseconds=10000,repeatCount=10

調度任務還支持輸入用戶的自定義參數,目前簡單約定為JSON字符串,這個字符串最后會通過Jackson進行一次處理,再存放到任務的JobDataMap中,實際上會被Quartz持久化到數據庫中:

{"key":"value"}

這樣就能從JobExecutionContext#getMergedJobDataMap()中獲得,例如:

@Slf4j
public class SimpleTask extends AbstractScheduleTask {

    @Override
    protected void executeInternal(JobExecutionContext context) {
        JobDataMap jobDataMap = context.getMergedJobDataMap();
        String value = jobDataMap.getString("key");
    }
}

其他

關於kit,有兩點設計是筆者基於團隊中維護的項目面對的場景做了特化處理:

  1. AbstractScheduleTask使用了@DisallowConcurrentExecution注解,任務會禁用並發執行,也就是多節點的情況下,只會有一個服務節點在同一輪觸發時間下進行任務調度。
  2. CRON類型的任務被禁用了Misfire策略,也就是CRON類型的任務如果錯失了觸發時機不會有任何操作(這一點可以了解一下QuartzMisfire策略)。

如果不能忍受這兩點,切勿直接在生產中使用此工具包。

小結

本文簡單介紹了筆者通過Quartz的加持造了一個輕量級分布式調度服務的輪子,起到了簡單易用和節省成本的效果。不足的是,因為考慮到目前團隊的項目中存在調度任務需求的服務都是內部的共享服務,筆者沒有花很大的精力去完善鑒權、監控等模塊,這里也是也是從目前遇到的業務場景考慮,如果引入過多的設計,就會演化成一個重量級的調度框架如Elastic-Job,那樣會違背了節省部署成本的初衷。

(本文完 c-14-d e-a-20200410 最近太忙這個文章憋了很久......)

技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM