Java 定時任務調度(4)--Quartz 入門實戰


Quartz 是一個完全由 Java 編寫的開源作業調度框架,為在 Java 應用程序中進行作業調度提供了簡單卻強大的機制。本文主要介紹 Quartz 的基本使用,文中使用到的軟件及版本:Java 1.8.0_191、Quartz 2.3.2、SpringBoot 2.4.4、MySQL 5.7。

1、Quartz 核心概念

Job 表示一個工作,要執行的具體內容。
JobDetail 表示一個具體的可執行的調度程序,Job 是這個可執行程調度程序所要執行的內容,另外 JobDetail 還包含了這個任務調度的方案和策略。一個 JobDetail 可對應多個 Trigger。
Trigger 代表一個調度參數的配置,什么時候去調。一個 Trigger 對應 一個 JobDetail。
Scheduler 代表一個調度容器,一個調度容器中可以注冊多個 JobDetail 和 Trigger。當 Trigger 與 JobDetail 組合,就可以被 Scheduler 容器調度了。

2、Quartz 常用配置

2.1、Quartz 主配置

1、org.quartz.scheduler.instanceName scheduler 實例名稱,默認值為 'QuartzScheduler'

2、org.quartz.scheduler.instanceId scheduler實例Id,必須唯一;啟用集群可設為 'AUTO',默認值為 'NON_CLUSTERED'

3、org.quartz.scheduler.threadName 線程名稱,默認值為 instanceName + '_QuartzSchedulerThread'

4、org.quartz.scheduler.makeSchedulerThreadDaemon 調度程序的主線程是否設為守護線程,默認值為 false

5、org.quartz.scheduler.batchTriggerAcquisitionMaxCount 調度節點一次獲取觸發器的最大數量,默認為 1

2.2、ThreadPool 配置

1、org.quartz.threadPool.class ThreadPool實現的類名,默認值為 'org.quartz.simpl.SimpleThreadPool'

2、org.quartz.threadPool.threadCount 線程數,默認值為 10

3、org.quartz.threadPool.threadPriority 線程優先級,默認值 5

4、org.quartz.threadPool.makeThreadsDaemons 執行任務的線程是否設為守護線程,默認為 false

2.3、JobStore 配置

org.quartz.jobStore.class 任務存儲實現類名,可設為 org.quartz.simpl.RAMJobStore、org.quartz.impl.jdbcjobstore.JobStoreTX(quartz管理事務)、org.quartz.impl.jdbcjobstore.JobStoreCMT(應用程序管理事務)、org.terracotta.quartz.TerracottaJobStore;當設置為jdbc存儲時,有以下屬性可以調整設置:

1、org.quartz.jobStore.driverDelegateClass 驅動代理類,可設置為標准的 jdbc 驅動程序:org.quartz.impl.jdbcjobstore.StdJDBCDelegate

2、org.quartz.jobStore.dataSource 數據源名稱

3、org.quartz.jobStore.tablePrefix JDBCJobStore的表前綴;如果使用不同的表前綴,則可以在同一數據庫中擁有多組Quartz表;默認值 'QRTZ_'

4、org.quartz.jobStore.useProperties 指示JobDataMaps中的所有值都將是字符串,避免了將非String類序列化為BLOB時可能產生的類版本控制問題

5、org.quartz.jobStore.isClustered 是否開啟集群,如果有多個 Quartz 實例使用同一組數據庫表,則此屬性必須設置為 true,默認值為 false

6、org.quartz.jobStore.clusterCheckinInterval 設置此實例檢入與群集的其他實例的頻率(以毫秒為單位),默認值為 15000

7、org.quartz.jobStore.acquireTriggersWithinLock 獲取觸發器時是否添加數據庫鎖,如果 org.quartz.scheduler.batchTriggerAcquisitionMaxCount > 1,必須設為 true;默認為 false

8、org.quartz.jobStore.misfireThreshold  觸發器超時觸發的閾值,超過該值(有空閑線程能執行任務的實際時間-任務應該被執行的時間)將不會觸發任務的執行;單位為毫秒,默認為 60000

2.4、DataSource 配置

1、org.quartz.dataSource.NAME.driver JDBC 驅動程序

2、org.quartz.dataSource.NAME.URL JDBC URL

3、org.quartz.dataSource.NAME.user 數據庫用戶名

4、org.quartz.dataSource.NAME.password 數據庫密碼

5、org.quartz.dataSource.NAME.maxConnections 最大連接數,默認值 10

6、org.quartz.dataSource.NAME.validationQuery 驗證sql

7、org.quartz.dataSource.NAME.idleConnectionValidationSeconds 空閑連接檢測間隔,默認值 50

8、org.quartz.dataSource.NAME.validateOnCheckout 獲取連接后是否驗證連接,默認值 false

9、org.quartz.dataSource.NAME.discardIdleConnectionsSeconds 空閑連接多長時間后丟棄該連接,0 表示禁用該功能,默認值為 0。

或者使用容器提供的數據源:

1、org.quartz.dataSource.NAME.jndiURL 數據源的 jndi

2、org.quartz.dataSource.NAME.java.naming.security.principal  jndi 需要的用戶名

3、org.quartz.dataSource.NAME.java.naming.security.credentials jndi 需要的密碼

3、Quartz JDBCJobStore表說明

表名 說明
qrtz_blob_triggers 存儲自定義 trigger 實例
qrtz_calendars 存儲 Calendar 信息
qrtz_cron_triggers 存儲 CronTrigger 實例信息
qrtz_fired_triggers 存儲正在觸發的 Trigger 相關的狀態信息
qrtz_job_details 存儲 JobDetail 信息
qrtz_locks 存儲行鎖
qrtz_paused_trigger_grps 存儲暫停的 Trigger 的信息
qrtz_scheduler_state 存儲 Scheduler 的狀態信息
qrtz_simple_triggers 存儲 SimpleTrigger 實例信息
qrtz_simprop_triggers 存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 實例信息
qrtz_triggers 存儲所有的 trigger 及關聯的 job 信息

在 quartz jar 包的 org.quartz.impl.jdbcjobstore 目錄下可以找到對應數據庫的執行基本:

3、Quartz 單獨使用

3.1、引入依賴

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- quartz 默認使用 c3p0 連接池-->
<dependency>
    <groupId>com.mchange</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.5.5</version>
</dependency>

3.2、quartz.properties

3.2.1、任務相關數據放在內存中

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.threadPool.threadCount = 3
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

3.2.2、任務相關數據放到數據庫

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.threadCount = 3

org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.tablePrefix = qrtz_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.useProperties = true

org.quartz.dataSource.myDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8
org.quartz.dataSource.myDS.user = root
org.quartz.dataSource.myDS.password = 123456
org.quartz.dataSource.myDS.maxConnections = 5

3.3、使用例子

package com.abc.demo.quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * quartz使用例子
 */
public class TestJob implements Job {
    private static Logger logger = LoggerFactory.getLogger(TestJob.class);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        logger.info("JobKey={}", context.getJobDetail().getKey());
        logger.info("TriggerKey={}", context.getTrigger().getKey());
        logger.info("jkey2={}", context.getJobDetail().getJobDataMap().get("jkey2"));
        logger.info("jkey1={}", context.getJobDetail().getJobDataMap().get("jkey1"));
        logger.info("jkey2={}", context.getJobDetail().getJobDataMap().get("jkey2"));
        logger.info("tkey1={}", context.getTrigger().getJobDataMap().get("tkey1"));
        logger.info("tkey2={}", context.getTrigger().getJobDataMap().get("tkey2"));
        try {
            logger.info("ckey={}", context.getScheduler().getContext().get("ckey"));
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        logger.info("----------------");
    }

    public static void main(String[] args) throws Exception {
        //創建一個scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.getContext().put("ckey", "cvalue");

        //創建一個job
        JobDetail jobDetail = JobBuilder.newJob(TestJob.class)
                //設置job相關數據
                .usingJobData("jkey1", "jvalue1")
                .withIdentity("job1", "group2")
                .storeDurably()
                .build();
        jobDetail.getJobDataMap().put("jkey2", "jvalue2");

        //創建一個Trigger
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1")
                //設置Trigger相關數據
                .usingJobData("tkey1", "tvalue1")
                //延遲5s開始執行
                .startAt(DateBuilder.futureDate(3, DateBuilder.IntervalUnit.SECOND))
                //每隔5s執行一次,重復無數次
                //.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?"))
                //下一個整的分鍾停止
                .endAt(DateBuilder.evenMinuteDate(null))
                .forJob(jobDetail)
                .build();
        trigger.getJobDataMap().put("tkey2", "tvalue2");

        //scheduler.addJob(jobDetail, true);
        //注冊trigger
        scheduler.scheduleJob(jobDetail, trigger);
        //啟動scheduler
        scheduler.start();

        Thread.sleep(1000 * 100);

        //停止diao調度
        scheduler.shutdown();
    }
}

4、Spring Boot 中使用 Quartz

4.1、引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

4.2、application.yml

4.2.1、任務相關數據放在內存中

spring:
    quartz:
       job-store-type: memory

4.2.2、任務相關數據放在數據庫

spring:
  datasource:
    druid:
      primary:
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://10.49.196.10:3306/itest?useUnicode=true&characterEncoding=UTF-8
        username: root
        password: 123456
        initialSize: 2
        minIdle: 2
        maxActive: 10
        validationQuery: SELECT 1
        testWhileIdle: true
        testOnBorrow: true
        testOnReturn: false
        maxWait: 6000
        filters: wall,stat,slf4j

  quartz:
    job-store-type: jdbc
    properties:
      org:
        quartz:
          scheduler:
            instanceName: myScheduler
            instanceId: AUTO
          threadPool:
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 10
          jobStore:
            class: org.quartz.impl.jdbcjobstore.JobStoreTX
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_
            isClustered: true
            clusterCheckinInterval: 15000
            useProperties: true

4.3、數據源配置(任務相關數據放數據庫時需要)

package com.abc.demo.config;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.autoconfigure.quartz.QuartzDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Primary
    @Bean
    @QuartzDataSource
    @ConfigurationProperties(prefix="spring.datasource.druid.primary")
    public DataSource dataSource() {
        return DruidDataSourceBuilder.create().build();
    }

}

4.4、定義Job

DemoJob:

package com.abc.demo.quartz;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;

@DisallowConcurrentExecution
public class DemoJob extends QuartzJobBean {
    private static Logger logger = LoggerFactory.getLogger(DemoJob.class);

    /**業務參數*/
    private String businessParam;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        logger.info("hello quartz。businessParam={}", businessParam);
    }

    public void setBusinessParam(String businessParam) {
        this.businessParam = businessParam;
    }
}

InvokeJob,通用的 Job,利用反射執行某個類的某個方法:

package com.abc.demo.quartz;

import com.abc.demo.util.SpringContext;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.lang.reflect.Method;

/**
 * 執行某個類的某個方法,適用於無參數方法
 */
@DisallowConcurrentExecution
public class InvokeJob extends QuartzJobBean {
    private String className;
    private String methodName;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            Object object = SpringContext.getBean(Class.forName(className));
            Method method = object.getClass().getMethod(methodName);
            method.invoke(object);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
}

用到的 SpringContext.java:

package com.abc.demo.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * spring上下文
 */
@Component
public class SpringContext implements ApplicationContextAware {
    protected static ApplicationContext context;
    
    @Override
    public void setApplicationContext(ApplicationContext context)
            throws BeansException {
        SpringContext.context = context;
    }
    
    public static ApplicationContext getContext() {
        return context;
    }
    
    public static Object getBean(String beanId) {
        return context.getBean(beanId);
    }
    
    public static <T> T getBean(Class<T> c) {
        return context.getBean(c);
    }
    
    public static <T> T getBean(String name, Class<T> c) {
        return context.getBean(name, c);
    }
}
SpringContext.java

4.5、定義需要利用反射執行的方法

這里定義一個 Service,利用反射來執行該 Service 中的方法。

package com.abc.demo.quartz;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class MyService {
    private static Logger logger = LoggerFactory.getLogger(MyService.class);

    public void test() {
        logger.info("test");
    }

    public void test2() {
        logger.info("test2");
    }
}

4.6、應用啟動時任務初始化

package com.abc.demo.runner;

import com.abc.demo.entity.DemoTaskConfig;
import com.abc.demo.entity.InvokeTaskConfig;
import com.abc.demo.quartz.DemoJob;
import com.abc.demo.quartz.InvokeJob;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 應用啟動后,初始化任務
 */
@Component
public class QuartzRunner implements CommandLineRunner {
    @Autowired
    private Scheduler scheduler;

    /**模擬數據庫中DemoJob的配置數據*/
    private Map<Integer, DemoTaskConfig> demoTaskConfigs = new HashMap() {{
        put(1, new DemoTaskConfig(1, "業務1", "0/3 * * * * ?", "demoJob1", "demoGroup", "demoTrigger1", "demoGroup"));
        put(2, new DemoTaskConfig(2, "業務2", "0/4 * * * * ?", "demoJob2", "demoGroup", "demoTrigger2", "demoGroup"));
    }};

    /**模擬數據庫中InvokeJob的配置數據*/
    private Map<Integer, InvokeTaskConfig> invokeTaskConfigs = new HashMap() {{
        put(1, new InvokeTaskConfig(1, "com.abc.demo.quartz.MyService", "test","0/5 * * * * ?", "myJob1", "myGroup", "myTrigger1", "myGroup"));
        put(2, new InvokeTaskConfig(2, "com.abc.demo.quartz.MyService", "test2", "0/6 * * * * ?", "myJob2", "myGroup", "myTrigger2", "myGroup"));
    }};

    @Override
    public void run(String... args) throws Exception {
        for (DemoTaskConfig demoTaskConfig : demoTaskConfigs.values()) {
            JobKey jobKey = new JobKey(demoTaskConfig.getJobName(), demoTaskConfig.getJobGroup());
            JobDetail jobDetail = JobBuilder.newJob(DemoJob.class)
                    .withIdentity(jobKey)
                    .usingJobData("businessParam", demoTaskConfig.getBusinessParam())
                    .build();
            TriggerKey triggerKey = new TriggerKey(demoTaskConfig.getTriggerName(), demoTaskConfig.getTriggerGroup());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(demoTaskConfig.getCron()).withMisfireHandlingInstructionDoNothing())
                    .forJob(jobDetail)
                    .build();

            JobDetail jobDetailOld = scheduler.getJobDetail(jobKey);
            Trigger triggerOld = scheduler.getTrigger(triggerKey);
            if (triggerOld != null) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                if (jobDetailOld == null) {
                    scheduler.scheduleJob(jobDetail, trigger);
                } else {
                    scheduler.scheduleJob(trigger);
                }
            }
        }

        for (InvokeTaskConfig quartzTaskConfig : invokeTaskConfigs.values()) {
            JobKey jobKey = new JobKey(quartzTaskConfig.getJobName(), quartzTaskConfig.getJobGroup());
            JobDetail jobDetail = JobBuilder.newJob(InvokeJob.class)
                    .withIdentity(jobKey)
                    .usingJobData("className", quartzTaskConfig.getClassName())
                    .usingJobData("methodName", quartzTaskConfig.getMethodName())
                    .build();
            TriggerKey triggerKey = new TriggerKey(quartzTaskConfig.getTriggerName(), quartzTaskConfig.getTriggerGroup());
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(quartzTaskConfig.getCron()).withMisfireHandlingInstructionDoNothing())
                    .forJob(jobDetail)
                    .build();

            JobDetail jobDetailOld = scheduler.getJobDetail(jobKey);
            Trigger triggerOld = scheduler.getTrigger(triggerKey);
            if (triggerOld != null) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                if (jobDetailOld == null) {
                    scheduler.scheduleJob(jobDetail, trigger);
                } else {
                    scheduler.scheduleJob(trigger);
                }
            }
        }

    }
}

 5、Quartz 集群

當 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX,可以啟用集群功能部署多個實例:

org.quartz.jobStore.isClustered = true

如果實例部署在多台機器上,機器之間需要需要有時間同步,需要保證機器間的時間誤差不超過 1 秒;可參考官網說明:https://github.com/quartz-scheduler/quartz/blob/master/docs/configuration.adoc#configuration-of-database-clustering-achieve-fail-over-and-load-balancing-with-jdbc-jobstore

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM