任務調度之 Quartz


任務調度的背景

  在業務系統中有很多這樣的場景:

  • 賬單日或者還款日上午 10 點,給每個信用卡客戶發送賬單通知,還款通知。如何判斷客戶的賬單日、還款日,完成通知的發送?
  • 銀行業務系統,夜間要完成跑批的一系列流程,清理數據,下載文件,解析文件,對賬清算、切換結算日期等等。如何觸發一系列流程的執行?
  • 金融機構跟人民銀行二代支付系統對接,人民銀行要求低於 5W 的金額(小額支付)半個小時打一次包發送,以緩解並發壓力。所以,銀行的跨行轉賬分成了多個流程:錄入、復核、發送。如何把半個小時以內的所有數據一次性發送?

  類似於這種基於准確的時刻或者固定的時間間隔觸發的任務,或者有批量數據需要處理,或者要實現兩個動作解耦的場景,我們都可以用任務調度來實現。

任務調度需求分析:

  任務調度的實現方式有很多,如果要實現我們的調度需求,我們對這個工具有什么樣的基本要求呢?

  1. 可以定義觸發的規則,比如基於時刻、時間間隔、表達式。
  2. 可以定義需要執行的任務。比如執行一個腳本或者一段代碼。任務和規則是分開的。
  3. 集中管理配置,持久配置。不用把規則寫在代碼里面,可以看到所有的任務配置,方便維護。重啟之后任務可以再次調度——配置文件或者配置中心。
  4. 支持任務的串行執行,例如執行 A 任務后再執行 B 任務再執行 C 任務。
  5. 支持多個任務並發執行,互不干擾(例如 ScheduledThreadPoolExecutor)。
  6. 有自己的調度器,可以啟動、中斷、停止任務。
  7. 容易集成到 Spring。

任務調度工具對比:

  • 操作系統 Linux crontab:Windows 計划任務,只能執行簡單腳本或者命令
  • 數據庫 MySQL、Oracle 可以操作數據。不能執行 Java 代碼
  • 工具 Kettle 可以操作數據,執行腳本。沒有集中配置
  • 開發語言 JDK Timer、ScheduledThreadPool Timer:單線程JDK1.5 之后:ScheduledThreadPool(Cache、Fiexed、Single):沒有集中配置,日程管理不夠靈活
  • 容器 Spring Task、@Scheduled 不支持集群
  • 分布式框架 XXL-JOB,Elastic-Job

  其中JDK Timer的案例如下:

public class TestTimerTask extends TimerTask {
    /**
     * 此計時器任務要執行的操作。
     */
    public void run() {
        Date executeTime = new Date(this.scheduledExecutionTime());
        String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("任務執行了:" + dateStr);
    }
}
public class TestTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task = new TestTimerTask();
        timer.schedule(task, 5000L, 1000L);
    }
}

  @Scheduled 也是用 JUC 的 ScheduledExecutorService 實現的Scheduled(cron = “0 15 10 15 * ?”)

  1. ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法將@Scheduled 的方法包裝為指定的 task添加到 ScheduledTaskRegistrar 中
  2. ScheduledAnnotationBeanPostProcessor 會監聽 Spring 的容器初始化事件,在 Spring 容器初始化完成后進行TaskScheduler 實現類實例的查找,若發現有 SchedulingConfigurer 的實現類實例,則跳過 3
  3. 查找 TaskScheduler 的實現類實例默認是通過類型查找,若有多個實現則會查找名字為"taskScheduler"的實現 Bean,若沒有找到則在 ScheduledTaskRegistrar 調度任務的時候會創建一個 newSingleThreadScheduledExecutor,將TaskScheduler 的實現類實例設置到 ScheduledTaskRegistrar 屬性中
  4. ScheduledTaskRegistrar 的 scheduleTasks 方法觸發任務調度
  5. 真正調度任務的類是 TaskScheduler 實現類中的 ScheduledExecutorService,由 J.U.C 提供

Quzartz 基本介紹:

  Quatz 是一個特性豐富的,開源的任務調度庫,它幾乎可以嵌入所有的 Java 程序,從很小的獨立應用程序到大型商業系統。Quartz 可以用來創建成百上千的簡單的或者復雜的任務,這些任務可以用來執行任何程序可以做的事情。Quartz 擁有很多企業級的特性,包括支持 JTA 事務和集群。Quartz 的目的就是讓任務調度更加簡單,開發人員只需要關注業務即可。他是用 Java 語言編寫的(也有.NET 的版本)。Java 代碼能做的任何事情,Quartz 都可以調度。特點:

  • 精確到毫秒級別的調度
  • 可以獨立運行,也可以集成到容器中
  • 支持事務(JobStoreCMT )
  • 支持集群
  • 支持持久化

Quzartz Java 編程:

1.引入依賴:

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.0</version>
</dependency>

2.默認配置文件

  org.quartz包下,有一個默認的配置文件,quartz.properties。當我們沒有定義一個同名的配置文件的時候,就會使用默認配置文件里面的配置。

org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
//線程池
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreshold: 60000
//內存持久化
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

3.創建 Job

  實現唯一的方法 execute(),方法中的代碼就是任務執行的內容。此處僅輸出字符串。

public class MyJob1 implements Job {
    
    public void execute(JobExecutionContext context) throws JobExecutionException {
        Date date = new Date();
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        System.out.println( " " + sf.format(date) + " 任務1執行了,"+dataMap.get("name"));
    }
}

  Job 進一步包裝成 JobDetail。必須要指定 JobName 和 groupName,兩個合起來是唯一標識符。可以攜帶 KV 的數據(JobDataMap),用於擴展屬性,在運行的時候可以從 context獲取到。

// JobDetail  
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
    .withIdentity("myJob1", "group1")
    .usingJobData("name", "wuzz")
    .usingJobData("moon", 5.21F)
    .build();

4 創建 Trigger

  基於 SimpleTrigger 定義了一個每 2 秒鍾運行一次、不斷重復的 Trigger:

// Trigger
Trigger trigger = TriggerBuilder.newTrigger()
    .withIdentity("trigger1", "group1")
    .startNow()
    .withSchedule(SimpleScheduleBuilder.simpleSchedule()
    .withIntervalInSeconds(2)
    .repeatForever())
    .build();

5.創建 Scheduler

  通過 Factory 獲取調度器的實例,把 JobDetail 和 Trigger綁定,注冊到容器中。Scheduler 先啟動后啟動無所謂,只要有 Trigger 到達觸發條件,就會執行任務。

// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();

 // 綁定關系是1:N
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

  注意這里,調度器一定是單例的。

體系結構總結:

JobDetail : 我們創建一個實現 Job 接口的類,使用 JobBuilder 包裝成 JobDetail,它可以攜帶KV 的數據。

Trigger : 定義任務的觸發規律,Trigger,使用 TriggerBuilder 來構建。JobDetail 跟 Trigger 是 1:N 的關系。Trigger 接口在 Quartz 有 4 個繼承的子接口:

  1. SimpleTrigger :簡單觸發器 固定時刻或時間間隔,毫秒
  2. CalendarIntervalTrigger :基於日歷的觸發器 比簡單觸發器更多時間單位,支持非固定時間的觸發,例如一年可能 365/366,一個月可能 28/29/30/31
  3. DailyTimeIntervalTrigger :基於日期的觸發器 每天的某個時間段
  4. CronTrigger :基於 Cron 表達式的觸發器 ,可以借助 http://cron.qqe2.com/ 生成表達式
Trigger dailyTimeIntervalTrigger = TriggerBuilder.newTrigger()
  .withIdentity("trigger1", "group1")
  .startNow()
  .withSchedule(DailyTimeIntervalScheduleBuilder.dailyTimeIntervalSchedule()
  .startingDailyAt(TimeOfDay.hourAndMinuteOfDay(9, 0)) //第天9:00開始
  .endingDailyAt(TimeOfDay.hourAndMinuteOfDay(16, 0)) //16:00 結束
  .onDaysOfTheWeek(1,2,3,4,5) //周一至周五執行
  .withIntervalInHours(1) //每間隔1小時執行一次
  .withRepeatCount(100))//最多重復100次(實際執行100+1次)
  .build();

  上面我們定義的都是在什么時間執行,但是我們有一些在什么時間不執行的需求,比如:理財周末和法定假日購買不計息;證券公司周末和法定假日休市。是不是要把日期寫在數據庫中,然后讀取基於當前時間判斷呢?

基於 Calendar 的 排除規則

  如果要在觸發器的基礎上,排除一些時間區間不執行任務,就要用到 Quartz 的Calendar 類(注意不是 JDK 的 Calendar)。可以按年、月、周、日、特定日期、Cron表達式排除。調用 Trigger 的 modifiedByCalendar()添加到觸發器中,並且調用調度器的addCalendar()方法注冊排除規則。

// 定義日歷
AnnualCalendar holidays = new AnnualCalendar();// 排除中秋節
Calendar midAutumn = new GregorianCalendar(2019, 9, 13);
holidays.setDayExcluded(midAutumn, true);
// 排除聖誕節
Calendar christmas = new GregorianCalendar(2019, 12, 25);
holidays.setDayExcluded(christmas, true);

// 調度器添加日歷
scheduler.addCalendar("holidays", holidays, false, false);

JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
        .withIdentity("job1", "group1")
        .usingJobData("name","wuzz")
        .build();

Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("trigger1", "group1")
        .startNow()
        .modifiedByCalendar("holidays")
        .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInSeconds(2)
                .repeatForever())
        .build();

 

  • BaseCalendar :為高級的 Calendar 實現了基本的功能,實現了 org.quartz.Calendar 接口AnnualCalendar 排除年中一天或多天
  • CronCalendar :日歷的這種實現排除了由給定的 CronExpression 表達的時間集合。 例如,您可以使用此日歷使用表達式“* * 0-7,18-23?* *”每天排除所有營業時間(上午 8 點至下午 5 點)。 如果 CronTrigger 具有給定的 cron 表達式並且與具有相同表達式的 CronCalendar 相關聯,則日歷將排除觸發器包含的所有時間,並且它們將彼此抵消。
  • DailyCalendar :您可以使用此日歷來排除營業時間(上午 8 點 - 5 點)每天。 每個DailyCalendar 僅允許指定單個時間范圍,並且該時間范圍可能不會跨越每日邊界(即,您不能指定從上午 8 點至凌晨 5 點的時間范圍)。 如果屬性 invertTimeRange 為 false(默認),則時間范圍定義觸發器不允許觸發的時間范圍。 如果 invertTimeRange 為 true,則時間范圍被反轉 - 也就是排除在定義的時間范圍之外的所有時間。
  • HolidayCalendar :特別的用於從 Trigger 中排除節假日
  • MonthlyCalendar :排除月份中的指定數天,例如,可用於排除每月的最后一天
  • WeeklyCalendar :排除星期中的任意周幾,例如,可用於排除周末,默認周六和周日

Scheduler:

  調度器,是 Quartz 的指揮官,由 StdSchedulerFactory 產生。它是單例的。並且是 Quartz 中最重要的 API,默認是實現類是 StdScheduler,里面包含了一個QuartzScheduler。QuartzScheduler 里面又包含了一個 QuartzSchedulerThread。

  Scheduler 中的方法主要分為三大類:

  • 操作調度器本身,例如調度器的啟動 start()、調度器的關閉 shutdown()。
  • 操作 Trigger,例如 pauseTriggers()、resumeTrigger()。
  • 操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()

  這些方法非常重要,可以實現任務的動態調度。

Listener:

  我們有這么一種需求,在每個任務運行結束之后發送通知給運維管理員。那是不是要在每個任務的最后添加一行代碼呢?這種方式對原來的代碼造成了入侵,不利於維護。如果代碼不是寫在任務代碼的最后一行,怎么知道任務執行完了呢?或者說,怎么監測到任務的生命周期呢?觀察者模式:定義對象間一種一對多的依賴關系,使得每當一個對象改變狀態,則所有依賴它的對象都會得到通知並自動更新。Quartz 中提供了三種 Listener,監聽 Scheduler 的,監聽 Trigger 的,監聽 Job 的。只需要創建類實現相應的接口,並在 Scheduler 上注冊 Listener,便可實現對核心對象的監聽。

  1. JobListener
  2. TriggerListener
  3. SchedulerListener

  以 JobListener 為例:

public class MyJobListener implements JobListener {

    public String getName() {
        String name = getClass().getSimpleName();
        System.out.println( "Method 111111 :"+ "獲取到監聽器名稱:"+name);
        return name;
    }

    public void jobToBeExecuted(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 222222 :"+ jobName + " ——任務即將執行 ");
    }

    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 333333 :"+ jobName + " ——任務被否決 ");
    }

    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 444444 :"+ jobName + " ——執行完畢 ");
        System.out.println("------------------");
    }
}
public static void main(String[] args) throws SchedulerException {

   // JobDetail
   JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();

   // Trigger
   Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
         .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();

   // SchedulerFactory
   SchedulerFactory  factory = new StdSchedulerFactory();

   // Scheduler
   Scheduler scheduler = factory.getScheduler();

   scheduler.scheduleJob(jobDetail, trigger);

   // 創建並注冊一個全局的Job Listener
   scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());

   scheduler.start();
   
}

  工具類:ListenerManager,用於添加、獲取、移除監聽器。Matcher,主要是基於 groupName 和 keyName 進行匹配。

JobStore:

  Jobstore 用來存儲任務和觸發器相關的信息,例如所有任務的名稱、數量、狀態等等。Quartz 中有兩種存儲任務的方式,一種在在內存,一種是在數據庫。

RAMJobStore:

  Quartz 默認的 JobStore 是 RAMJobstore,也就是把任務和觸發器信息運行的信息存儲在內存中,用到了 HashMap、TreeSet、HashSet 等等數據結構。如果程序崩潰或重啟,所有存儲在內存中的數據都會丟失。所以我們需要把這些數據持久化到磁盤。

JDBCJobStore:

  JDBCJobStore 可以通過 JDBC 接口,將任務運行數據保存在數據庫中。JDBC 的實現方式有兩種,JobStoreSupport 類的兩個子類:

  • JobStoreTX:在獨立的程序中使用,自己管理事務,不參與外部事務。
  • JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事務時,使用它。

  使用 JDBCJobSotre 時,需要配置數據庫信息:

org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默認配置
org.quartz.jobStore.useProperties:true
#數據庫中 quartz 表的表名前綴
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS
​
#配置數據源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual

  問題來了?需要建什么表?表里面有什么字段?字段類型和長度是什么?在官網的 Downloads 鏈接中,提供了 11 張表的建表語句:quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables2.3 的版本在這個路徑下:src\org\quartz\impl\jdbcjobstore

  表名與作用:

  1. QRTZ_BLOB_TRIGGERS Trigger 作為 Blob 類型存儲
  2. QRTZ_CALENDARS 存儲 Quartz 的 Calendar 信息
  3. QRTZ_CRON_TRIGGERS 存儲 CronTrigger,包括 Cron 表達式和時區信息
  4. QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相關 Job 的執行信息
  5. QRTZ_JOB_DETAILS 存儲每一個已配置的 Job 的詳細信息
  6. QRTZ_LOCKS 存儲程序的悲觀鎖的信息
  7. QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息
  8. QRTZ_SCHEDULER_STATE 存儲少量的有關 Scheduler 的狀態信息,和別的 Scheduler 實例
  9. QRTZ_SIMPLE_TRIGGERS 存儲 SimpleTrigger 的信息,包括重復次數、間隔、以及已觸的次數
  10. QRTZ_SIMPROP_TRIGGERS 存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 兩種類型的觸發器
  11. QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息

動態調度的實現:

  傳統的 Spring 方式集成,由於任務信息全部配置在 xml 文件中,如果需要操作任務或者修改任務運行頻率,只能重新編譯、打包、部署、重啟,如果有緊急問題需要處理,會浪費很多的時間。有沒有可以動態調度任務的方法?比如停止一個 Job?啟動一個 Job?修改 Job 的觸發頻率?讀取配置文件、寫入配置文件、重啟 Scheduler 或重啟應用明顯是不可取的。對於這種頻繁變更並且需要實時生效的配置信息,我們可以放到哪里?可以存放於ZK、Redis、DB tables。並且,我們可以提供一個界面,實現對數據表的輕松操作。

  這里我們用最簡單的數據庫的實現。問題 1:建一張什么樣的表?參考 JobDetail 的屬性。

CREATE TABLE `sys_job` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(512) NOT NULL COMMENT '任務名稱',
`job_group` varchar(512) NOT NULL COMMENT '任務組名',
`job_cron` varchar(512) NOT NULL COMMENT '時間表達式',
`job_class_path` varchar(1024) NOT NULL COMMENT '類路徑,全類型',
`job_data_map` varchar(1024) DEFAULT NULL COMMENT '傳遞 map 參數',
`job_status` int(2) NOT NULL COMMENT '狀態:1 啟用 0 停用',
`job_describe` varchar(1024) DEFAULT NULL COMMENT '任務功能描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

  操作數據表非常簡單,SSM 增刪改查。但是在修改了表的數據之后,怎么讓調度器知道呢?調度器的接口:Scheduler。在我們的需求中,我們需要做的事情:

  1. 新增一個任務
  2. 刪除一個任務
  3. 啟動、停止一個任務
  4. 修改任務的信息(包括調度規律)

  因 此 可 以 把 相 關 的 操 作 封 裝 到 一 個 工 具 類 中 。我這里貼出來基本的增刪改的幾個方法:

public static void addJob(String jobClassName, String jobName, String jobGroupName, String cronExpression, String jobDataMap) throws Exception {
        // 通過SchedulerFactory獲取一個調度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        // 啟動調度器
        scheduler.start();
        // 構建job信息
        JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
                .withIdentity(jobName, jobGroupName).build();
        // JobDataMap用於傳遞任務運行時的參數,比如定時發送郵件,可以用json形式存儲收件人等等信息
        if (StringUtils.isNotEmpty(jobDataMap)) {
            JSONObject jb = JSONObject.parseObject(jobDataMap);
            Map<String, Object> dataMap = (Map<String, Object>) jb.get("data");
            for (Map.Entry<String, Object> m : dataMap.entrySet()) {
                jobDetail.getJobDataMap().put(m.getKey(), m.getValue());
            }
        }
        // 表達式調度構建器(即任務執行的時間)
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
        // 按新的cronExpression表達式構建一個新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                .withSchedule(scheduleBuilder).startNow().build();
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            logger.info("創建定時任務失敗" + e);
            throw new Exception("創建定時任務失敗");
        }
    }

    /**
     * 停用一個定時任務
     *
     * @param jobName      任務名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobPause(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
    }

    /**
     * 啟用一個定時任務
     *
     * @param jobName      任務名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobresume(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
    }

    /**
     * 刪除一個定時任務
     *
     * @param jobName      任務名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobdelete(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
        scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
        scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
    }

容器啟動與 Service 注入

  容器啟動:因為任務沒有定義在 ApplicationContext.xml 中,而是放到了數據庫中,SpringBoot 啟動時,怎么讀取任務信息?或者,怎么在 Spring 啟動完成的時候做一些事情?創建一個類,實現 CommandLineRunner 接口,實現 run 方法。從表中查出狀態是 1 的任務,然后構建。

  Service 類注入到 Job 中 : Spring Bean 如何注入到實現了 Job 接口的類中?例如在 TaskJob 中,需要注入 XXXXService(自定義的service),查詢數據庫進行操作。如果沒有任何配置,注入會報空指針異常。

  • 原因:因為定時任務 Job 對象的實例化過程是在 Quartz 中進行的,而 Service Bean 是由Spring 容器管理的,Quartz 察覺不到 Service Bean 的存在,所以無法將 Service Bean裝配到 Job 對象中。
  • 分析:Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其實現了 InitializingBean方法,在唯一的方法 afterPropertiesSet()在 Bean 的屬性初始化后調用。調度器用 AdaptableJobFactory 對 Job 對象進行實例化。所以,如果我們可以把這個 JobFactory 指定為我們自定義的工廠的話,就可以在 Job 實例化完成之后,把 Job納入到 Spring 容器中管理。

解決這個問題的步驟:

  1.定義一個 AdaptableJobFactory,實現 JobFactory 接口,實現接口定義的newJob 方法,在這里面返回 Job 實例,具體實現可以參考 源碼  org.springframework.scheduling.quartz.AdaptableJobFactory 類,直接拷貝就可以了。

  2.定義一個 MyJobFactory,繼承 AdaptableJobFactory。使用 Spring 的 AutowireCapableBeanFactory,把 Job 實例注入到容器中。

@Component
public class MyJobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        //調用父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        capableBeanFactory.autowireBean(jobInstance);

        return jobInstance;
    }
}

  3.指定 Scheduler 的 JobFactory 為自定義的 JobFactory。

scheduler.setJobFactory(myJobFactory);

Quartz 集群部署:

為什么 需要集群?

  1. 防止單點故障,減少對業務的影響
  2. 減少節點的壓力,例如在 10 點要觸發 1000 個任務,如果有 10 個節點,則每個節點之需要執行 100 個任務

集群需要 解決的問題?

  1. 任務重跑,因為節點部署的內容是一樣的,到 10 點的時候,每個節點都會執行相同的操作,引起數據混亂。比如跑批,絕對不能執行多次。
  2. 任務漏跑,假如任務是平均分配的,本來應該在某個節點上執行的任務,因為節點故障,一直沒有得到執行。
  3. 水平集群需要注意時間同步問題
  4. Quartz 使用的是隨機的負載均衡算法,不能指定節點執行所以必須要有一種共享數據或者通信的機制。在分布式系統的不同節點中,我們可以采用什么樣的方式,實現數據共享?兩兩通信,或者基於分布式的服務,實現數據共享。例如:ZK、Redis、DB。

  在 Quartz 中,提供了一種簡單的方式,基於數據庫共享任務執行信息。也就是說,一個節點執行任務的時候,會操作數據庫,其他的節點查詢數據庫,便可以感知到了。同樣的問題:建什么表?哪些字段?依舊使用系統自帶的 11 張表。

  配置集群需要先配置如下信息,另外需要配置數據庫連接持久化的信息 ,具體如上文所示。

#如果使用集群,instanceId必須唯一,設置成AUTO
org.quartz.scheduler.instanceId = AUTO

#是否使用集群
org.quartz.jobStore.isClustered = true

  如果設置的數量>1,並且使用 JDBC JobStore(RAMJobStore 不支持分布式,只有 一 個 調 度 器 實 例 , 所 以 不 加 鎖 ) , 則 屬 性org.quartz.jobStore.acquireTriggersWithinLock 應設置為 true。否則不加鎖會導致任務重復執行。

//代表 Scheduler 一次拉取trigger 的最大數量,默認是 1
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true

Quartz 調度原理:

  • Job 沒有繼承 Thread 和實現 Runnable,是怎么被調用的?通過反射還是什么?
  • 任務是什么時候被調度的?是誰在監視任務還是監視 Trigger?
  • 任務是怎么被調用的?誰執行了任務?
  • 任務本身有狀態嗎?還是觸發器有狀態?

  看源碼的入口

Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

  factory.getScheduler() 調用到  StdSchedulerFactory 的方法,如下:

public Scheduler getScheduler() throws SchedulerException {
  if (cfg == null) {
    // 讀取 quartz.properties 配置文件
    initialize();
  }
  // 這個類是一個 HashMap,用來基於調度器的名稱保證調度器的唯一性
  SchedulerRepository schedRep = SchedulerRepository.getInstance();
​
  Scheduler sched = schedRep.lookup(getSchedulerName());
  // 如果調度器已經存在了
  if (sched != null) {
    // 調度器關閉了,移除
    if (sched.isShutdown()) {
      schedRep.remove(getSchedulerName());
    } else {
      // 返回調度器
      return sched;
    }
  }
  // 調度器不存在,初始化
  sched = instantiate();
​
  return sched;
}

  instantiate()方法中做了初始化的所有工作:

// 存儲任務信息的 JobStore
JobStore js = null;
// 創建線程池,默認是 SimpleThreadPool
ThreadPool tp = null;
// 創建調度器
QuartzScheduler qs = null;
// 連接數據庫的連接管理器
DBConnectionManager dbMgr = null;
// 自動生成 ID
String instanceIdGeneratorClass = null;
// 創建線程執行器,默認為 DefaultThreadExecutor 
ThreadExecutor threadExecutor;

  創建線程池 (包工頭)即上面代碼的 ThreadPool tp = null; 在該方法830行和839行,創建了一個線程池,默認是配置文件中指定的SimpleThreadPool。

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

  SimpleThreadPool 里面維護了三個 list,分別存放所有的工作線程、空閑的工作線程和忙碌的工作線程。我們可以把 SimpleThreadPool 理解為包工頭。

private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

  tp 的 runInThread()方法是線程池運行線程的接口方法。參數 Runnable 是執行的任務內容。取出 WorkerThread 去執行參數里面的 runnable(JobRunShell)。

WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);

  WorkerThread (工人)是 SimpleThreadPool 的 內 部 類 , 用 來 執 行 任 務 。 我 們 把WorkerThread理解為工人。在WorkerThread的run方法中,執行傳入的參數runnable任務:

runnable.run();

  創建調度線程 (項目經理 )QuartzScheduler:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

  在 QuartzScheduler 的構造函數中,創建了 QuartzSchedulerThread,我們把它理解為項目經理,它會調用包工頭的工人資源,給他們安排任務。並 且 創 建 了 線 程 執 行 器 schedThreadExecutor , 執 行 了 這 個QuartzSchedulerThread,也就是調用了它的 run 方法。

// 創建一個線程,resouces 里面有線程名稱
this.schedThread = new QuartzSchedulerThread(this, resources);
// 線程執行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//執行這個線程,也就是調用了線程的 run 方法
schedThreadExecutor.execute(this.schedThread);

  點開 QuartzSchedulerThread 類,找到 run 方法,這個是 Quartz 任務調度的核心方法:

public void run() {
        int acquiresFailed = 0;
        // 檢查 scheuler 是否為停止狀態
        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    // 檢查是否為暫停狀態
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            // 暫停的話會嘗試去獲得信號鎖,並 wait 一會
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                // 從 JobStore 獲取 Job 持續失敗,sleep 一下
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }
                // 從線程池獲取可用的線程
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        // 獲取需要下次執行的 triggers
                        // idleWaitTime: 默認 30s
                        // availThreadCount:獲取可用(空閑)的工作線程數量,總會大於 1,因為該方法會一直阻塞,直到有工作線程空閑下來。
                        // maxBatchSize:一次拉取 trigger 的最大數量,默認是 1
                        // batchTimeWindow:時間窗口調節參數,默認是 0
                        // misfireThreshold: 超過這個時間還未觸發的 trigger,被認為發生了 misfire,默認 60s
                        // 調度線程一次會拉取 NEXT_FIRETIME 小於(now + idleWaitTime  +batchTimeWindow),大於(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個 triggers,默認情況下,會拉取未來 30s、
過去 60s 之間還未 fire 的 1 個 trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                // 觸發 Trigger,把 ACQUIRED 狀態改成 EXECUTING
                                // 如果這個 trigger 的 NEXTFIRETIME 為空,也就是未來不再觸發,就將其狀態改為COMPLETE
                                // 如果trigger不允許並發執行(即Job的實現類標注了@DisallowConcurrentExecution),則將狀態變為 BLOCKED,否則就將狀態改為 WAITING
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                // 循環處理 Trigger
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                // 根據 trigger 信息實例化 JobRunShell(implements Runnable),同時依據JOB_CLASS_NAME 實例化 Job,隨后我們將 JobRunShell 實例丟入工作線
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            // 執行 JobRunShell 的 run 方法
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    } 

  JobRunShell 的作用:JobRunShell 用來為 Job 提供安全的運行環境的,執行 Job 中所有的作業,捕獲運行中的異常,在任務執行完畢的時候更新 Trigger 狀態,等等。JobRunShell 實例是用 JobRunShellFactory 為 QuartzSchedulerThread 創建的,在調度器決定一個 Job 被觸發的時候,它從線程池中取出一個線程來執行任務。

線程模型總結:

  1. SimpleThreadPool:包工頭,管理所有 WorkerThread
  2. WorkerThread:工人,把 Job 包裝成 JobRunShell,執行
  3. QuartSchedulerThread:項目經理,獲取即將觸發的 Trigger,從包工頭出拿到worker,執行 Trigger 綁定的任務

  綁定 l JobDetail 和 和 Trigger(scheduler.scheduleJob(jobDetail, trigger);)

// 存儲 JobDetail 和 Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相關的 Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);

  啟動調度器scheduler.start();

public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
// 通知監聽器 notifySchedulerListenersStarting(); if (initialStart == null) { initialStart = new Date(); this.resources.getJobStore().schedulerStarted(); startPlugins(); } else { resources.getJobStore().schedulerResumed(); } // 通知 QuartzSchedulerThread 不再等待,開始干活 schedThread.togglePause(false); getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted(); }

  getScheduler 方法創建線程池 ThreadPool,創建調度器 QuartzScheduler,創建調度線程 QuartzSchedulerThread,調度線程初始處於暫停狀態。scheduleJob 將任務添加到 JobStore 中。scheduler.start()方法激活調度器,QuartzSchedulerThread 從 timeTrriger 取出待觸發的任務,並包裝成 TriggerFiredBundle,然后由 JobRunShellFactory 創建TriggerFiredBundle 的 執 行 線 程 JobRunShell , 調 度 執 行 通 過 線 程 池SimpleThreadPool去執行JobRunShell,而JobRunShell執行的就是任務類的execute方法:job.execute(JobExecutionContext context)。

1、一個任務10秒鍾觸發一次,但是每次執行需要60秒,在第20秒的時候,會同時運行兩個任務嗎?怎么禁止一個任務並發運行?需要注解 @DisallowConcurrentExecution

@DisallowConcurrentExecution
public class MyJob4 implements Job {
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        Date date = new Date();
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            System.out.println(" " + sf.format(date) + "   任務4開始執行了,請等待3秒");
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2、現在有三個任務,任務A、任務B、任務C,怎么讓多個任務串行執行,例如A執行完了之后再執行B,B執行完了再執行C?

  如果僅僅只是串行的話,可以把三個任務放在同一個任務當中。

3、除了執行本地的代碼之外,怎么調用其他系統的任務?

  鑒權的HTTP,RPC

4、任務在什么時候會錯過觸發?錯過觸發怎么辦?

  沒有可用線程,Trigger 被暫停,系統重啟,禁止並發執行的任務在到達觸發時間時,上次執行還沒有結束。

  忽略,立即跑一次,下次跑。

  每一種 Trigger 都定義了自己的 Misfire 策略,不同的策略通過不同的方法來設置。

// Trigger1
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().
          withMisfireHandlingInstructionNowWithExistingCount().
           withIntervalInSeconds(1).
           repeatForever()).build();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM