一、Quartz 基本介紹
1.1 Quartz 概述
1.2 Quartz特點
1.3 Quartz 集群配置
二、Quartz 原理及流程
2.1 quartz基本原理
2.2 quartz啟動流程
三、Spring + Quartz 實現企業級調度的實現示例
3.1 環境信息
3.2 相關代碼及配置
四、問題及解決方案
五、相關知識
六、參考資料
總結
一、Quartz 基本介紹
1.1 Quartz 概述
Quartz 是 OpenSymphony 開源組織在任務調度領域的一個開源項目,完全基於 Java 實現。該項目於 2009 年被 Terracotta 收購,目前是 Terracotta 旗下的一個項目。讀者可以到 http://www.quartz-scheduler.org/站點下載 Quartz 的發布版本及其源代碼。
1.2 Quartz特點
作為一個優秀的開源調度框架,Quartz 具有以下特點:
- 強大的調度功能,例如支持豐富多樣的調度方法,可以滿足各種常規及特殊需求;
- 靈活的應用方式,例如支持任務和調度的多種組合方式,支持調度數據的多種存儲方式;
- 分布式和集群能力,Terracotta 收購后在原來功能基礎上作了進一步提升。
另外,作為 Spring 默認的調度框架,Quartz 很容易與 Spring 集成實現靈活可配置的調度功能。
quartz調度核心元素:
- Scheduler:任務調度器,是實際執行任務調度的控制器。在spring中通過SchedulerFactoryBean封裝起來。
- Trigger:觸發器,用於定義任務調度的時間規則,有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger,其中CronTrigger用的比較多,本文主要介紹這種方式。CronTrigger在spring中封裝在CronTriggerFactoryBean中。
- Calendar:它是一些日歷特定時間點的集合。一個trigger可以包含多個Calendar,以便排除或包含某些時間點。
- JobDetail:用來描述Job實現類及其它相關的靜態信息,如Job名字、關聯監聽器等信息。在spring中有JobDetailFactoryBean和 MethodInvokingJobDetailFactoryBean兩種實現,如果任務調度只需要執行某個類的某個方法,就可以通過MethodInvokingJobDetailFactoryBean來調用。
- Job:是一個接口,只有一個方法void execute(JobExecutionContext context),開發者實現該接口定義運行任務,JobExecutionContext類提供了調度上下文的各種信息。Job運行時的信息保存在JobDataMap實例中。實現Job接口的任務,默認是無狀態的,若要將Job設置成有狀態的,在quartz中是給實現的Job添加@DisallowConcurrentExecution注解(以前是實現StatefulJob接口,現在已被Deprecated),在與spring結合中可以在spring配置文件的job detail中配置concurrent參數。
1.3 Quartz 集群配置
quartz集群是通過數據庫表來感知其他的應用的,各個節點之間並沒有直接的通信。只有使用持久的JobStore才能完成Quartz集群。
數據庫表:以前有12張表,現在只有11張表,現在沒有存儲listener相關的表,多了QRTZ_SIMPROP_TRIGGERS表:
Table name | Description |
---|---|
QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區信息 |
QRTZ_FIRED_TRIGGERS | 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
QRTZ_SCHEDULER_STATE | 存儲少量的有關Scheduler的狀態信息,和別的Scheduler實例 |
QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
QRTZ_JOB_DETAILS | 存儲每一個已配置的Job的詳細信息 |
QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重復次數、間隔、以及已觸的次數 |
QRTZ_BLOG_TRIGGERS | Trigger作為Blob類型存儲 |
QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
QRTZ_SIMPROP_TRIGGERS |
QRTZ_LOCKS就是Quartz集群實現同步機制的行鎖表,包括以下幾個鎖:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS。
二、Quartz 原理及流程
2.1 quartz基本原理
核心元素
Quartz 任務調度的核心元素是 scheduler, trigger 和 job,其中 trigger 和 job 是任務調度的元數據, scheduler 是實際執行調度的控制器。
在 Quartz 中,trigger 是用於定義調度時間的元素,即按照什么時間規則去執行任務。Quartz 中主要提供了四種類型的 trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和 NthIncludedDayTrigger。這四種 trigger 可以滿足企業應用中的絕大部分需求。我們將在企業應用一節中進一步討論四種 trigger 的功能。
在 Quartz 中,job 用於表示被調度的任務。主要有兩種類型的 job:無狀態的(stateless)和有狀態的(stateful)。對於同一個 trigger 來說,有狀態的 job 不能被並行執行,只有上一次觸發的任務被執行完之后,才能觸發下一次執行。Job 主要有兩種屬性:volatility 和 durability,其中 volatility 表示任務是否被持久化到數據庫存儲,而 durability 表示在沒有 trigger 關聯的時候任務是否被保留。兩者都是在值為 true 的時候任務被持久化或保留。一個 job 可以被多個 trigger 關聯,但是一個 trigger 只能關聯一個 job。
在 Quartz 中, scheduler 由 scheduler 工廠創建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二種工廠 StdSchedulerFactory 使用較多,因為 DirectSchedulerFactory 使用起來不夠方便,需要作許多詳細的手工編碼設置。 Scheduler 主要有三種:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。本文以最常用的 StdScheduler 為例講解。這也是筆者在項目中所使用的 scheduler 類。
Quartz 核心元素之間的關系如下圖所示:
圖 1. Quartz 核心元素關系圖
線程視圖
在 Quartz 中,有兩類線程,Scheduler 調度線程和任務執行線程,其中任務執行線程通常使用一個線程池維護一組線程。
圖 2. Quartz 線程視圖
Scheduler 調度線程主要有兩個: 執行常規調度的線程,和執行 misfired trigger 的線程。常規調度線程輪詢存儲的所有 trigger,如果有需要觸發的 trigger,即到達了下一次觸發的時間,則從任務執行線程池獲取一個空閑線程,執行與該 trigger 關聯的任務。Misfire 線程是掃描所有的 trigger,查看是否有 misfired trigger,如果有的話根據 misfire 的策略分別處理。下圖描述了這兩個線程的基本流程:
圖 3. Quartz 調度線程流程圖
關於 misfired trigger,我們在企業應用一節中將進一步描述。
數據存儲
Quartz 中的 trigger 和 job 需要存儲下來才能被使用。Quartz 中有兩種存儲方式:RAMJobStore, JobStoreSupport,其中 RAMJobStore 是將 trigger 和 job 存儲在內存中,而 JobStoreSupport 是基於 jdbc 將 trigger 和 job 存儲到數據庫中。RAMJobStore 的存取速度非常快,但是由於其在系統被停止后所有的數據都會丟失,所以在通常應用中,都是使用 JobStoreSupport。
在 Quartz 中,JobStoreSupport 使用一個驅動代理來操作 trigger 和 job 的數據存儲:StdJDBCDelegate。StdJDBCDelegate 實現了大部分基於標准 JDBC 的功能接口,但是對於各種數據庫來說,需要根據其具體實現的特點做某些特殊處理,因此各種數據庫需要擴展 StdJDBCDelegate 以實現這些特殊處理。Quartz 已經自帶了一些數據庫的擴展實現,可以直接使用,如下圖所示:
圖 4. Quartz 數據庫驅動代理
作為嵌入式數據庫的代表,Derby 近來非常流行。如果使用 Derby 數據庫,可以使用上圖中的 CloudscapeDelegate 作為 trigger 和 job 數據存儲的代理類。
2.2 quartz啟動流程
若quartz是配置在spring中,當服務器啟動時,就會裝載相關的bean。SchedulerFactoryBean實現了InitializingBean接口,因此在初始化bean的時候,會執行afterPropertiesSet方法,該方法將會調用SchedulerFactory(DirectSchedulerFactory 或者 StdSchedulerFactory,通常用StdSchedulerFactory)創建Scheduler。SchedulerFactory在創建quartzScheduler的過程中,將會讀取配置參數,初始化各個組件,關鍵組件如下:
-
ThreadPool:一般是使用SimpleThreadPool,SimpleThreadPool創建了一定數量的WorkerThread實例來使得Job能夠在線程中進行處理。WorkerThread是定義在SimpleThreadPool類中的內部類,它實質上就是一個線程。在SimpleThreadPool中有三個list:workers-存放池中所有的線程引用,availWorkers-存放所有空閑的線程,busyWorkers-存放所有工作中的線程;
線程池的配置參數如下所示:1 2 3
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount=3 org.quartz.threadPool.threadPriority=5
- 1
- 2
-
JobStore:分為存儲在內存的RAMJobStore和存儲在數據庫的JobStoreSupport(包括JobStoreTX和JobStoreCMT兩種實現,JobStoreCMT是依賴於容器來進行事務的管理,而JobStoreTX是自己管理事務),若要使用集群要使用JobStoreSupport的方式;
- QuartzSchedulerThread:用來進行任務調度的線程,在初始化的時候paused=true,halted=false,雖然線程開始運行了,但是paused=true,線程會一直等待,直到start方法將paused置為false;
另外,SchedulerFactoryBean還實現了SmartLifeCycle接口,因此初始化完成后,會執行start()方法,該方法將主要會執行以下的幾個動作:
- 創建ClusterManager線程並啟動線程:該線程用來進行集群故障檢測和處理,將在下文詳細討論;
- 創建MisfireHandler線程並啟動線程:該線程用來進行misfire任務的處理,將在下文詳細討論;
- 置QuartzSchedulerThread的paused=false,調度線程才真正開始調度;
三、Spring + Quartz 實現企業級調度的實現示例
3.1 環境信息
此示例中的環境: Spring 4.1.6.RELEASE + quartz 2.2.1 + Mysql 5.6
3.2 相關代碼及配置
3.2.1 Maven 引入
3.2.2 數據庫腳本准備
SET FOREIGN_KEY_CHECKS=0;
– —————————-
– Table structure for task_schedule_job
– —————————-
DROP TABLE IF EXISTS `task_schedule_job`;
CREATE TABLE `task_schedule_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`job_name` varchar(255) DEFAULT NULL,
`job_group` varchar(255) DEFAULT NULL,
`job_status` varchar(255) DEFAULT NULL,
`cron_expression` varchar(255) NOT NULL,
`description` varchar(255) DEFAULT NULL,
`bean_class` varchar(255) DEFAULT NULL,
`is_concurrent` varchar(255) DEFAULT NULL COMMENT ‘1’,
`spring_id` varchar(255) DEFAULT NULL,
`method_name` varchar(255) NOT NULL
PRIMARY KEY (`job_id`),
UNIQUE KEY `name_group` (`job_name`,`job_group`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
在Quartz包下docs/dbTables,選擇對應的數據庫腳本,創建相應的數據庫表即可,我用的是mysql5.6,這里有一個需要注意的地方,mysql5.5之前用的表存儲引擎是MyISAM,使用的是表級鎖,鎖發生沖突的概率比較高,並發度低;5.6之后默認的存儲引擎為InnoDB,InnoDB采用的鎖機制是行級鎖,並發度也較高。而quartz集群使用數據庫鎖的
機制來來實現同一個任務在同一個時刻只被實例執行,所以為了防止沖突,我們建表的時候要選取InnoDB作為表的存
儲引擎。如下:
3.2.3 關鍵代碼及配置
<1>spring-quartz.xml 配置 在application.xml 文件中引入
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <!-- 注冊本地調度任務 <bean id="localQuartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"></bean>--> <!-- 注冊集群調度任務 --> <bean id="schedulerFactoryBean" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" destroy-method="destroy"> <!--可選,QuartzScheduler 啟動時更新己存在的Job,這樣就不用每次修改targetObject后刪除qrtz_job_details表對應記錄了 --> <property name="overwriteExistingJobs" value="true" /> <!--必須的,QuartzScheduler 延時啟動,應用啟動完后 QuartzScheduler 再啟動 --> <property name="startupDelay" value="3" /> <!-- 設置自動啟動 --> <property name="autoStartup" value="true" /> <property name="applicationContextSchedulerContextKey" value="applicationContext" /> <property name="configLocation" value="classpath:quartz.properties" /> </bean> </beans>
<2>quartz.properties 文件配置
#============================================================== #Configure Main Scheduler Properties #============================================================== org.quartz.scheduler.instanceName = KuanrfGSQuartzScheduler org.quartz.scheduler.instanceId = AUTO #============================================================== #Configure JobStore #============================================================== org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.jobStore.dataSource = myDS org.quartz.jobStore.maxMisfiresToHandleAtATime = 1 org.quartz.jobStore.misfireThreshold = 120000 org.quartz.jobStore.txIsolationLevelSerializable = false #============================================================== #Configure DataSource #============================================================== org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.myDS.URL = 你的數據鏈接 org.quartz.dataSource.myDS.user = 用戶名 org.quartz.dataSource.myDS.password = 密碼 org.quartz.dataSource.myDS.maxConnections = 30 org.quartz.jobStore.selectWithLockSQL = SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE #============================================================== #Configure ThreadPool #============================================================== org.quartz.threadPool.class= org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount= 10 org.quartz.threadPool.threadPriority= 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread= true #============================================================== #Skip Check Update #update:true #not update:false #============================================================== org.quartz.scheduler.skipUpdateCheck = true #============================================================================ # Configure Plugins #============================================================================ org.quartz.plugin.triggHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin org.quartz.plugin.shutdownhook.class = org.quartz.plugins.management.ShutdownHookPlugin org.quartz.plugin.shutdownhook.cleanShutdown = true
<3>關鍵代碼
package com.netease.ad.omp.service.sys; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Set; import javax.annotation.PostConstruct; import javax.annotation.Resource; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.dao.sys.mapper.ScheduleJobMapper; import com.netease.ad.omp.entity.sys.ScheduleJob; import com.netease.ad.omp.quartz.job.JobUtils; import com.netease.ad.omp.quartz.job.MyDetailQuartzJobBean; import com.netease.ad.omp.quartz.job.QuartzJobFactory; import com.netease.ad.omp.quartz.job.QuartzJobFactoryDisallowConcurrentExecution; import org.apache.log4j.Logger; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; /** * 計划任務管理 */ @Service public class JobTaskService { public final Logger log = Logger.getLogger(this.getClass()); @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Autowired private ScheduleJobMapper scheduleJobMapper; /** * 從數據庫中取 區別於getAllJob * * @return */ public List<ScheduleJob> getAllTask() { return scheduleJobMapper.select(null); } /** * 添加到數據庫中 區別於addJob */ public void addTask(ScheduleJob job) { job.setCreateTime(new Date()); scheduleJobMapper.insertSelective(job); } /** * 從數據庫中查詢job */ public ScheduleJob getTaskById(Long jobId) { return scheduleJobMapper.selectByPrimaryKey(jobId); } /** * 更改任務狀態 * * @throws SchedulerException */ public void changeStatus(Long jobId, String cmd) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } if ("stop".equals(cmd)) { deleteJob(job); job.setJobStatus(JobUtils.STATUS_NOT_RUNNING); } else if ("start".equals(cmd)) { job.setJobStatus(JobUtils.STATUS_RUNNING); addJob(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 更改任務 cron表達式 * * @throws SchedulerException */ public void updateCron(Long jobId, String cron) throws SchedulerException { ScheduleJob job = getTaskById(jobId); if (job == null) { return; } job.setCronExpression(cron); if (JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { updateJobCron(job); } scheduleJobMapper.updateByPrimaryKeySelective(job); } /** * 添加任務 * * @throws SchedulerException */ public void addJob(ScheduleJob job) throws SchedulerException { if (job == null || !JobUtils.STATUS_RUNNING.equals(job.getJobStatus())) { return; } Scheduler scheduler = schedulerFactoryBean.getScheduler(); log.debug(scheduler + ".......................................................................................add"); TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); // 不存在,創建一個 if (null == trigger) { Class clazz = JobUtils.CONCURRENT_IS.equals(job.getIsConcurrent()) ? QuartzJobFactory.class : QuartzJobFactoryDisallowConcurrentExecution.class; JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(job.getJobName(), job.getJobGroup()).build(); jobDetail.getJobDataMap().put("scheduleJob", job); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); trigger = TriggerBuilder.newTrigger().withIdentity(job.getJobName(), job.getJobGroup()).withSchedule(scheduleBuilder).build(); scheduler.scheduleJob(jobDetail, trigger); } else { // Trigger已存在,那么更新相應的定時設置 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); // 按新的cronExpression表達式重新構建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); // 按新的trigger重新設置job執行 scheduler.rescheduleJob(triggerKey, trigger); } } @PostConstruct public void init() throws Exception { // 這里獲取任務信息數據 List<ScheduleJob> jobList = scheduleJobMapper.select(null); for (ScheduleJob job : jobList) { addJob(job); } } /** * 獲取所有計划中的任務列表 * * @return * @throws SchedulerException */ public List<ScheduleJob> getAllJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup(); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(); for (JobKey jobKey : jobKeys) { List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { ScheduleJob job = new ScheduleJob(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } } return jobList; } /** * 所有正在運行的job * * @return * @throws SchedulerException */ public List<ScheduleJob> getRunningJob() throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs(); List<ScheduleJob> jobList = new ArrayList<ScheduleJob>(executingJobs.size()); for (JobExecutionContext executingJob : executingJobs) { ScheduleJob job = new ScheduleJob(); JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); job.setJobName(jobKey.getName()); job.setJobGroup(jobKey.getGroup()); job.setDescription("觸發器:" + trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); job.setJobStatus(triggerState.name()); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); job.setCronExpression(cronExpression); } jobList.add(job); } return jobList; } /** * 暫停一個job * * @param scheduleJob * @throws SchedulerException */ public void pauseJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.pauseJob(jobKey); } /** * 恢復一個job * * @param scheduleJob * @throws SchedulerException */ public void resumeJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.resumeJob(jobKey); } /** * 刪除一個job * * @param scheduleJob * @throws SchedulerException */ public void deleteJob(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.deleteJob(jobKey); } /** * 立即執行job * * @param scheduleJob * @throws SchedulerException */ public void runAJobNow(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); JobKey jobKey = JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); scheduler.triggerJob(jobKey); } /** * 更新job時間表達式 * * @param scheduleJob * @throws SchedulerException */ public void updateJobCron(ScheduleJob scheduleJob) throws SchedulerException { Scheduler scheduler = schedulerFactoryBean.getScheduler(); TriggerKey triggerKey = TriggerKey.triggerKey(scheduleJob.getJobName(), scheduleJob.getJobGroup()); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); scheduler.rescheduleJob(triggerKey, trigger); } public static void main(String[] args) { CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("xxxxx"); } }
package com.netease.ad.omp.quartz.job; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import com.netease.ad.omp.common.utils.SpringUtils; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.quartz.JobExecutionContext; import org.springframework.context.ApplicationContext; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:58 * Email: bjsonghongxu@crop.netease.com * Class Description: * 定時任務工具類 */ public class JobUtils { public final static Logger log = Logger.getLogger(JobUtils.class); public static final String STATUS_RUNNING = "1"; //啟動狀態 public static final String STATUS_NOT_RUNNING = "0"; //未啟動狀態 public static final String CONCURRENT_IS = "1"; public static final String CONCURRENT_NOT = "0"; private ApplicationContext ctx; /** * 通過反射調用scheduleJob中定義的方法 * * @param scheduleJob */ public static void invokMethod(ScheduleJob scheduleJob,JobExecutionContext context) { Object object = null; Class clazz = null; if (StringUtils.isNotBlank(scheduleJob.getSpringId())) { object = SpringUtils.getBean(scheduleJob.getSpringId()); } else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) { try { clazz = Class.forName(scheduleJob.getBeanClass()); object = clazz.newInstance(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (object == null) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,請檢查是否配置正確!!!"); return; } clazz = object.getClass(); Method method = null; try { method = clazz.getMethod(scheduleJob.getMethodName(), new Class[] {JobExecutionContext.class}); } catch (NoSuchMethodException e) { log.error("任務名稱 = [" + scheduleJob.getJobName() + "]---------------未啟動成功,方法名設置錯誤!!!"); } catch (SecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (method != null) { try { method.invoke(object, new Object[] {context}); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalArgumentException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } log.info("任務名稱 = [" + scheduleJob.getJobName() + "]----------啟動成功"); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 計划任務執行處 無狀態 * Spring調度任務 (重寫 quartz 的 QuartzJobBean 類原因是在使用 quartz+spring 把 quartz 的 task 實例化進入數據庫時,會產生: serializable 的錯誤) */ public class QuartzJobFactory implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.quartz.job; import com.netease.ad.omp.entity.sys.ScheduleJob; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * * @Description: 若一個方法一次執行不完下次輪轉時則等待該方法執行完后才執行下一次操作 * Spring調度任務 (重寫 quartz 的 QuartzJobBean 類原因是在使用 quartz+spring 把 quartz 的 task 實例化進入數據庫時,會產生: serializable 的錯誤) */ @DisallowConcurrentExecution public class QuartzJobFactoryDisallowConcurrentExecution implements Job { public final Logger log = Logger.getLogger(this.getClass()); @Override public void execute(JobExecutionContext context) throws JobExecutionException { ScheduleJob scheduleJob = (ScheduleJob) context.getMergedJobDataMap().get("scheduleJob"); JobUtils.invokMethod(scheduleJob,context); } }
package com.netease.ad.omp.entity.sys; import javax.persistence.Id; import javax.persistence.Table; import java.io.Serializable; import java.util.Date; /** * Created with IntelliJ IDEA * ProjectName: omp * Author: bjsonghongxu * CreateTime : 15:48 * Email: bjsonghongxu@crop.netease.com * Class Description: * 計划任務信息 */ @Table(name = "task_schedule_job") public class ScheduleJob implements Serializable { @Id private Long jobId; private Date createTime; private Date updateTime; /** * 任務名稱 */ private String jobName; /** * 任務分組 */ private String jobGroup; /** * 任務狀態 是否啟動任務 */ private String jobStatus; /** * cron表達式 */ private String cronExpression; /** * 描述 */ private String description; /** * 任務執行時調用哪個類的方法 包名+類名 */ private String beanClass; /** * 任務是否有狀態 */ private String isConcurrent; /** * spring bean */ private String springId; /** * 任務調用的方法名 */ private String methodName; public Long getJobId() { return jobId; } public void setJobId(Long jobId) { this.jobId = jobId; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getJobName() { return jobName; } public void setJobName(String jobName) { this.jobName = jobName; } public String getJobGroup() { return jobGroup; } public void setJobGroup(String jobGroup) { this.jobGroup = jobGroup; } public String getJobStatus() { return jobStatus; } public void setJobStatus(String jobStatus) { this.jobStatus = jobStatus; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getBeanClass() { return beanClass; } public void setBeanClass(String beanClass) { this.beanClass = beanClass; } public String getIsConcurrent() { return isConcurrent; } public void setIsConcurrent(String isConcurrent) { this.isConcurrent = isConcurrent; } public String getSpringId() { return springId; } public void setSpringId(String springId) { this.springId = springId; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } }
package com.netease.ad.omp.common.utils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; public final class SpringUtils implements BeanFactoryPostProcessor { private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境 @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } /** * 獲取對象 * * @param name * @return Object 一個以所給名字注冊的bean的實例 * @throws BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 獲取類型為requiredType的對象 * * @param clz * @return * @throws BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { @SuppressWarnings("unchecked") T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 * 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注冊對象的類型 * @throws NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } }
至於前端自己畫個簡單的界面即可使用了。
四、問題及解決方案
4.1quartz mysql 死鎖問題
quartz文檔提到,如果在集群環境下,最好將配置項org.quartz.jobStore.txIsolationLevelSerializable設置為true
問題:
這個選項在mysql下會非常容易出現死鎖問題。
2014-12-29 09:55:28.006 [QuartzScheduler_clusterQuartzSchedular-BJ-YQ-64.2491419487774923_ClusterManager] ERROR o.q.impl.jdbcjobstore.JobStoreTX [U][] - ClusterManager: Error managing cluster: Failure updating scheduler state when checking-in: Deadlock found when trying to get lock; try restarting transaction
這個選項存在意義:
quartz需要提升隔離級別來保障自己的運作,不過,由於各數據庫實現的隔離級別定義都不一樣,所以quartz提供一個設置序列化這樣的隔離級別存在,因為例如oracle中是沒有未提交讀和可重復讀這樣的隔離級別存在。但是由於mysql默認的是可重復讀,比提交讀高了一個級別,所以已經可以滿足quartz集群的正常運行。
五、相關知識
5.1、QuartzSchedulerThread線程
線程的主要邏輯代碼如下:
while (!halted.get()) { int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow()); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } List<TriggerFiredResult> bndle = qsRsrcs.getJobStore().triggersFired(triggers); for(int i = 0;i < res.size();i++){ JobRunShell shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); qsRsrcs.getThreadPool().runInThread(shell); } }
- 先獲取線程池中的可用線程數量(若沒有可用的會阻塞,直到有可用的);
- 獲取30m內要執行的trigger(即acquireNextTriggers):
獲取trigger的鎖,通過select …for update方式實現;獲取30m內(可配置)要執行的triggers(需要保證集群節點的時間一致),若@ConcurrentExectionDisallowed且列表存在該條trigger則跳過,否則更新trigger狀態為ACQUIRED(剛開始為WAITING);插入firedTrigger表,狀態為ACQUIRED;(注意:在RAMJobStore中,有個timeTriggers,排序方式是按觸發時間nextFireTime排的;JobStoreSupport從數據庫取出triggers時是按照nextFireTime排序); - 等待直到獲取的trigger中最先執行的trigger在2ms內;
- triggersFired:
1)更新firedTrigger的status=EXECUTING;
2)更新trigger下一次觸發的時間;
3)更新trigger的狀態:無狀態的trigger->WAITING,有狀態的trigger->BLOCKED,若nextFireTime==null ->COMPLETE;
4) commit connection,釋放鎖; - 針對每個要執行的trigger,創建JobRunShell,並放入線程池執行:
1)execute:執行job
2)獲取TRIGGER_ACCESS鎖
3)若是有狀態的job:更新trigger狀態:BLOCKED->WAITING,PAUSED_BLOCKED->BLOCKED
4)若@PersistJobDataAfterExecution,則updateJobData
5)刪除firedTrigger
6)commit connection,釋放鎖
線程執行流程如下圖所示:QuartzSchedulerThread時序圖
任務調度執行過程中,trigger的狀態變化如下圖所示:該圖來自參考文獻5
5.2.misfireHandler線程
下面這些原因可能造成 misfired job:
- 系統因為某些原因被重啟。在系統關閉到重新啟動之間的一段時間里,可能有些任務會被 misfire;
- Trigger 被暫停(suspend)的一段時間里,有些任務可能會被 misfire;
- 線程池中所有線程都被占用,導致任務無法被觸發執行,造成 misfire;
- 有狀態任務在下次觸發時間到達時,上次執行還沒有結束;為了處理 misfired job,Quartz 中為 trigger 定義了處理策略,主要有下面兩種:MISFIRE_INSTRUCTION_FIRE_ONCE_NOW:針對 misfired job 馬上執行一次;MISFIRE_INSTRUCTION_DO_NOTHING:忽略 misfired job,等待下次觸發;默認是MISFIRE_INSTRUCTION_SMART_POLICY,該策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW線程默認1分鍾執行一次;在一個事務中,默認一次最多recovery 20個;
執行流程:
- 若配置(默認為true,可配置)成獲取鎖前先檢查是否有需要recovery的trigger,先獲取misfireCount;
- 獲取TRIGGER_ACCESS鎖;
- hasMisfiredTriggersInState:獲取misfired的trigger,默認一個事務里只能最大20個misfired trigger(可配置),misfired判斷依據:status=waiting,next_fire_time < current_time-misfirethreshold(可配置,默認1min)
- notifyTriggerListenersMisfired
- updateAfterMisfire:獲取misfire策略(默認是MISFIRE_INSTRUCTION_SMART_POLICY,該策略在CronTrigger中=MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根據策略更新nextFireTime;
- 將nextFireTime等更新到trigger表;
- commit connection,釋放鎖8.如果還有更多的misfired,sleep短暫時間(為了集群負載均衡),否則sleep misfirethreshold時間,后繼續輪詢;
misfireHandler線程執行流程如下圖所示:misfireHandler線程時序圖
5.3.clusterManager線程
初始化:
failedInstance=failed+self+firedTrigger表中的schedulerName在scheduler_state表中找不到的(孤兒)
線程執行:
每個服務器會定時(org.quartz.jobStore.clusterCheckinInterval這個時間)更新SCHEDULER_STATE表的LAST_CHECKIN_TIME,若這個字段遠遠超出了該更新的時間,則認為該服務器實例掛了;
注意:每個服務器實例有唯一的id,若配置為AUTO,則為hostname+current_time
線程執行的具體流程:
- 檢查是否有超時的實例failedInstances;
- 更新該服務器實例的LAST_CHECKIN_TIME;
若有超時的實例: - 獲取STATE_ACCESS鎖;
- 獲取超時的實例failedInstances;
- 獲取TRIGGER_ACCESS鎖;
- clusterRecover:
- 針對每個failedInstances,通過instanceId獲取每個實例的firedTriggers;
- 針對每個firedTrigger:
1) 更新trigger狀態:
BLOCKED->WAITING
PAUSED_BLOCKED->PAUSED
ACQUIRED->WAITING
2) 若firedTrigger不是ACQUIRED狀態(在執行狀態),且jobRequestRecovery=true:
創建一個SimpleTrigger,存儲到trigger表,status=waiting,MISFIRE_INSTR=MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY.
3) 刪除firedTrigger
5.4源碼分析鎖
目前代碼中行鎖只用到了STATE_ACCESS 和TRIGGER_ACCESS 這兩種。
1、TRIGGER_ACCESS
先了解一篇文章,通過源碼來分析quartz是如何通過加鎖來實現集群環境,觸發器狀態的一致性。
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
可以看到觸發器的操作主要用主線程StdScheduleThread來完成,不管是獲取需要觸發的30S內的觸發器,還是觸發過程。select和update觸發器表時
都會先加鎖,后解鎖。如果數據庫資源競爭比較大的話,鎖會影響整個性能。可以考慮將任務信息放在分布式內存,如redis上進行處理。數據庫只是定時從redis上load數據下來做統計。
實現都在JobStoreSupport類
加鎖類型 | 加鎖方法 | 底層數據庫操作 | 備注 |
executeInNonManagedTXLock | acquireNextTrigger | selectTriggerToAcquire selectTrigger selectJobDetail insertFiredTrigger |
查詢需要點火的trigger 選擇需要執行的trigger加入到fired_trigger表 |
for執行 triggerFired | selectJobDetail selectCalendar updateFiredTrigger triggerExists updateTrigger |
點火trigger 修改trigger狀態為可執行狀態。 |
|
recoverJobs | updateTriggerStatesFromOtherStates hasMisfiredTriggersInState doUpdateOfMisfiredTrigger selectTriggersForRecoveringJobs selectTriggersInState deleteFiredTriggers |
非集群環境下重新執行 failed與misfired的trigger |
|
retryExecuteInNonManagedTXLock | releaseAcquiredTrigger | updateTriggerStateFromOtherState deleteFiredTrigger |
異常情況下重新釋放trigger到初使狀態。 |
triggeredJobComplete | selectTriggerStatus removeTrigger updateTriggerState deleteFiredTrigger |
觸發JOB任務完成后的處理。 | |
obtainLock | recoverMisfiredJobs | hasMisfiredTriggersInState doUpdateOfMisfiredTrigger | 重新執行misfired的trigger 可以在啟動時執行,也可以由misfired線程定期執行。 |
clusterRecover | selectInstancesFiredTriggerRecords updateTriggerStatesForJobFromOtherState storeTrigger deleteFiredTriggers selectFiredTriggerRecords removeTrigger deleteSchedulerState |
集群有結點faied,讓JOB能重新執行。 | |
executeInLock 數據庫集群里等同於 executeInNonManagedTXLock |
storeJobAndTrigger | updateJobDetail insertJobDetail triggerExists selectJobDetail updateTrigger insertTrigger |
保存JOB和TRIGGER配置 |
storeJob | 保存JOB | ||
removeJob | 刪除JOB | ||
removeJobs | 批量刪除JOB | ||
removeTriggers | 批量刪除triggers | ||
storeJobsAndTriggers | 保存JOB和多個trigger配置 | ||
removeTrigger | 刪除trigger | ||
replaceTrigger | 替換trigger | ||
storeCalendar | 保存定時日期 | ||
removeCalendar | 刪除定時日期 | ||
clearAllSchedulingData | 清除所有定時數據 | ||
pauseTrigger | 停止觸發器 | ||
pauseJob | 停止任務 | ||
pauseJobs | 批量停止任務 | ||
resumeTrigger | 恢復觸發器 | ||
resumeJob | 恢復任務 | ||
resumeJobs | 批量恢復任務 | ||
pauseTriggers | 批量停止觸發器 | ||
resumeTriggers | 批量恢復觸發器 | ||
pauseAll | 停止所有 | ||
resumeAll | 恢復所有 |
2、STATE_TRIGGER
實現都在JobStoreSupport類
加鎖類型 | 加鎖方法 | 底層數據庫操作 | 備注 |
obtainLock | doCheckin | clusterCheckIn | 判斷集群狀態 先用LOCK_STATE_ACCESS鎖集群狀態 再用LOCK_TRIGGER_ACCESS恢復集群運行 |
六、參考資料
- Quartz Documentation http://quartz-scheduler.org/documentation
- spring javadoc-api http://docs.spring.io/spring/docs/4.3.0.BUILD-SNAPSHOT/javadoc-api/
- 基於Quartz開發企業級任務調度應用 https://www.ibm.com/developerworks/cn/opensource/os-cn-quartz/
- quartz應用與集群原理分析 http://tech.meituan.com/mt-crm-quartz.html
- quartz詳解2:quartz由淺入深 http://ecmcug.itpub.net/11627468/viewspace-1763498/
- quartz詳解4:quartz線程管理 http://blog.itpub.net/11627468/viewspace-1766967/
- quartz學習筆記 http://www.cnblogs.com/yunxuange/archive/2012/08/28/2660141.html
- quartz集群調度機制調研及源碼分析 http://demo.netfoucs.com/gklifg/article/details/27090179
總結
通過這段時間對quartz資料的整理和結合工作中的運用,深入理解了quartz這一優秀的調度框架。在技術這條路上,做技術千萬不要淺嘗輒止,一定要深入的去理解所用的東西,才會使自己的能力提升,此外,一些系統的知識分析對自己和他人都是十分有益的。