quartz2.3.0(十三)數據庫持久化定時器job任務和trigger觸發器,在多個調度器實例情況下,由其它調度器實例恢復執行調度器宕機的job任務


一、初始化數據庫11張quartz表:qrtz_*  

先從官網下載好quartz2.3.0包:http://www.quartz-scheduler.org/downloads/ 

解壓后進入目錄:quartz-2.3.0-SNAPSHOT\src\org\quartz\impl\jdbcjobstore

得到22種數據庫的11張qrtz_*表的初始化SQL,這里列舉幾個經典的數據庫文件:tables_oracle.sql、tables_mysql_innodb.sql、tables_sqlServer.sql、tables_postgres.sql

二、配置定時器quartz.properties

 

#============================================================================
# Configure Main Scheduler Properties  
#============================================================================
org.quartz.scheduler.instanceName=TestScheduler
#instance_1 更改后綴數字后,程序可以執行不同ID的調度器,啟動多個調度器,有利於觀察其中一個實例宕機后,另外實例恢復這個實例的job任務情況
org.quartz.scheduler.instanceId=instance_1

#============================================================================
# Configure ThreadPool  
#============================================================================
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=5
org.quartz.threadPool.threadPriority=5

#============================================================================
# 配置Oracle數據庫,命名dataSource為myDS
#============================================================================
# 支持PostgreSQL數據庫
#org.quartz.dataSource.myDS.driver=org.postgresql.Driver
#org.quartz.dataSource.myDS.URL=jdbc:postgresql://localhost:5432/quartz
org.quartz.dataSource.myDS.driver=oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.myDS.URL=jdbc:oracle:thin:@localhost:1521:orcl
org.quartz.dataSource.myDS.user=zhuwen
org.quartz.dataSource.myDS.password=ZHUwen12
org.quartz.dataSource.myDS.maxConnections=5
org.quartz.dataSource.myDS.validationQuery=select 0 FROM DUAL

#============================================================================
# 配置job任務存儲策略,指定一個叫myDS的dataSource
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# 支持PostgreSQL數據庫
#org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# 支持Oracle數據庫
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.oracle.OracleDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true

 

 

 

三、job任務類

第一個job類:SimpleRecoveryJob

package org.quartz.examples.example13;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * 一個愚蠢的作業實現,用於單元測試。
 */
public class SimpleRecoveryJob implements Job {

    private static Logger LOG = LoggerFactory.getLogger(SimpleRecoveryJob.class);

    private static final String COUNT = "count";

    //必須要有public修飾的無參構造函數
    public SimpleRecoveryJob() {
    }

    //任務執行方法
    public void execute(JobExecutionContext context) throws JobExecutionException {

        JobKey jobKey = context.getJobDetail().getKey();

        // 如果作業正在恢復。如果由於“恢復”情況而重新執行作業,此方法將返回true。
        if (context.isRecovering()) {
//            LOG.info("恢復作業:SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
            System.err.println("恢復作業:SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
        } else {
//            LOG.info("不恢復作業:SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
            System.err.println("不恢復作業:SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
        }

        // 睡眠10秒
        long delay = 10L * 1000L;
        try {
            Thread.sleep(delay);
        } catch (Exception e) {
            //
        }

        JobDataMap data = context.getJobDetail().getJobDataMap();
        int count;
        if (data.containsKey(COUNT)) {
            count = data.getInt(COUNT);
        } else {
            count = 0;
        }
        count++;
        data.put(COUNT, count);

        LOG.info("SimpleRecoveryJob: " + jobKey + " done at " + new Date() + "\n Execution #" + count);

    }

}

下面是第二個job類,注意這個job類不允許多線程並發執行:

package org.quartz.examples.example13;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;

/**
 * 這個作業具有與SimpleRecoveryJob相同的功能,只是這個作業實現是“有狀態的”,
 * 因為它將在每次執行之后自動重新持久化它的數據(JobDataMap),並且一次只能執行JobDetail的一個實例。
 * 
 */
@PersistJobDataAfterExecution //持久化JobDataMap里的數據,使下一個定時任務還能獲取到這些值
@DisallowConcurrentExecution //禁止並發多任務執行,所以永遠只有一個任務在執行中
public class SimpleRecoveryStatefulJob extends SimpleRecoveryJob {

  public SimpleRecoveryStatefulJob() {
    super();
  }
}

四、任務調度類

package org.quartz.examples.example13;

import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用於測試/顯示JDBCJobStore (JobStoreTX或JobStoreCMT)的集群特性。
 * <p>
 * 所有實例必須使用不同的屬性文件,因為它們的實例id必須不同,但是所有其他屬性應該相同。
 * </p>
 * <p>
 * 如果您希望它清除現有的作業&觸發器,設置clearJobs=true
 * </p>
 * <p>
 * 您可能需要先清空殘留的表數據,因為將來自非集群設置的數據與集群設置的數據混合在一起可能是不好的。
 * </p>
 * <p>
 *    你可以在運行多個Scheduler實例的時候,殺死其中一個實例,並確保其余實例恢復正在進行的作業。注意,使用默認設置檢測故障可能需要15秒左右的時間。
 * </p>
 * <p>
 * 還可以嘗試使用/不使用調度程序注冊的shutdown-hook插件運行它。(org.quartz.plugins.management.ShutdownHookPlugin)。
 * </p>
 * <p>
 * 注意:從不要在單獨的機器上運行集群,除非它們的時鍾使用某種形式的時間同步服務(例如NTP守護進程)進行同步。
 * </p>
 */
public class ClusterExample {

    private static Logger LOG = LoggerFactory.getLogger(ClusterExample.class);

    public void run(boolean inClearJobs, boolean inScheduleJobs) throws Exception {

        // 初始化調度器
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler sched = sf.getScheduler();

        if (inClearJobs) {
            LOG.warn("***** 刪除已存在的job任務和triggers觸發器。Deleting existing jobs/triggers *****");
            sched.clear();
        }

        if (inScheduleJobs) {
            LOG.info("------- Scheduling Jobs ------------------");
            String schedId = sched.getSchedulerInstanceId();

            // ========================================================
            // ============ job1
            // ========================================================
            int count = 1;
            JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId).requestRecovery() // 如果job執行過程中宕機,則job重新執行
                    .build();
            SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
                    .startAt(futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
            LOG.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                    + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
            sched.scheduleJob(job, trigger);

            // ========================================================
            // ============ job2
            // ========================================================
            count++;
            job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId).requestRecovery() // 如果job執行過程中宕機,則job重新執行
                    .build();
            trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(2, IntervalUnit.SECOND))
                    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
            LOG.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                    + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
            sched.scheduleJob(job, trigger);

            // ========================================================
            // ============ job3,這里使用的是job子類SimpleRecoveryStatefulJob
            // ========================================================
            count++;
            job = newJob(SimpleRecoveryStatefulJob.class).withIdentity("job_" + count, schedId).requestRecovery() // 如果job執行過程中宕機,則job重新執行
                    .build();
            trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(3)).build();
            LOG.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                    + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
            sched.scheduleJob(job, trigger);

            // ========================================================
            // ============ job4
            // ========================================================
            count++;
            job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId).requestRecovery() // 如果job執行過程中宕機,則job重新執行
                    .build();
            trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(4)).build();
            LOG.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: "
                    + trigger.getRepeatCount() + "/" + trigger.getRepeatInterval());
            sched.scheduleJob(job, trigger);

            // ========================================================
            // ============ job5
            // ========================================================
            count++;
            job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId).requestRecovery() // 如果job執行過程中宕機,則job重新執行
                    .build();
            trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInMilliseconds(4500L)).build();
            LOG.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: "
                    + trigger.getRepeatCount() + "/" + trigger.getRepeatInterval());
            sched.scheduleJob(job, trigger);
        }

        sched.start();

        LOG.info("------- Waiting for one hour... ----------");
        try {
            Thread.sleep(3600L * 1000L);
        } catch (Exception e) {
            //
        }

        sched.shutdown();
        LOG.info("------- Shutdown Complete ----------------");
    }

    public static void main(String[] args) throws Exception {
        boolean clearJobs = false;
        boolean scheduleJobs = true;

        for (String arg : args) {
            if (arg.equalsIgnoreCase("clearJobs")) {
                clearJobs = true;
            } else if (arg.equalsIgnoreCase("dontScheduleJobs")) {
                scheduleJobs = false;
            }
        }
        
        clearJobs = true;
        scheduleJobs = true;

        ClusterExample example = new ClusterExample();
        example.run(clearJobs, scheduleJobs);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM