最近項目中要用到作業調度的功能,很自然想到大名鼎鼎的Quartz。但實際用的時候碰到一個很蛋疼的問題,自己定義的作業始終觸發不了,而且日志上也沒有異常拋出來。雖然最終問題解決了,而且問題的原因很操蛋,但還是把過程中自己對Quartz的一點點拙見理解寫下來,一來方便以后復習,而來本着分享精神。
Quartz中真正干活的幾個類如下:
QuartzScheduler 任務調度器(內部使用)
TreadPool Quartz線程池,干活的線程就是從這里分配出去並管理的。
因為比較懶,這里直接使用默認的SimpleTreadPool
QuartzSchedulerThread Quartz主線程。就是他負責找到需要出發的作業,並交給TreadPool執行
JobStore 貯存器,負責提供JobDetail和Trigger。
同樣因為比較懶,這里直接使用的RAMStore
Trigger 觸發器父類,負責控制作業的出發時間。這里使用的是CronTrigger
SchedulerFactoryBean Spring與Quartz的一個連接類,負責Quartz的初始化和啟動工作。
該類實現了InitializingBean,SmartLifecycle接口。所以初始化和啟動是由Spring負責調用的。
Quartz的原理大致如下:
IOC容器初始化時(我是用Spring與Quartz結合的)會創建並初始化Quartz線程池(TreadPool),並啟動它。剛啟動時線程池中每個線程都處於等待狀態,等待外界給他分配Runnable(持有作業對象的線程)。
然后會初始化並啟動Quartz的主線程(QuartzSchedulerThread),該線程自啟動后就會等待外界的信號量開始工作。外界給出工作信號量之后,該主線程的run方法才實質上開始工作。run中會獲取JobStore中下一次要觸發的作業,拿到之后會一直等待到該作業的真正觸發時間,然后將該作業包裝成一個JobRunShell對象(該對象實現了Runnable接口,其實看是上面TreadPool中等待外界分配給他的Runnable),然后將剛創建的JobRunShell交給線程池,由線程池負責執行作業。
線程池收到Runnable后,從線程池一個線程啟動Runnable,然后將該線程回收至空閑線程中。
JobRunShell對象的run方法就是最終通過反射調用作業的地方。
源碼分析過程大致如下(我只看了上面的幾個類,具體的類似配置文件讀取、Listener實現之類的就沒怎么看了。有興趣的可以自己看下):
因為我這里的Quartz是與Spring結合使用的,所以初始化的入口是SchedulerFactoryBean。該類實現了Spring的InitializingBean接口,所以IOC容器初始化完成后會調用afterPropertiesSet方法。Quartz的初始化也是在這里完成的。又因為該類實現了Spring的SmartLifecycle接口,所以真正啟動主線程的start方法也是由Spring調用的。
public void afterPropertiesSet() throws Exception {
/**
* schedulerFactoryClass默認是StdSchedulerFactory,initSchedulerFactory方法沒有仔細看,應該是讀取配置信息
*/
SchedulerFactory schedulerFactory = (SchedulerFactory)BeanUtils.instantiateClass(this.schedulerFactoryClass);
initSchedulerFactory(schedulerFactory);
。。。
// 所有的工作都是在createScheduler方法中做的:創建線程池、創建並啟動主線程。
// 但這里創建的主線程並沒有實質上的開始工作,他要等待外界的信號量
try {
this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
populateSchedulerContext();
}
。。。
// registerListeners注冊監聽器,這個方法沒有仔細看過
// registerJobsAndTriggers方法就是讀取配置的作業和他們的觸發器的地方
registerListeners();
registerJobsAndTriggers();
}
跟蹤createScheduler方法(這里返回的Scheduler對象就是最終要返回的Scheduler任務調度者):
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
throws SchedulerException {
。。。
// 這里創建的是StdScheduler,調用方法的自然也是StdSchedulerFactory
Scheduler newScheduler = schedulerFactory.getScheduler();
。。。
}
跟蹤StdSchedulerFactory的getScheduler方法:
public Scheduler getScheduler() throws SchedulerException {
// 比較關鍵的就instantiate方法,其他的就是加載配置信息,判斷緩存里有沒有意見創建過的Scheduler等等
。。。
sched = instantiate();
。。。
}
跟蹤instantiate方法:
這個方法很長很長,我這里指截取其中某寫片段進行說明。
private Scheduler instantiate() throws SchedulerException {
。。。
// 這里就是創建線程池的地方。tpClass是默認是SimpleTreadPool,具體的下面會分析
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, null);
try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {。。。}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
。。。
// 這里是創建JobStore的地方,負責保存作業和觸發器。這里是默認的RAMJobStore
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS, RAMJobStore.class.getName());
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {。。。}
。。。
// 這里就是創建Quartz內部調度器和Quartz主線程的地方。主線程會在QuartzScheduler的構造函數中創建並啟動
qs = new QuartzScheduler(rsrcs, schedCtxt, idleWaitTime, dbFailureRetry);
。。。
}
初始化的時候我關心的代碼大概就這么多。下面具體跟蹤看下
先從TreadPool的創建開始,這里創建的是SimpleTreadPool。SimpleTreadPool中持有3個List
private List workers; // 存放池中所有的線程引用
private LinkedList availWorkers = new LinkedList(); // 存放所有空閑的線程
private LinkedList busyWorkers = new LinkedList(); // 存放所有工作中的線程
public void initialize() throws SchedulerConfigException {
。。。
// 如果外界沒有配置,那默認的線程組就是main線程的第一層子線程組
if(isThreadsInheritGroupOfInitializingThread()) {
threadGroup = Thread.currentThread().getThreadGroup();
} else {
// follow the threadGroup tree to the root thread group.
threadGroup = Thread.currentThread().getThreadGroup();
ThreadGroup parent = threadGroup;
while ( !parent.getName().equals("main") ) {
threadGroup = parent;
parent = threadGroup.getParent();
}
threadGroup = new ThreadGroup(parent, schedulerInstanceName + "-SimpleThreadPool");
if (isMakeThreadsDaemons()) {
threadGroup.setDaemon(true);
}
}
// createWorkerThreads方法中會根據配置的池大小創建線程實例。並啟動池中每一個線程
// 這里啟動的線程就是上面說到的等待Runnable(JobRunShell)的線程。
// create the worker threads and start them
Iterator workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = (WorkerThread) workerThreads.next();
wt.start();
availWorkers.add(wt);
}
}
跟蹤createWorkerThreads方法:
// 池中實際的對象是WorkerThread對象。
protected List createWorkerThreads(int count) {
workers = new LinkedList();
for (int i = 1; i<= count; ++i) {
WorkerThread wt = new WorkerThread(this, threadGroup,
getThreadNamePrefix() + "-" + i,
getThreadPriority(),
isMakeThreadsDaemons());
if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
wt.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
workers.add(wt);
}
return workers;
}
跟蹤WorkerThread的run方法:
public void run() {
boolean ran = false;
boolean shouldRun = false;
synchronized(this) {
shouldRun = run;
}
while (shouldRun) {
try {
synchronized(this) {
// 放Runnable為空(外界還沒有給JobRunShell)的時候,這個線程無限等待。
while (runnable == null && run) {
this.wait(500);
}
}
if (runnable != null) {
ran = true;
// 這里就是JobRunShell的run方法,也就是作業最終被調用的地方。
runnable.run();
}
} catch (InterruptedException unblock) {
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {}
} finally {
synchronized(this) {
runnable = null;
}
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
synchronized(this) {
run = false;
}
// 如果只執行一次則執行完成后該對象不放入空閑線程隊列中
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
// 將該對象從工作線程隊列中刪除,並且放入空閑隊列中。這個方法實際上就是線程的回收
makeAvailable(this);
}
}
synchronized(this) {
shouldRun = run;
}
}
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
}
}
線程池的代碼大概就是這樣,下面跟蹤QuartzScheduler的構造函數。這個類會創建Quartz的主線程。
public QuartzScheduler(QuartzSchedulerResources resources,
SchedulingContext ctxt, long idleWaitTime, long dbRetryInterval){
。。。
this.schedThread = new QuartzSchedulerThread(this, resources, ctxt);
。。。
}
QuartzSchedulerThread的構造函數中會將本身自啟動,進入run的等待中。
關注QuartzSchedulerThread的run方法(這個run方法也是很長很長,這里只截取關鍵的部分):
public void run() {
while (!halted.get()) {
try {
synchronized (sigLock) {
// paused 就是等待外界的信號量,
// 需要信號量pausedc=false才能開始工作 QuartzScheduler.start()方法中會設置pausedc=false
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
// 當線程池中有空閑線程時才執行(這里也不是嚴格的,如果配置的沒有空閑線程則創建一個新的)
int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availTreadCount > 0) {
。。。
// 這里會找到下一個要觸發的線程。具體的方法在下面會分析。
trigger = qsRsrcs.getJobStore().acquireNextTrigger(ctxt, now + idleWaitTime);
。。。等待線程到trigger的真正觸發時間。。。
// 創建JobRunShell,要執行的作業就在這里面
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {。。。}
// 這里就是將JobRunShell交給線程池的地方
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {。。。}
。。。
}
}
}
}
跟蹤JobStore的acquireNextTrigger方法(這里是RAMJobStore)
// 實際上RAMJobStore持有一個TreeSet<Trigger> timeTriggers,排序方式是按觸發時間排的。觸發時間越早的排在前面。
// 所以這里只要取timeTriggers的first並驗證就可以了。
public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) {
TriggerWrapper tw = null;
synchronized (lock) {
while (tw == null) {
try {
tw = (TriggerWrapper) timeTriggers.first();
} catch (java.util.NoSuchElementException nsee) {
return null;
}
if (tw == null) {
return null;
}
if (tw.trigger.getNextFireTime() == null) {
timeTriggers.remove(tw);
tw = null;
continue;
}
timeTriggers.remove(tw);
if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
tw = null;
continue;
}
if(tw.trigger.getNextFireTime().getTime() > noLaterThan) {
timeTriggers.add(tw);
return null;
}
tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
Trigger trig = (Trigger) tw.trigger.clone();
return trig;
}
}
return null;
}
這里還有一點,Trigger是怎么知道自己的觸發時間的。這里使用的是CronTrigger。通過源碼可以知道,Trigger的下次觸發時間是通過getNextFireTime方法得到的。CronTrigger的getNextFireTime方法是通過CronExpression對象的getTimeAfter方法實現的。CronExpression對象就是表示我們配置的觸發表達式的對象。類似這樣:0 0/10 * * * *
計算方法:
CronTrigger.getTimeAfter() 方法內部會調用CronExpression.getTimeAfter()方法。。。。。
利用Calendar類,單獨設置年月日小時分秒的值。
年月日小時分秒都有一個TreeSet存儲可能出現的所有的值,然后取當前時間之后的部分的第一個。就是下次觸發的值。
PS:之前還以為多復雜,看了源碼之后才知道,我們都被忽悠了。
難怪CronTrigger的觸發表達式要這樣寫: 0 0/10 * * * 。。。
最終,JobRunShell就這樣被啟動了。
最后再回到 SchedulerFactoryBean 的start方法:
public void start() throws SchedulingException {
if (this.scheduler != null) {
try {
startScheduler(this.scheduler, this.startupDelay);
}
catch (SchedulerException ex) {
throw new SchedulingException("Could not start Quartz Scheduler", ex);
}
startScheduler中會調用前面創建的scheduler對象的start方法。將Quartz的信號量置為false,啟動Quartz主線程
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
}
// 這里就是將主線程的pause信號量置為false的地方
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
notifySchedulerListenersStarted();
}
Quartz的工作原理和源碼分析大概就是這樣,知道了原理並不是很復雜的。
再回到項目中,之前那個操蛋的問題到底出在哪呢?一路分析下來我發現都沒問題,Quartz是正常啟動了。原因就在於有個同事提交了代碼,沒通知我,本地的代碼與服務器上的代碼已經不一致了,我還傻乎乎的遠程斷點調試,當然看不到進斷點了。