Quartz學習筆記


Quartz學習筆記

最近項目中要用到作業調度的功能,很自然想到大名鼎鼎的Quartz。但實際用的時候碰到一個很蛋疼的問題,自己定義的作業始終觸發不了,而且日志上也沒有異常拋出來。雖然最終問題解決了,而且問題的原因很操蛋,但還是把過程中自己對Quartz的一點點拙見理解寫下來,一來方便以后復習,而來本着分享精神。

Quartz中真正干活的幾個類如下:

QuartzScheduler 任務調度器(內部使用)

TreadPool Quartz線程池,干活的線程就是從這里分配出去並管理的。

因為比較懶,這里直接使用默認的SimpleTreadPool

QuartzSchedulerThread Quartz主線程。就是他負責找到需要出發的作業,並交給TreadPool執行

JobStore 貯存器,負責提供JobDetail和Trigger。

同樣因為比較懶,這里直接使用的RAMStore

Trigger 觸發器父類,負責控制作業的出發時間。這里使用的是CronTrigger

SchedulerFactoryBean Spring與Quartz的一個連接類,負責Quartz的初始化和啟動工作。

該類實現了InitializingBean,SmartLifecycle接口。所以初始化和啟動是由Spring負責調用的。

 

Quartz的原理大致如下:

IOC容器初始化時(我是用Spring與Quartz結合的)會創建並初始化Quartz線程池(TreadPool),並啟動它。剛啟動時線程池中每個線程都處於等待狀態,等待外界給他分配Runnable(持有作業對象的線程)。

然后會初始化並啟動Quartz的主線程(QuartzSchedulerThread),該線程自啟動后就會等待外界的信號量開始工作。外界給出工作信號量之后,該主線程的run方法才實質上開始工作。run中會獲取JobStore中下一次要觸發的作業,拿到之后會一直等待到該作業的真正觸發時間,然后將該作業包裝成一個JobRunShell對象(該對象實現了Runnable接口,其實看是上面TreadPool中等待外界分配給他的Runnable),然后將剛創建的JobRunShell交給線程池,由線程池負責執行作業。

線程池收到Runnable后,從線程池一個線程啟動Runnable,然后將該線程回收至空閑線程中。

JobRunShell對象的run方法就是最終通過反射調用作業的地方。

 

源碼分析過程大致如下(我只看了上面的幾個類,具體的類似配置文件讀取、Listener實現之類的就沒怎么看了。有興趣的可以自己看下):

因為我這里的Quartz是與Spring結合使用的,所以初始化的入口是SchedulerFactoryBean。該類實現了Spring的InitializingBean接口,所以IOC容器初始化完成后會調用afterPropertiesSet方法。Quartz的初始化也是在這里完成的。又因為該類實現了Spring的SmartLifecycle接口,所以真正啟動主線程的start方法也是由Spring調用的。

public void afterPropertiesSet() throws Exception {

/**

* schedulerFactoryClass默認是StdSchedulerFactory,initSchedulerFactory方法沒有仔細看,應該是讀取配置信息

*/

SchedulerFactory schedulerFactory = (SchedulerFactory)BeanUtils.instantiateClass(this.schedulerFactoryClass);

initSchedulerFactory(schedulerFactory);

。。。

// 所有的工作都是在createScheduler方法中做的:創建線程池、創建並啟動主線程。

// 但這里創建的主線程並沒有實質上的開始工作,他要等待外界的信號量

try {
this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
populateSchedulerContext();
}

。。。

// registerListeners注冊監聽器,這個方法沒有仔細看過

// registerJobsAndTriggers方法就是讀取配置的作業和他們的觸發器的地方

registerListeners();
registerJobsAndTriggers();

}

跟蹤createScheduler方法(這里返回的Scheduler對象就是最終要返回的Scheduler任務調度者):

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
throws SchedulerException {

。。。

// 這里創建的是StdScheduler,調用方法的自然也是StdSchedulerFactory

Scheduler newScheduler = schedulerFactory.getScheduler();

。。。

}

跟蹤StdSchedulerFactory的getScheduler方法:

public Scheduler getScheduler() throws SchedulerException {

// 比較關鍵的就instantiate方法,其他的就是加載配置信息,判斷緩存里有沒有意見創建過的Scheduler等等

。。。

sched = instantiate();

。。。

}

跟蹤instantiate方法:

這個方法很長很長,我這里指截取其中某寫片段進行說明。

private Scheduler instantiate() throws SchedulerException {

。。。

// 這里就是創建線程池的地方。tpClass是默認是SimpleTreadPool,具體的下面會分析

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, null);

try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {。。。}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);

。。。

// 這里是創建JobStore的地方,負責保存作業和觸發器。這里是默認的RAMJobStore

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS, RAMJobStore.class.getName());

try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {。。。}

。。。

// 這里就是創建Quartz內部調度器和Quartz主線程的地方。主線程會在QuartzScheduler的構造函數中創建並啟動

qs = new QuartzScheduler(rsrcs, schedCtxt, idleWaitTime, dbFailureRetry);

。。。

}

初始化的時候我關心的代碼大概就這么多。下面具體跟蹤看下

先從TreadPool的創建開始,這里創建的是SimpleTreadPool。SimpleTreadPool中持有3個List

private List workers; // 存放池中所有的線程引用
private LinkedList availWorkers = new LinkedList(); // 存放所有空閑的線程
private LinkedList busyWorkers = new LinkedList(); // 存放所有工作中的線程

public void initialize() throws SchedulerConfigException {

。。。
// 如果外界沒有配置,那默認的線程組就是main線程的第一層子線程組

if(isThreadsInheritGroupOfInitializingThread()) {
threadGroup = Thread.currentThread().getThreadGroup();
} else {
// follow the threadGroup tree to the root thread group.
threadGroup = Thread.currentThread().getThreadGroup();
ThreadGroup parent = threadGroup;
while ( !parent.getName().equals("main") ) {
threadGroup = parent;
parent = threadGroup.getParent();
}
threadGroup = new ThreadGroup(parent, schedulerInstanceName + "-SimpleThreadPool");
if (isMakeThreadsDaemons()) {
threadGroup.setDaemon(true);
}
}

// createWorkerThreads方法中會根據配置的池大小創建線程實例。並啟動池中每一個線程

// 這里啟動的線程就是上面說到的等待Runnable(JobRunShell)的線程。

// create the worker threads and start them
Iterator workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = (WorkerThread) workerThreads.next();
wt.start();
availWorkers.add(wt);
}
}

跟蹤createWorkerThreads方法:

// 池中實際的對象是WorkerThread對象。

protected List createWorkerThreads(int count) {
workers = new LinkedList();
for (int i = 1; i<= count; ++i) {
WorkerThread wt = new WorkerThread(this, threadGroup,
getThreadNamePrefix() + "-" + i,
getThreadPriority(),
isMakeThreadsDaemons());
if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
wt.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
workers.add(wt);
}

return workers;
}

跟蹤WorkerThread的run方法:

public void run() {
boolean ran = false;
boolean shouldRun = false;
synchronized(this) {
shouldRun = run;
}

while (shouldRun) {
try {
synchronized(this) {

// 放Runnable為空(外界還沒有給JobRunShell)的時候,這個線程無限等待。
while (runnable == null && run) {
this.wait(500);
}
}

if (runnable != null) {
ran = true;

// 這里就是JobRunShell的run方法,也就是作業最終被調用的地方。
runnable.run();
}
} catch (InterruptedException unblock) {
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {}
} finally {
synchronized(this) {
runnable = null;
}
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}

if (runOnce) {
synchronized(this) {
run = false;
}

// 如果只執行一次則執行完成后該對象不放入空閑線程隊列中
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;

// 將該對象從工作線程隊列中刪除,並且放入空閑隊列中。這個方法實際上就是線程的回收
makeAvailable(this);
}

}
synchronized(this) {
shouldRun = run;
}
}
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
}
}

線程池的代碼大概就是這樣,下面跟蹤QuartzScheduler的構造函數。這個類會創建Quartz的主線程。

public QuartzScheduler(QuartzSchedulerResources resources,

SchedulingContext ctxt, long idleWaitTime, long dbRetryInterval){

。。。

this.schedThread = new QuartzSchedulerThread(this, resources, ctxt);

。。。

}

QuartzSchedulerThread的構造函數中會將本身自啟動,進入run的等待中。

關注QuartzSchedulerThread的run方法(這個run方法也是很長很長,這里只截取關鍵的部分):

public void run() {

while (!halted.get()) {

try {

synchronized (sigLock) {

// paused 就是等待外界的信號量,

// 需要信號量pausedc=false才能開始工作 QuartzScheduler.start()方法中會設置pausedc=false
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}

if (halted.get()) {
break;
}

// 當線程池中有空閑線程時才執行(這里也不是嚴格的,如果配置的沒有空閑線程則創建一個新的)

int availTreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

if(availTreadCount > 0) {

。。。

// 這里會找到下一個要觸發的線程。具體的方法在下面會分析。

trigger = qsRsrcs.getJobStore().acquireNextTrigger(ctxt, now + idleWaitTime);

。。。等待線程到trigger的真正觸發時間。。。

// 創建JobRunShell,要執行的作業就在這里面

JobRunShell shell = null;

try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {。。。}

// 這里就是將JobRunShell交給線程池的地方

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {。。。}

。。。


}

}

}

}

跟蹤JobStore的acquireNextTrigger方法(這里是RAMJobStore)

// 實際上RAMJobStore持有一個TreeSet<Trigger> timeTriggers,排序方式是按觸發時間排的。觸發時間越早的排在前面。

// 所以這里只要取timeTriggers的first並驗證就可以了。

public Trigger acquireNextTrigger(SchedulingContext ctxt, long noLaterThan) {
TriggerWrapper tw = null;

synchronized (lock) {

while (tw == null) {
try {
tw = (TriggerWrapper) timeTriggers.first();
} catch (java.util.NoSuchElementException nsee) {
return null;
}

if (tw == null) {
return null;
}

if (tw.trigger.getNextFireTime() == null) {
timeTriggers.remove(tw);
tw = null;
continue;
}

timeTriggers.remove(tw);

if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
tw = null;
continue;
}

if(tw.trigger.getNextFireTime().getTime() > noLaterThan) {
timeTriggers.add(tw);
return null;
}

tw.state = TriggerWrapper.STATE_ACQUIRED;

tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
Trigger trig = (Trigger) tw.trigger.clone();
return trig;
}
}

return null;
}

這里還有一點,Trigger是怎么知道自己的觸發時間的。這里使用的是CronTrigger。通過源碼可以知道,Trigger的下次觸發時間是通過getNextFireTime方法得到的。CronTrigger的getNextFireTime方法是通過CronExpression對象的getTimeAfter方法實現的。CronExpression對象就是表示我們配置的觸發表達式的對象。類似這樣:0 0/10 * * * *

計算方法:

CronTrigger.getTimeAfter() 方法內部會調用CronExpression.getTimeAfter()方法。。。。。
利用Calendar類,單獨設置年月日小時分秒的值。
年月日小時分秒都有一個TreeSet存儲可能出現的所有的值,然后取當前時間之后的部分的第一個。就是下次觸發的值。

PS:之前還以為多復雜,看了源碼之后才知道,我們都被忽悠了。
難怪CronTrigger的觸發表達式要這樣寫: 0 0/10 * * * 。。。

 

最終,JobRunShell就這樣被啟動了。

最后再回到 SchedulerFactoryBean 的start方法:

public void start() throws SchedulingException {
if (this.scheduler != null) {
try {
startScheduler(this.scheduler, this.startupDelay);
}
catch (SchedulerException ex) {
throw new SchedulingException("Could not start Quartz Scheduler", ex);
}

startScheduler中會調用前面創建的scheduler對象的start方法。將Quartz的信號量置為false,啟動Quartz主線程

public void start() throws SchedulerException {

if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}

if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
}

// 這里就是將主線程的pause信號量置為false的地方

schedThread.togglePause(false);

getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");

notifySchedulerListenersStarted();
}

Quartz的工作原理和源碼分析大概就是這樣,知道了原理並不是很復雜的。

再回到項目中,之前那個操蛋的問題到底出在哪呢?一路分析下來我發現都沒問題,Quartz是正常啟動了。原因就在於有個同事提交了代碼,沒通知我,本地的代碼與服務器上的代碼已經不一致了,我還傻乎乎的遠程斷點調試,當然看不到進斷點了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM