SpringBoot整合Quartz 實現分布式定時任務調度


一、Quartz 集群架構

Quartz 是 Java 領域最著名的開源任務調度工具。

在上篇文章中,我們詳細的介紹了 Quartz 的單體應用實踐,如果只在單體環境中應用,Quartz 未必是最好的選擇,例如Spring Scheduled一樣也可以實現任務調度,並且與SpringBoot無縫集成,支持注解配置,非常簡單,但是它有個缺點就是在集群環境下,會導致任務被重復調度!

而與之對應的 Quartz 提供了極為豐富的特性,如任務持久化、集群部署和分布式調度任務等等,正因如此,基於 Quartz 的任務調度功能在系統開發中應用極為廣泛!

在集群環境下,Quartz 集群中的每個節點是一個獨立的 Quartz 應用,沒有負責集中管理的節點,而是通過數據庫表來感知另一個應用,利用數據庫鎖的方式來實現集群環境下進行並發控制,每個任務當前運行的有效節點有且只有一個!

 

特別需要注意的是:分布式部署時需要保證各個節點的系統時間一致!

 

二、數據表初始化

數據庫表結構官網已經提供,我們可以直接訪問Quartz對應的官方網站,找到對應的版本,然后將其下載!

 

 

 我選擇的是quartz-2.3.0-distribution.tar.gz,下載完成之后將其解壓,在文件中搜索sql,在里面選擇適合當前環境的數據庫腳本文件,然后將其初始化到數據庫中即可!

 

 

 例如,我使用的數據庫是mysql-5.7,因此我選擇的是tables_mysql_innodb.sql腳本,具體內容如下:

-- Remove any existing Quartz tables so this script can be re-run from scratch.
-- Tables that declare foreign keys (the *_TRIGGERS detail tables, then QRTZ_TRIGGERS)
-- are dropped before the tables they reference (QRTZ_TRIGGERS, QRTZ_JOB_DETAILS).
DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;
 
-- Job definitions, keyed by (SCHED_NAME, JOB_NAME, JOB_GROUP).
-- The IS_* / REQUESTS_RECOVERY flags are stored as single-character strings;
-- JOB_DATA is an optional BLOB payload.
CREATE TABLE QRTZ_JOB_DETAILS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    JOB_NAME VARCHAR(190) NOT NULL,
    JOB_GROUP VARCHAR(190) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    JOB_CLASS_NAME VARCHAR(250) NOT NULL,
    IS_DURABLE VARCHAR(1) NOT NULL,
    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
) ENGINE=InnoDB;
 
-- Trigger definitions, keyed by (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP).
-- Every trigger must point at an existing row in QRTZ_JOB_DETAILS (foreign key below).
-- NOTE(review): the *_TIME columns are BIGINT(13), presumably epoch milliseconds - confirm.
CREATE TABLE QRTZ_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    JOB_NAME VARCHAR(190) NOT NULL,
    JOB_GROUP VARCHAR(190) NOT NULL,
    DESCRIPTION VARCHAR(250) NULL,
    NEXT_FIRE_TIME BIGINT(13) NULL,
    PREV_FIRE_TIME BIGINT(13) NULL,
    PRIORITY INTEGER NULL,
    TRIGGER_STATE VARCHAR(16) NOT NULL,
    TRIGGER_TYPE VARCHAR(8) NOT NULL,
    START_TIME BIGINT(13) NOT NULL,
    END_TIME BIGINT(13) NULL,
    CALENDAR_NAME VARCHAR(190) NULL,
    MISFIRE_INSTR SMALLINT(2) NULL,
    JOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
        REFERENCES QRTZ_JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP)
) ENGINE=InnoDB;
 
-- Repeat settings (count, interval, times triggered) for a trigger;
-- each row extends one row of QRTZ_TRIGGERS via the shared key.
CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    REPEAT_COUNT BIGINT(7) NOT NULL,
    REPEAT_INTERVAL BIGINT(12) NOT NULL,
    TIMES_TRIGGERED BIGINT(10) NOT NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;
 
-- Cron expression and optional time zone id for a trigger;
-- each row extends one row of QRTZ_TRIGGERS via the shared key.
CREATE TABLE QRTZ_CRON_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    CRON_EXPRESSION VARCHAR(120) NOT NULL,
    TIME_ZONE_ID VARCHAR(80),
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;
 
-- Generic typed property slots (string / int / long / decimal / boolean) for a trigger;
-- each row extends one row of QRTZ_TRIGGERS via the shared key.
CREATE TABLE QRTZ_SIMPROP_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;
 
-- BLOB payload for a trigger; each row extends one row of QRTZ_TRIGGERS via the shared key.
-- The primary key already indexes (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) and also serves
-- the foreign key, so no separate secondary index on the same columns is declared
-- (it would only duplicate the primary key and add write overhead).
CREATE TABLE QRTZ_BLOB_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    BLOB_DATA BLOB NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
        REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;
 
-- Named calendars stored as a mandatory BLOB, keyed by (SCHED_NAME, CALENDAR_NAME).
CREATE TABLE QRTZ_CALENDARS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    CALENDAR_NAME VARCHAR(190) NOT NULL,
    CALENDAR BLOB NOT NULL,
    PRIMARY KEY (SCHED_NAME, CALENDAR_NAME)
) ENGINE=InnoDB;
 
-- One row per (scheduler, trigger group) pair listed as paused.
CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    PRIMARY KEY (SCHED_NAME, TRIGGER_GROUP)
) ENGINE=InnoDB;
 
-- Fired-trigger records, keyed by (SCHED_NAME, ENTRY_ID); each row names the
-- trigger, the scheduler instance and (optionally) the job involved.
-- No foreign keys are declared on this table.
CREATE TABLE QRTZ_FIRED_TRIGGERS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    ENTRY_ID VARCHAR(95) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    INSTANCE_NAME VARCHAR(190) NOT NULL,
    FIRED_TIME BIGINT(13) NOT NULL,
    SCHED_TIME BIGINT(13) NOT NULL,
    PRIORITY INTEGER NOT NULL,
    STATE VARCHAR(16) NOT NULL,
    JOB_NAME VARCHAR(190) NULL,
    JOB_GROUP VARCHAR(190) NULL,
    IS_NONCONCURRENT VARCHAR(1) NULL,
    REQUESTS_RECOVERY VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME, ENTRY_ID)
) ENGINE=InnoDB;
 
-- One row per scheduler instance, holding its last check-in time and check-in interval.
CREATE TABLE QRTZ_SCHEDULER_STATE (
    SCHED_NAME VARCHAR(120) NOT NULL,
    INSTANCE_NAME VARCHAR(190) NOT NULL,
    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
    PRIMARY KEY (SCHED_NAME, INSTANCE_NAME)
) ENGINE=InnoDB;
 
-- Named lock rows per scheduler; the accompanying article describes this as the
-- row-lock table Quartz uses to synchronize cluster nodes.
CREATE TABLE QRTZ_LOCKS (
    SCHED_NAME VARCHAR(120) NOT NULL,
    LOCK_NAME VARCHAR(40) NOT NULL,
    PRIMARY KEY (SCHED_NAME, LOCK_NAME)
) ENGINE=InnoDB;
 
-- Secondary indexes on QRTZ_JOB_DETAILS (lookup by recovery flag and by job group).
CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
 
-- Secondary indexes on QRTZ_TRIGGERS: by job, group, calendar, state,
-- next fire time and misfire instruction, in the combinations listed below.
CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
 
-- Secondary indexes on QRTZ_FIRED_TRIGGERS: by instance, job and trigger.
CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
 
-- End of script.
commit;

具體表描述如下:

 

 

 其中,QRTZ_LOCKS 就是 Quartz 集群實現同步機制的行鎖表!

 

三、Quartz 集群實踐

3.1、創建springboot項目,導入maven依賴包

<!-- Spring Boot parent POM; supplies managed versions for the starters below -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>
 
<!-- Project dependencies -->
<dependencies>
    <!-- Spring Boot core starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot test support (test scope only) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Spring MVC web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Development-time tooling (optional dependency) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- JPA support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- Druid connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.17</version>
    </dependency>
    <!-- Quartz scheduler starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
    <!-- Alibaba fastjson JSON library -->
    <!-- NOTE(review): explicitly pinned, old release; check current security advisories before reuse -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.46</version>
    </dependency>
</dependencies>

3.2、創建 application.properties 配置文件

# Application name and HTTP port of this node
spring.application.name=springboot-quartz-001
server.port=8080
 
# Application data source (MySQL, schema "test")
# NOTE(review): demo credentials in plain text; externalize them for real deployments
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

3.3、創建 quartz.properties 配置文件

# ---- Scheduler ----
# Scheduler instance name
org.quartz.scheduler.instanceName=SsmScheduler
# Generate the scheduler instance id automatically
org.quartz.scheduler.instanceId=AUTO
# Whether Quartz wraps each job execution in a UserTransaction
org.quartz.scheduler.wrapJobExecutionInUserTransaction=false
 
# ---- Thread pool ----
# Thread pool implementation class
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# Number of threads in the pool
org.quartz.threadPool.threadCount=10
# Thread priority
org.quartz.threadPool.threadPriority=5
# Whether pool threads inherit the context ClassLoader of the initializing thread
# (as the property name states; confirm details in the Quartz configuration reference)
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
# Whether pool threads are daemon threads; left disabled here.
# The original author notes that jobs did not run with this enabled (unverified).
#org.quartz.threadPool.makeThreadsDaemons=true
 
# ---- Job store (persistence) ----
# Whether all JobDataMap values are stored as Strings
org.quartz.jobStore.useProperties=true
# Table name prefix; default is QRTZ_
org.quartz.jobStore.tablePrefix=QRTZ_
# Maximum tolerated trigger delay before a misfire (presumably milliseconds - confirm)
org.quartz.jobStore.misfireThreshold=60000
# Run in clustered mode
org.quartz.jobStore.isClustered=true
# Interval, in milliseconds, for checking for failed scheduler instances
org.quartz.jobStore.clusterCheckinInterval=2000
# Persist scheduling data in the database
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# JDBC delegate class; StdJDBCDelegate is said to work for most databases
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# Name of the data source defined below (arbitrary alias)
org.quartz.jobStore.dataSource=qzDS
 
# ---- Data source "qzDS" ----
# Connection provider: use the custom Druid-backed provider
org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider
# JDBC driver class
org.quartz.dataSource.qzDS.driver=com.mysql.cj.jdbc.Driver
# JDBC URL
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/test-quartz?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
# Database user
org.quartz.dataSource.qzDS.user=root
# Database password
org.quartz.dataSource.qzDS.password=123456
# Maximum number of connections
org.quartz.dataSource.qzDS.maxConnection=5
# Validation query (optional)
org.quartz.dataSource.qzDS.validationQuery=select 0 from dual

3.4、注冊 Quartz 任務工廠

/**
 * Job factory that runs Spring autowiring on every job instance Quartz creates,
 * so job classes can declare injected fields.
 */
@Component
public class QuartzJobFactory extends AdaptableJobFactory {

    /** Bean factory used to autowire freshly created job instances. */
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    /**
     * Creates the job through the parent factory, then autowires it.
     *
     * @param bundle the fired-trigger bundle passed in by Quartz
     * @return the autowired job instance
     * @throws Exception if the parent factory cannot create the instance
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        final Object job = super.createJobInstance(bundle);
        capableBeanFactory.autowireBean(job);
        return job;
    }
}

3.5、注冊調度工廠

/**
 * Spring configuration that builds the Quartz {@link SchedulerFactoryBean} from
 * {@code quartz.properties} and exposes the resulting {@link Scheduler} as a bean.
 */
@Configuration
public class QuartzConfig {

    /** Job factory that lets job instances receive injected Spring beans. */
    @Autowired
    private QuartzJobFactory jobFactory;

    /**
     * Builds the scheduler factory configured from {@code /quartz.properties} on the classpath.
     *
     * @return the configured factory bean
     * @throws IOException if the properties file cannot be read
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        // Load quartz.properties; afterPropertiesSet() triggers the actual read
        // so that getObject() below returns the loaded properties.
        PropertiesFactoryBean quartzProps = new PropertiesFactoryBean();
        quartzProps.setLocation(new ClassPathResource("/quartz.properties"));
        quartzProps.afterPropertiesSet();

        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setQuartzProperties(quartzProps.getObject());
        // Allows other business beans to be injected into job instances.
        schedulerFactory.setJobFactory(jobFactory);
        schedulerFactory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // On Spring shutdown, wait for already-started Quartz jobs to finish first.
        schedulerFactory.setWaitForJobsToCompleteOnShutdown(true);
        // Do not overwrite jobs that already exist.
        schedulerFactory.setOverwriteExistingJobs(false);
        // Delay scheduler start-up until after the application has started.
        schedulerFactory.setStartupDelay(10);

        return schedulerFactory;
    }

    /**
     * Exposes the {@link Scheduler} created by {@link #schedulerFactoryBean()}.
     *
     * @return the scheduler instance
     * @throws IOException if the factory cannot load its properties
     * @throws SchedulerException declared for callers; see the factory for failure modes
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler() throws IOException, SchedulerException {
        return schedulerFactoryBean().getScheduler();
    }
}

3.6、重新設置 Quartz 數據連接池

默認 Quartz 的數據連接池是 c3p0,由於性能不太穩定,不推薦使用,因此我們將其改成 druid 數據連接池,配置如下:

/**
 * Quartz {@link ConnectionProvider} backed by a Druid connection pool.
 *
 * <p>The configurable properties mirror the keys under
 * {@code org.quartz.dataSource.<name>.} in quartz.properties (prefix removed);
 * Quartz populates them through the setters before calling {@link #initialize()}.</p>
 */
public class DruidConnectionProvider implements ConnectionProvider {

    /** JDBC driver class name. */
    public String driver;
    /** JDBC connection URL. */
    public String URL;
    /** Database user name. */
    public String user;
    /** Database password. */
    public String password;
    /** Maximum number of pooled connections; 0 (unset) falls back to {@link #DEFAULT_DB_MAX_CONNECTIONS}. */
    public int maxConnection;
    /** SQL statement used to validate pooled connections; validation is skipped when null. */
    public String validationQuery;

    /** When true, validate on borrow; otherwise validate on return (only used if a validation query is set). */
    private boolean validateOnCheckout;

    /** Passed to the pool as the validation query timeout (see note in {@link #initialize()}). */
    private int idleConnectionValidationSeconds;

    /** Not read by this class. */
    public String maxCachedStatementsPerConnection;

    /** Not read by this class apart from its getter/setter. */
    private String discardIdleConnectionsSeconds;

    public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;

    public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;

    /** The Druid pool; null until {@link #initialize()} succeeds. */
    private DruidDataSource datasource;

    /**
     * Borrows a connection from the pool.
     *
     * @return a pooled connection
     * @throws SQLException if the pool cannot provide a connection
     */
    @Override
    public Connection getConnection() throws SQLException {
        return datasource.getConnection();
    }

    /**
     * Closes the pool. Does nothing if the pool was never initialized.
     *
     * @throws SQLException declared by the interface
     */
    @Override
    public void shutdown() throws SQLException {
        // initialize() may have failed or never run; avoid a NullPointerException on shutdown.
        if (datasource != null) {
            datasource.close();
        }
    }

    /**
     * Validates the configured properties and creates the Druid pool.
     *
     * @throws SQLException if the URL or driver is missing, the maximum connection
     *                      count is negative, or the driver class name cannot be set
     */
    @Override
    public void initialize() throws SQLException {
        if (this.URL == null) {
            throw new SQLException("DBPool could not be created: DB URL cannot be null");
        }

        if (this.driver == null) {
            throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
        }

        if (this.maxConnection < 0) {
            throw new SQLException("DBPool maxConnections could not be created: Max connections must be greater than zero!");
        }

        DruidDataSource pool = new DruidDataSource();
        try {
            pool.setDriverClassName(this.driver);
        } catch (Exception e) {
            // Surface the failure instead of silently continuing with a pool that has no driver.
            throw new SQLException("Problem setting driver class name on datasource: " + e.getMessage(), e);
        }

        pool.setUrl(this.URL);
        pool.setUsername(this.user);
        pool.setPassword(this.password);
        // An unset (0) maxConnection falls back to the documented default instead of
        // handing the pool a size of zero.
        pool.setMaxActive(this.maxConnection > 0 ? this.maxConnection : DEFAULT_DB_MAX_CONNECTIONS);
        pool.setMinIdle(1);
        pool.setMaxWait(0);
        // Use the statement-cache constant here, not the max-connections one.
        pool.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION);

        if (this.validationQuery != null) {
            pool.setValidationQuery(this.validationQuery);
            if (!this.validateOnCheckout) {
                pool.setTestOnReturn(true);
            } else {
                pool.setTestOnBorrow(true);
            }
            // NOTE(review): an "idle validation seconds" value is used as the query timeout;
            // kept as in the original - confirm this is the intended mapping.
            pool.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
        }

        // Publish the pool only once it is fully configured.
        this.datasource = pool;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getURL() {
        return URL;
    }

    public void setURL(String URL) {
        this.URL = URL;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getMaxConnection() {
        return maxConnection;
    }

    public void setMaxConnection(int maxConnection) {
        this.maxConnection = maxConnection;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public boolean isValidateOnCheckout() {
        return validateOnCheckout;
    }

    public void setValidateOnCheckout(boolean validateOnCheckout) {
        this.validateOnCheckout = validateOnCheckout;
    }

    public int getIdleConnectionValidationSeconds() {
        return idleConnectionValidationSeconds;
    }

    public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
        this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
    }

    public DruidDataSource getDatasource() {
        return datasource;
    }

    public void setDatasource(DruidDataSource datasource) {
        this.datasource = datasource;
    }

    public String getDiscardIdleConnectionsSeconds() {
        return discardIdleConnectionsSeconds;
    }

    public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
        this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
    }
}

創建完成之后,還需要在quartz.properties配置文件中設置一下即可!

# Connection provider: use the custom Druid-backed provider
org.quartz.dataSource.qzDS.connectionProvider.class=com.example.cluster.quartz.config.DruidConnectionProvider

如果已經配置,請忽略!

3.7、編寫 Job 具體任務類

/**
 * Demo job that prints the id of the scheduler instance executing it together
 * with the current timestamp, making it visible which cluster node ran the job.
 */
public class TfCommandJob implements Job {

    private static final Logger log = LoggerFactory.getLogger(TfCommandJob.class);

    /**
     * Prints {@code <schedulerInstanceId>--<yyyy-MM-dd HH:mm:ss>} to standard output.
     *
     * @param context the execution context supplied by Quartz
     */
    @Override
    public void execute(JobExecutionContext context) {
        try {
            // Lower-case "yyyy" is the calendar year; upper-case "YYYY" is the week-based year,
            // which prints the wrong year on days around New Year.
            System.out.println(context.getScheduler().getSchedulerInstanceId() + "--" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        } catch (SchedulerException e) {
            log.error("任務執行失敗",e);
        }
    }
}

3.8、編寫 Quartz 服務層接口

/**
 * Service-layer operations for managing Quartz jobs.
 */
public interface QuartzJobService {
    /**
     * Adds a job; extra parameters may be supplied.
     * @param clazzName name of the job class
     * @param jobName job name
     * @param groupName job group name
     * @param cronExp cron expression for the schedule
     * @param param additional parameters, may be null
     */
    void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);
 
    /**
     * Pauses a job.
     * @param jobName job name
     * @param groupName job group name
     */
    void pauseJob(String jobName, String groupName);
 
    /**
     * Resumes a job.
     * @param jobName job name
     * @param groupName job group name
     */
    void resumeJob(String jobName, String groupName);
 
    /**
     * Runs a scheduled job once, immediately.
     * @param jobName job name
     * @param groupName job group name
     */
    void runOnce(String jobName, String groupName);
 
    /**
     * Updates a job.
     * @param jobName job name
     * @param groupName job group name
     * @param cronExp new cron expression
     * @param param additional parameters
     */
    void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);
 
    /**
     * Deletes a job.
     * @param jobName job name
     * @param groupName job group name
     */
    void deleteJob(String jobName, String groupName);
 
    /**
     * Starts all jobs.
     */
    void startAllJobs();
 
    /**
     * Pauses all jobs.
     */
    void pauseAllJobs();
 
    /**
     * Resumes all jobs.
     */
    void resumeAllJobs();
 
    /**
     * Shuts down all jobs.
     */
    void shutdownAllJobs();
}

對應的實現類QuartzJobServiceImpl如下:

/**
 * {@link QuartzJobService} implementation that delegates to the injected Quartz {@link Scheduler}.
 *
 * <p>Every operation catches its failures and logs them; nothing is propagated to the caller.</p>
 */
@Service
public class QuartzJobServiceImpl implements QuartzJobService {

    private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class);

    @Autowired
    private Scheduler scheduler;

    /**
     * Creates a job and a cron trigger sharing the same name/group, then schedules them.
     *
     * @param clazzName fully qualified name of a class implementing {@link Job}
     * @param jobName   name used for both the job and its trigger
     * @param groupName group used for both the job and its trigger
     * @param cronExp   cron expression for the trigger
     * @param param     entries copied into the trigger's JobDataMap; may be null
     */
    @Override
    public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {
        try {
            // No scheduler.start() here: per the original note the scheduler is already
            // started during initialization.
            // asSubclass verifies up front that the class really implements Job, replacing an
            // unchecked cast that would only fail later, when the job is first executed.
            Class<? extends Job> jobClass = Class.forName(clazzName).asSubclass(Job.class);
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();
            // Schedule builder for the cron expression (i.e. when the job runs).
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
            // Build a new trigger from the cron expression.
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();
            // Copy the caller's parameters into the trigger's JobDataMap.
            if (param != null) {
                trigger.getJobDataMap().putAll(param);
            }
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            log.error("創建任務失敗", e);
        }
    }

    /** Pauses the job identified by name and group. */
    @Override
    public void pauseJob(String jobName, String groupName) {
        try {
            scheduler.pauseJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("暫停任務失敗", e);
        }
    }

    /** Resumes the job identified by name and group. */
    @Override
    public void resumeJob(String jobName, String groupName) {
        try {
            scheduler.resumeJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("恢復任務失敗", e);
        }
    }

    /** Triggers the job identified by name and group once, immediately. */
    @Override
    public void runOnce(String jobName, String groupName) {
        try {
            scheduler.triggerJob(JobKey.jobKey(jobName, groupName));
        } catch (SchedulerException e) {
            log.error("立即運行一次定時任務失敗", e);
        }
    }

    /**
     * Reschedules an existing cron trigger with a new expression and/or extra data.
     *
     * @param jobName   trigger name (same as the job name used in {@link #addJob})
     * @param groupName trigger group
     * @param cronExp   new cron expression; the schedule is left unchanged when null
     * @param param     entries merged into the trigger's JobDataMap; may be null
     */
    @Override
    public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (trigger == null) {
                // Report the real cause instead of an opaque NullPointerException further down.
                log.error("更新任務失敗", new IllegalStateException("No trigger found for key " + triggerKey));
                return;
            }
            if (cronExp != null) {
                // Schedule builder for the new expression.
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
                // Rebuild the trigger with the new cron expression.
                trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            }
            // Merge the supplied parameters into the JobDataMap.
            if (param != null) {
                trigger.getJobDataMap().putAll(param);
            }
            // Replace the stored trigger with the updated one.
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (Exception e) {
            log.error("更新任務失敗", e);
        }
    }

    /** Pauses and unschedules the trigger, then deletes the job of the same name/group. */
    @Override
    public void deleteJob(String jobName, String groupName) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
            scheduler.pauseTrigger(triggerKey);
            scheduler.unscheduleJob(triggerKey);
            scheduler.deleteJob(JobKey.jobKey(jobName, groupName));
        } catch (Exception e) {
            log.error("刪除任務失敗", e);
        }
    }

    /** Starts the scheduler. */
    @Override
    public void startAllJobs() {
        try {
            scheduler.start();
        } catch (Exception e) {
            log.error("開啟所有的任務失敗", e);
        }
    }

    /** Pauses all triggers. */
    @Override
    public void pauseAllJobs() {
        try {
            scheduler.pauseAll();
        } catch (Exception e) {
            log.error("暫停所有任務失敗", e);
        }
    }

    /** Resumes all triggers. */
    @Override
    public void resumeAllJobs() {
        try {
            scheduler.resumeAll();
        } catch (Exception e) {
            log.error("恢復所有任務失敗", e);
        }
    }

    /**
     * Shuts the scheduler down, waiting for running jobs to complete.
     * Use with care: once shut down, the scheduler cannot be started again with start().
     */
    @Override
    public void shutdownAllJobs() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.shutdown(true);
            }
        } catch (Exception e) {
            log.error("關閉所有的任務失敗", e);
        }
    }
}

3.9、編寫 controller 服務

先創建一個請求參數實體類

public class QuartzConfigDTO implements Serializable {
 
 
    private static final long serialVersionUID = 1L;
    /**
     * 任務名稱
     */
    private String jobName;
 
    /**
     * 任務所屬組
     */
    private String groupName;
 
    /**
     * 任務執行類
     */
    private String jobClass;
 
    /**
     * 任務調度時間表達式
     */
    private String cronExpression;
 
    /**
     * 附加參數
     */
    private Map<String, Object> param;
 
 
    public String getJobName() {
        return jobName;
    }
 
    public QuartzConfigDTO setJobName(String jobName) {
        this.jobName = jobName;
        return this;
    }
 
    public String getGroupName() {
        return groupName;
    }
 
    public QuartzConfigDTO setGroupName(String groupName) {
        this.groupName = groupName;
        return this;
    }
 
    public String getJobClass() {
        return jobClass;
    }
 
    public QuartzConfigDTO setJobClass(String jobClass) {
        this.jobClass = jobClass;
        return this;
    }
 
    public String getCronExpression() {
        return cronExpression;
    }
 
    public QuartzConfigDTO setCronExpression(String cronExpression) {
        this.cronExpression = cronExpression;
        return this;
    }
 
    public Map<String, Object> getParam() {
        return param;
    }
 
    public QuartzConfigDTO setParam(Map<String, Object> param) {
        this.param = param;
        return this;
    }
}

編寫 web 服務接口

/**
 * REST endpoints under {@code /test} that forward job-management requests to
 * {@link QuartzJobService}. Every endpoint returns {@link HttpStatus#OK}.
 */
@RestController
@RequestMapping("/test")
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private QuartzJobService quartzJobService;

    /**
     * Adds a new job.
     *
     * @param configDTO job class, name, group, cron expression and parameters
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/addJob")
    public Object addJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.addJob(
                configDTO.getJobClass(),
                configDTO.getJobName(),
                configDTO.getGroupName(),
                configDTO.getCronExpression(),
                configDTO.getParam());
        return HttpStatus.OK;
    }

    /**
     * Pauses a job.
     *
     * @param configDTO carries the job name and group
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/pauseJob")
    public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {
        final String name = configDTO.getJobName();
        final String group = configDTO.getGroupName();
        quartzJobService.pauseJob(name, group);
        return HttpStatus.OK;
    }

    /**
     * Resumes a job.
     *
     * @param configDTO carries the job name and group
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/resumeJob")
    public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {
        final String name = configDTO.getJobName();
        final String group = configDTO.getGroupName();
        quartzJobService.resumeJob(name, group);
        return HttpStatus.OK;
    }

    /**
     * Runs a scheduled job once, immediately.
     *
     * @param configDTO carries the job name and group
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/runOnce")
    public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {
        final String name = configDTO.getJobName();
        final String group = configDTO.getGroupName();
        quartzJobService.runOnce(name, group);
        return HttpStatus.OK;
    }

    /**
     * Updates a job.
     *
     * @param configDTO job name, group, new cron expression and parameters
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/updateJob")
    public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {
        quartzJobService.updateJob(
                configDTO.getJobName(),
                configDTO.getGroupName(),
                configDTO.getCronExpression(),
                configDTO.getParam());
        return HttpStatus.OK;
    }

    /**
     * Deletes a job.
     *
     * @param configDTO carries the job name and group
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/deleteJob")
    public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {
        final String name = configDTO.getJobName();
        final String group = configDTO.getGroupName();
        quartzJobService.deleteJob(name, group);
        return HttpStatus.OK;
    }

    /**
     * Starts all jobs.
     *
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/startAllJobs")
    public Object startAllJobs() {
        quartzJobService.startAllJobs();
        return HttpStatus.OK;
    }

    /**
     * Pauses all jobs.
     *
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/pauseAllJobs")
    public Object pauseAllJobs() {
        quartzJobService.pauseAllJobs();
        return HttpStatus.OK;
    }

    /**
     * Resumes all jobs.
     *
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/resumeAllJobs")
    public Object resumeAllJobs() {
        quartzJobService.resumeAllJobs();
        return HttpStatus.OK;
    }

    /**
     * Shuts down all jobs.
     *
     * @return {@link HttpStatus#OK}
     */
    @RequestMapping("/shutdownAllJobs")
    public Object shutdownAllJobs() {
        quartzJobService.shutdownAllJobs();
        return HttpStatus.OK;
    }

}

3.10、服務接口測試

運行 SpringBoot 的Application類,啟動服務!

 

 

  創建一個每5秒鍾執行一次的定時任務

 

 

 

 

 

 

可以看到服務正常運行!

 

3.11、注冊監聽器(選用)

當然,如果你想在 SpringBoot 里面集成 Quartz 的監聽器,操作也很簡單!

創建任務調度監聽器

/**
 * Scheduler listener that prints one line to standard output for each scheduler event.
 */
@Component
public class SimpleSchedulerListener extends SchedulerListenerSupport {

    /** Writes a single event message to standard output. */
    private static void report(String message) {
        System.out.println(message);
    }

    /** Called when a job is scheduled. */
    @Override
    public void jobScheduled(Trigger trigger) {
        report("任務被部署時被執行");
    }

    /** Called when a job is unscheduled. */
    @Override
    public void jobUnscheduled(TriggerKey triggerKey) {
        report("任務被卸載時被執行");
    }

    /** Called when a trigger is finalized. */
    @Override
    public void triggerFinalized(Trigger trigger) {
        report("任務完成了它的使命,光榮退休時被執行");
    }

    /** Called when a single trigger is paused. */
    @Override
    public void triggerPaused(TriggerKey triggerKey) {
        report(triggerKey + "(一個觸發器)被暫停時被執行");
    }

    /** Called when a trigger group is paused. */
    @Override
    public void triggersPaused(String triggerGroup) {
        report(triggerGroup + "所在組的全部觸發器被停止時被執行");
    }

    /** Called when a single trigger is resumed. */
    @Override
    public void triggerResumed(TriggerKey triggerKey) {
        report(triggerKey + "(一個觸發器)被恢復時被執行");
    }

    /** Called when a trigger group is resumed. */
    @Override
    public void triggersResumed(String triggerGroup) {
        report(triggerGroup + "所在組的全部觸發器被回復時被執行");
    }

    /** Called when a JobDetail is added. */
    @Override
    public void jobAdded(JobDetail jobDetail) {
        report("一個JobDetail被動態添加進來");
    }

    /** Called when a job is deleted. */
    @Override
    public void jobDeleted(JobKey jobKey) {
        report(jobKey + "被刪除時被執行");
    }

    /** Called when a single job is paused. */
    @Override
    public void jobPaused(JobKey jobKey) {
        report(jobKey + "被暫停時被執行");
    }

    /** Called when a job group is paused. */
    @Override
    public void jobsPaused(String jobGroup) {
        report(jobGroup + "(一組任務)被暫停時被執行");
    }

    /** Called when a single job is resumed. */
    @Override
    public void jobResumed(JobKey jobKey) {
        report(jobKey + "被恢復時被執行");
    }

    /** Called when a job group is resumed. */
    @Override
    public void jobsResumed(String jobGroup) {
        report(jobGroup + "(一組任務)被恢復時被執行");
    }

    /** Called on a scheduler error; prints the message and the stack trace of the cause. */
    @Override
    public void schedulerError(String msg, SchedulerException cause) {
        report("出現異常" + msg + "時被執行");
        cause.printStackTrace();
    }

    /** Called when the scheduler enters standby mode. */
    @Override
    public void schedulerInStandbyMode() {
        report("scheduler被設為standBy等候模式時被執行");
    }

    /** Called when the scheduler has started. */
    @Override
    public void schedulerStarted() {
        report("scheduler啟動時被執行");
    }

    /** Called while the scheduler is starting. */
    @Override
    public void schedulerStarting() {
        report("scheduler正在啟動時被執行");
    }

    /** Called when the scheduler has shut down. */
    @Override
    public void schedulerShutdown() {
        report("scheduler關閉時被執行");
    }

    /** Called while the scheduler is shutting down. */
    @Override
    public void schedulerShuttingdown() {
        report("scheduler正在關閉時被執行");
    }

    /** Called when all scheduling data has been cleared. */
    @Override
    public void schedulingDataCleared() {
        report("scheduler中所有數據包括jobs, triggers和calendars都被清空時被執行");
    }
}

 

創建任務觸發監聽器

/**
 * Trigger listener that prints the name of each callback to standard output
 * and never vetoes a job execution.
 */
@Component
public class SimpleTriggerListener extends TriggerListenerSupport {

    /** Writes a single trace line to standard output. */
    private static void trace(String callback) {
        System.out.println(callback);
    }

    /**
     * @return the name of this trigger listener
     */
    @Override
    public String getName() {
        return "mySimpleTriggerListener";
    }

    /**
     * Called when a trigger fires and its associated job is about to run.
     *
     * @param trigger the trigger that fired
     * @param context the execution context
     */
    @Override
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        trace("myTriggerListener.triggerFired()");
    }

    /**
     * Called after a trigger fires, giving the listener the chance to veto the job;
     * returning {@code true} cancels the job execution.
     *
     * @param trigger the trigger that fired
     * @param context the execution context
     * @return always {@code false}, i.e. the job is never vetoed
     */
    @Override
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        trace("myTriggerListener.vetoJobExecution()");
        return false;
    }

    /**
     * Called when a trigger misfires, for example when many triggers are due at once
     * but every worker thread in the pool is busy, so some miss this round of firing.
     *
     * @param trigger the trigger that misfired
     */
    @Override
    public void triggerMisfired(Trigger trigger) {
        trace("myTriggerListener.triggerMisfired()");
    }

    /**
     * Called when the job has completed.
     *
     * @param trigger the trigger that fired
     * @param context the execution context
     * @param triggerInstructionCode the completion instruction passed in by Quartz
     */
    @Override
    public void triggerComplete(Trigger trigger, JobExecutionContext context, Trigger.CompletedExecutionInstruction triggerInstructionCode) {
        trace("myTriggerListener.triggerComplete()");
    }
}

 

創建任務執行監聽器

/**
 * Job listener that prints the job key to standard output before execution,
 * on veto and after execution.
 */
@Component
public class SimpleJobListener extends JobListenerSupport {

    /** Prints the given prefix followed by the key of the job in the context. */
    private static void report(String prefix, JobExecutionContext context) {
        System.out.println(prefix + context.getJobDetail().getKey());
    }

    /**
     * @return the name of this job listener
     */
    @Override
    public String getName() {
        return "mySimpleJobListener";
    }

    /**
     * Called before the job is executed.
     *
     * @param context the execution context
     */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        report("simpleJobListener監聽器,准備執行:", context);
    }

    /**
     * Called when the job execution was vetoed.
     *
     * @param context the execution context
     */
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        report("simpleJobListener監聽器,取消執行:", context);
    }

    /**
     * Called after the job has been executed.
     *
     * @param context the execution context
     * @param jobException passed in by Quartz; not inspected here
     */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        report("simpleJobListener監聽器,執行結束:", context);
    }
}

 

最后,將監聽器注冊到Scheduler

// Listener beans to register on the scheduler.
@Autowired
private SimpleSchedulerListener simpleSchedulerListener;
 
@Autowired
private SimpleJobListener simpleJobListener;
 
@Autowired
private SimpleTriggerListener simpleTriggerListener;
 
/**
 * Obtains the Scheduler from the scheduler factory bean and registers the three listeners on it.
 *
 * @return the scheduler with listeners attached
 * @throws IOException propagated from schedulerFactoryBean()
 * @throws SchedulerException if the listener manager cannot be obtained
 */
@Bean(name = "scheduler")
public Scheduler scheduler() throws IOException, SchedulerException {
    Scheduler scheduler = schedulerFactoryBean().getScheduler();
    // Register listeners globally on the scheduler.
    // Scheduler-level listener.
    scheduler.getListenerManager().addSchedulerListener(simpleSchedulerListener);
 
    // Job listener with a matcher: only notified for the job with key ("myJob", "myGroup").
    scheduler.getListenerManager().addJobListener(simpleJobListener, KeyMatcher.keyEquals(JobKey.jobKey("myJob", "myGroup")));
 
    // Trigger listener applied to all triggers.
    scheduler.getListenerManager().addTriggerListener(simpleTriggerListener, EverythingMatcher.allTriggers());
    return scheduler;
}

3.12、采用項目數據源(選用)

在上面的 Quartz 數據源配置中,我們使用了自定義的數據源,目的是和項目中的數據源實現解耦,當然有的同學不想單獨建庫,想和項目中數據源保持一致,配置也很簡單!

quartz.properties配置文件中,去掉org.quartz.jobStore.dataSource配置:

# Quartz's own data source setting is commented out so the project's data source is used instead
#org.quartz.jobStore.dataSource=qzDS

QuartzConfig配置類中加入dataSource數據源,並將其注入到quartz中:

// The application's own DataSource, injected by Spring.
@Autowired
private DataSource dataSource;
 
/**
 * Variant of the scheduler factory bean that hands the project's DataSource to Quartz.
 * Parts elided with "//..." are unchanged from the earlier QuartzConfig listing.
 */
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    //...
 
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setQuartzProperties(propertiesFactoryBean.getObject());
    // Use the injected project DataSource for Quartz.
    factory.setDataSource(dataSource);
    
    //...
    return factory;
}

 

四、任務調度測試

在實際的部署中,項目都是集群進行部署,因此為了和正式環境一致,我們再新建兩個相同的項目來測試一下在集群環境下 quartz 是否可以實現分布式調度,保證任何一個定時任務只有一台機器在運行

理論上,我們只需要將剛剛新建好的項目,重新復制一份,然后修改一下端口號就可以實現本地測試!

因為 CRUD 服務只需要一個,因此我們不需要再編寫QuartzJobService等增、刪、改服務,僅僅保持QuartzConfig、DruidConnectionProvider、QuartzJobFactory、TfCommandJob、quartz.properties類和配置都是相同的就可以了!

 

 

 依次啟動服務quartz-001、quartz-002、quartz-003,看看效果如何:

第一個啟動的服務quartz-001會優先加載數據庫中已經配置好的定時任務,其他兩個服務quartz-002、quartz-003都沒有主動調度服務

 

 

 

當我們主動關閉quartz-001,quartz-002服務主動接收任務調度

 

 

 

當我們主動關閉quartz-002,同樣quartz-003服務主動接收任務調度

 

 

 最終結果,和我們預期效果一致!

 

五、小結

本文主要圍繞springboot + quartz + mysql實現持久化分布式調度進行介紹,所有的代碼功能,都測試過。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM