title: quartz2.x源碼分析——啟動過程
date: 2017-04-13 14:59:01
categories: quartz
tags: [quartz, 源碼分析]
先簡單介紹一下quartz,Quartz是一個功能豐富的開源作業調度庫,可以集成到幾乎任何Java應用程序中 - 從最小的獨立應用程序到最大的電子商務系統。quartz可用於創建執行數十,數百甚至數十萬個作業的簡單或復雜的計划; 任務定義為標准Java組件的任務,可以執行任何可以對其進行編程的任何內容。Quartz Scheduler包含許多企業級功能,例如支持JTA事務和集群。
以上內容來自quartz官網。
我們會針對quartz的幾個主要類進行分析,看一下quartz是如何實現定時調度功能
1.1測試Demo
來自官方實例的example1
public class SimpleExample {
public void run() throws Exception {
Logger log = LoggerFactory.getLogger(SimpleExample.class);
log.info("------- Initializing ----------------------");
// First we must get a reference to a scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
log.info("------- Initialization Complete -----------");
// computer a time that is on the next round minute
Date runTime = evenMinuteDate(new Date());
log.info("------- Scheduling Job -------------------");
// define the job and tie it to our HelloJob class
JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
// Trigger the job to run on the next round minute
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
// Tell quartz to schedule the job using our trigger
sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + runTime);
// Start up the scheduler (nothing can actually run until the
// scheduler has been started)
sched.start();
log.info("------- Started Scheduler -----------------");
// wait long enough so that the scheduler as an opportunity to
// run the job!
log.info("------- Waiting 65 seconds... -------------");
try {
// wait 65 seconds to show job
Thread.sleep(65L * 1000L);
// executing...
} catch (Exception e) {
//
}
// shut down the scheduler
log.info("------- Shutting Down ---------------------");
sched.shutdown(true);
log.info("------- Shutdown Complete -----------------");
}
public static void main(String[] args) throws Exception {
SimpleExample example = new SimpleExample();
example.run();
}
}
public class HelloJob implements Job {
private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
public HelloJob() {
}
public void execute(JobExecutionContext context)
throws JobExecutionException {
_log.info("Hello World! - " + new Date());
}
}
1.2 quartz關鍵API
從上述example中我們可以看到quartz主要接口和類
- Scheduler - 進行作業調度的主要接口.
- Job - 作業接口,編寫自己的作業需要實現,如example中的HelloJob
- JobDetail - 作業的詳細信息,除了包含作業本身,還包含一些額外的數據。
- Trigger - 作業計划的組件-作業何時執行,執行次數,頻率等。
- JobBuilder - 建造者模式創建 JobDetail實例.
- TriggerBuilder - 建造者模式創建 Trigger 實例.
- QuartzSchedulerThread 繼承Thread 主要的執行任務線程
從上面的幾個接口,可以看到quartz設計非常精妙,將作業和觸發器分開設計,同時調度器完成對作業的調度。
了解了幾個關鍵類和接口作用,下面我們來分析整個執行過程。
1.3 執行過程分析
1.3.1 從StdSchedulerFactory獲取scheduler
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
sched = instantiate();
return sched;
}
- cfg變量為PropertiesParser實例————是quartz的配置信息(主要是quartz.properties),如果為空,就初始化讀取quartz的配置信息。
- SchedulerRepository是一個HashMap,用於存儲Scheduler。如果有重名的,判斷是否已經停止,是從hashMap刪掉,否直接返回已保存實例
- SchedulerRepository未找到,實例化一個scheduler
private Scheduler instantiate() throws SchedulerException {
if (cfg == null) {
initialize();
}
if (initException != null) {
throw initException;
}
.....
.....
SchedulerRepository schedRep = SchedulerRepository.getInstance();
// Get Scheduler Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME,
"QuartzScheduler");
String threadName = cfg.getStringProperty(PROP_SCHED_THREAD_NAME,
schedName + "_QuartzSchedulerThread");
.....
.....
String managementRESTServiceHostAndPort = cfg.getStringProperty(MANAGEMENT_REST_SERVICE_HOST_PORT, "0.0.0.0:9889");
Properties schedCtxtProps = cfg.getPropertyGroup(PROP_SCHED_CONTEXT_PREFIX, true);
// If Proxying to remote scheduler, short-circuit here...
// ~~~~~~~~~~~~~~~~~~
if (rmiProxy) {
....
....
schedRep.bind(remoteScheduler);
return remoteScheduler;
}
// Create class load helper
ClassLoadHelper loadHelper = null;
try {
loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
.newInstance();
} catch (Exception e) {
throw new SchedulerConfigException(
"Unable to instantiate class load helper class: "
+ e.getMessage(), e);
}
loadHelper.initialize();
// If Proxying to remote JMX scheduler, short-circuit here...
// ~~~~~~~~~~~~~~~~~~
if (jmxProxy) {
if (autoId) {
schedInstId = DEFAULT_INSTANCE_ID;
}
....
....
jmxScheduler.initialize();
schedRep.bind(jmxScheduler);
return jmxScheduler;
}
JobFactory jobFactory = null;
if(jobFactoryClass != null) {
....
....
}
InstanceIdGenerator instanceIdGenerator = null;
if(instanceIdGeneratorClass != null) {
.....
.....
}
// Get ThreadPool Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
....
....
// Get JobStore Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
if (jsClass == null) {
}
....
....
....
// Set up any DataSources
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
// custom connectionProvider...
....
....
....
}
// Set up any SchedulerPlugins
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
for (int i = 0; i < pluginNames.length; i++) {
Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
+ pluginNames[i], true);
....
....
plugins[i] = plugin;
}
// Set up any JobListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Class<?>[] strArg = new Class[] { String.class };
String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
JobListener[] jobListeners = new JobListener[jobListenerNames.length];
for (int i = 0; i < jobListenerNames.length; i++) {
Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
+ jobListenerNames[i], true);
.....
.....
jobListeners[i] = listener;
}
// Set up any TriggerListeners
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
for (int i = 0; i < triggerListenerNames.length; i++) {
Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
+ triggerListenerNames[i], true);
String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
.....
.....
.....
triggerListeners[i] = listener;
}
boolean tpInited = false;
boolean qsInited = false;
// Get ThreadExecutor Properties
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
if (threadExecutorClass != null) {
tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
.....
.....
}
// Fire everything up
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
try {
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
......
......
schedRep.bind(scheduler);
return scheduler;
}
catch(SchedulerException e) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw e;
}
catch(RuntimeException re) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw re;
}
catch(Error re) {
shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
throw re;
}
}
instantiate()是一個比較重要的方法,主要從上一段代碼介紹的PropertiesParser獲取Scheduler配置信息
- 獲取Scheduler Properties
- 如果是rmi代理sheduler,則創建RemoteScheduler,並通過scheRep.bind放入SchedulerRepository中,返回scheduler,結束
- 創建class load helper 加載類提供幫助
- 如果是jmx scheduler 則進行對應操作,並通過scheRep.bind放入SchedulerRepository中,返回scheduler,結束
往下是本地調度,不是遠程調度,因此需要獲取和本地調度相關的信息
6.獲取線程池配置
7.獲取Job存儲設置,(分為內存存儲和數據庫存儲作業還有一個不太了解的方式,默認是內存存儲)
8.設置數據庫連接池
9.安裝Scheduler插件
10.安裝JobListener ,監聽作業啟動前,作業唄否決執行,作業已經執行,參見[JobListener.class]
11.安裝TriggerListener,監聽觸發器被觸發工作將要執行時,觸發錯過,否決工作執行,工作完成,參見[TriggerListener.class]
12.獲取ThreadExecutor配置
13.初始化JobRunShellFactory,這個工廠類很重要,后面會介紹
14.創建標准StdScheduler,返回。
1.3.2 創建JobDetail
JobDeatil包含jobDataMap和jobClass,以及一些描述,名稱等等,采用Build模式建造,jobDataMap主要存儲一些額外信息。
1.3.3 創建Trigger
Trigger主要包含了一些計划信息,詳細可參考接口
准備工作做完了,開始進行調度
1.3.3 scheduler.scheduleJob()
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
return sched.scheduleJob(jobDetail, trigger);
}
sched是StdScheduler的一個成員,是QuartzScheduler的實例,在上面的instantiate()方法中實例化過並且用它構造StdScheduler;看一下QuartzScheduler的scheduleJob
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
}
if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
}
if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
}
if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
}
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException(
"Trigger does not reference given job!");
}
trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}
讓我們來看一下schedule()方法
- 一些校驗,jobDetail,trigger,jobClass判空
- 計算第一次執行的日期,計算日期為空,則拋出異常,job永遠不會被調度
- 如果上述通過,通過jobStore存儲jobDetail和trigger
- 通知監聽器程序:工作加入通知、
public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
// build a list of all scheduler listeners that are to be notified...
List<SchedulerListener> schedListeners = buildSchedulerListenerList();
// notify all scheduler listeners
for(SchedulerListener sl: schedListeners) {
try {
sl.jobAdded(jobDetail);
} catch (Exception e) {
getLog().error(
"Error while notifying SchedulerListener of JobAdded.",
e);
}
}
}
5.通知監聽器程序:通知正在休眠(工作可能都執行完,主線程sigLock.wait())的主執行線程,有工作加入,喚醒主線程
protected void notifySchedulerThread(long candidateNewNextFireTime) {
if (isSignalOnSchedulingChange()) {
signaler.signalSchedulingChange(candidateNewNextFireTime);
}
}
public void signalSchedulingChange(long candidateNewNextFireTime) {
synchronized(sigLock) {
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}
6.通知調度器job被調度了
public void notifySchedulerListenersSchduled(Trigger trigger) {
// build a list of all scheduler listeners that are to be notified...
List<SchedulerListener> schedListeners = buildSchedulerListenerList();
// notify all scheduler listeners
for(SchedulerListener sl: schedListeners) {
try {
sl.jobScheduled(trigger);
} catch (Exception e) {
getLog().error(
"Error while notifying SchedulerListener of scheduled job."
+ " Triger=" + trigger.getKey(), e);
}
}
}
此時scheduler.scheduleJob()執行完畢
1.3.4 scheduler.start() 未執行此函數,沒有什么會真正執行,主線程loop循環一直被wait。
public void start() throws SchedulerException {
sched.start();
}
//sched.start()
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
- 判斷調度器是否關掉或停止(因為可能開始之前已經將調度器停止或關閉),是則直接拋異常
- 通知監聽器,馬上就要執行
- 啟動一些通知jobStore和一些插件開始執行,如果initial不為空,說明曾經啟動過,則重新恢復
- 切換線程開始執行togglePause(false)。此時開始執行, 下面的run()方法選取了一部分,未設置開始之前,while里面還有一個while (paused && !halted.get()) 一直等待變量改變,否則,就一直wait這也就是sche.scheduleJob()未真正啟動工作的原因
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
}
5.通知任務已經開始執行
public void notifySchedulerListenersStarted() {
// build a list of all scheduler listeners that are to be notified...
List<SchedulerListener> schedListeners = buildSchedulerListenerList();
// notify all scheduler listeners
for(SchedulerListener sl: schedListeners) {
try {
sl.schedulerStarted();
} catch (Exception e) {
getLog().error(
"Error while notifying SchedulerListener of startup.",
e);
}
}
}
至此,整個流程大約完成了。
具體執行的(run)細節看看有空再講一下。