Quartz is a well-known job scheduling framework. Here is how the official site introduces it:
-------------------------------------------------------------------------------------------------------------------------
What is the Quartz Job Scheduling Library?
Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.
Quartz is freely usable, licensed under the Apache 2.0 license.
-------------------------------------------------------------------------------------------------------------------------
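Our company's project also uses Quartz. We recently ran into a few Quartz problems, so with those questions in mind I read through parts of the Quartz source code, and I want to share what I found.
It started as an investigation into Quartz's misfire policies. When a job runs too long, the service goes down, the job is paused, or something similar causes a trigger to pass its next scheduled fire time without firing, the question becomes how that misfire (a trigger that did not fire on time) should be handled. Quartz has two main trigger types, SimpleTrigger and CronTrigger. Projects mostly use CronTrigger, so this post covers only CronTrigger's misfire policies (SimpleTrigger's policies are different and are left for a later post).
The misfire policy constants are available on CronTrigger (the first two are declared there; the last two are inherited from the Trigger interface):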
- MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1
- MISFIRE_INSTRUCTION_DO_NOTHING = 2
- MISFIRE_INSTRUCTION_SMART_POLICY = 0
- MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1
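Based on the JavaDoc and the official documentation, the corresponding behaviors are:
- MISFIRE_INSTRUCTION_FIRE_ONCE_NOW: fire once immediately, then continue on the schedule defined by the cron expression
- MISFIRE_INSTRUCTION_DO_NOTHING: do nothing and wait for the next fire time defined by the cron expression
- MISFIRE_INSTRUCTION_SMART_POLICY: a "smart" policy whose behavior depends on the trigger type; for CronTrigger it resolves to MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
- MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY: make up every missed fire time. For example, if a job runs every 15s and has missed 4 minutes, recovery fires it 4 * (60 / 15) = 16 times in a row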
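The documented behaviors above can be modelled in a few lines of plain Java. This is only a sketch of the semantics as described in the list, not Quartz code: the class `MisfirePolicySketch`, the method `catchUpFirings`, and the fixed-period schedule (instead of a real cron expression) are assumptions made for illustration. Times are in seconds.

```java
import java.util.ArrayList;
import java.util.List;

class MisfirePolicySketch {
    // Same numeric values as the constants listed above.
    static final int IGNORE_MISFIRE_POLICY = -1;
    static final int SMART_POLICY = 0;
    static final int FIRE_ONCE_NOW = 1;
    static final int DO_NOTHING = 2;

    /**
     * Returns the fire times that are executed at recovery time `now`
     * for a trigger that should have fired at `firstMissed` and then
     * every `periodSec` seconds. Hypothetical model, not a Quartz API.
     */
    static List<Long> catchUpFirings(int policy, long firstMissed, long now, long periodSec) {
        if (periodSec <= 0) {
            throw new IllegalArgumentException("periodSec must be positive");
        }
        List<Long> fired = new ArrayList<>();
        if (policy == SMART_POLICY) {
            policy = FIRE_ONCE_NOW; // what the smart policy means for a CronTrigger
        }
        if (policy == IGNORE_MISFIRE_POLICY) {
            // Every missed schedule point is made up.
            for (long t = firstMissed; t < now; t += periodSec) {
                fired.add(t);
            }
        } else if (policy == FIRE_ONCE_NOW) {
            // A single immediate run, then back to the normal schedule.
            fired.add(now);
        }
        // DO_NOTHING: nothing is made up; the next scheduled point is awaited.
        return fired;
    }
}
```

With a 15s period and 4 missed minutes, `catchUpFirings(IGNORE_MISFIRE_POLICY, 0, 240, 15)` yields the 16 runs from the example, while the other policies yield one run or none.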
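However, when I wrote an example, the actual behavior of policy 1 and policy 2 did not quite match the documentation. The sample job's cron expression is 0/10 * * * * ?, which fires every 10 seconds.
The test steps were as follows:
The job's next fire time was 15:05:10, and the misfire policy was MISFIRE_INSTRUCTION_FIRE_ONCE_NOW (1).
1. Pause the job until 15:05:35
2. Resume the job; it immediately ran 3 times
Setting the misfire policy to MISFIRE_INSTRUCTION_DO_NOTHING produced exactly the same behavior. This result does not match what the documentation describes.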
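The count of three lines up with the number of schedule points that fell inside the pause. A small sketch that enumerates them, assuming a fixed 10s period within a single day; the class `MissedFireTimes` and its method are hypothetical helpers, not part of Quartz:

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

class MissedFireTimes {
    /**
     * Lists the schedule points from `nextFire` up to and including
     * `resumedAt`, stepping by `periodSeconds`. Assumes both times are
     * on the same day.
     */
    static List<LocalTime> missedBetween(LocalTime nextFire, LocalTime resumedAt, int periodSeconds) {
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("periodSeconds must be positive");
        }
        List<LocalTime> missed = new ArrayList<>();
        int end = resumedAt.toSecondOfDay();
        for (int s = nextFire.toSecondOfDay(); s <= end; s += periodSeconds) {
            missed.add(LocalTime.ofSecondOfDay(s));
        }
        return missed;
    }
}
```

For the experiment above, `missedBetween(LocalTime.of(15, 5, 10), LocalTime.of(15, 5, 35), 10)` gives 15:05:10, 15:05:20 and 15:05:30, i.e. three missed points.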
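So I turned to the Quartz source. Starting from the scheduled job itself, I set a breakpoint and found that the job runs on a worker thread, a WorkerThread object, and that the worker thread pool is SimpleThreadPool.
The core code is as follows: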
    // WorkerThread.class
    // Hand a task to the worker thread
    public void run(Runnable newRunnable) {
        synchronized(lock) {
            if(runnable != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }

            runnable = newRunnable;
            lock.notifyAll();
        }
    }

    // Loop forever; whenever a task has been handed in, run it
    @Override
    public void run() {
        boolean ran = false;

        while (run.get()) {
            try {
                synchronized(lock) {
                    while (runnable == null && run.get()) {
                        lock.wait(500);
                    }

                    if (runnable != null) {
                        ran = true;
                        runnable.run();
                    }
                }
            } catch (InterruptedException unblock) {
                // do nothing (loop will terminate if shutdown() was called
                try {
                    getLog().error("Worker thread was interrupt()'ed.", unblock);
                } catch(Exception e) {
                    // ignore to help with a tomcat glitch
                }
            } catch (Throwable exceptionInRunnable) {
                try {
                    getLog().error("Error while executing the Runnable: ", exceptionInRunnable);
                } catch(Exception e) {
                    // ignore to help with a tomcat glitch
                }
            } finally {
                synchronized(lock) {
                    runnable = null;
                }
                // repair the thread in case the runnable mucked it up...
                if(getPriority() != tp.getThreadPriority()) {
                    setPriority(tp.getThreadPriority());
                }

                if (runOnce) {
                    run.set(false);
                    clearFromBusyWorkersList(this);
                } else if(ran) {
                    ran = false;
                    makeAvailable(this);
                }
            }
        }
        // (the excerpt ends here; the rest of the method is not shown)
    }
As you can see, a task is executed as soon as it is handed to the worker thread. Working backwards from there leads to the thread pool code:
    // SimpleThreadPool.class
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }

        synchronized (nextRunnableLock) {

            handoffPending = true;

            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }

            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }

        return true;
    }
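As you can see, the thread pool takes a worker thread from its queue of available workers and hands the task to that worker (WorkerThread), which then executes it.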
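Stripped of logging and pool bookkeeping, the handoff between the pool and a worker is a classic wait/notify pattern. The following is a minimal standalone sketch of that pattern, not the Quartz implementation; the class `HandoffWorker` and its `submit`, `shutdown` and `demo` methods are names invented for this example. Like the quoted code, it runs the task while holding the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class HandoffWorker extends Thread {
    private final Object lock = new Object();
    private Runnable runnable;               // guarded by lock
    private volatile boolean running = true;

    /** Hands a task to this worker; fails if a task is already pending. */
    void submit(Runnable task) {
        synchronized (lock) {
            if (runnable != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }
            runnable = task;
            lock.notifyAll();                // wake the worker if it is waiting
        }
    }

    /** Asks the worker loop to stop after its current wait or task. */
    void shutdown() {
        running = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running) {
            synchronized (lock) {
                try {
                    while (runnable == null && running) {
                        lock.wait(500);      // re-check the flags periodically
                    }
                    if (runnable != null) {
                        runnable.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // a failing task must not kill the worker
                } finally {
                    runnable = null;         // ready for the next handoff
                }
            }
        }
    }

    /** Starts a worker, hands it one task, and reports whether the task ran. */
    static boolean demo() {
        HandoffWorker worker = new HandoffWorker();
        worker.setDaemon(true);
        worker.start();
        CountDownLatch done = new CountDownLatch(1);
        worker.submit(done::countDown);
        try {
            return done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdown();
        }
    }
}
```

Calling `HandoffWorker.demo()` starts one worker, submits a task that counts down a latch, and returns whether the task ran within two seconds.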
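From there, working backwards to the caller of runInThread leads to the class QuartzSchedulerThread (around line 398). QuartzSchedulerThread extends Thread and is itself a thread that runs in an endless loop. Looking at QuartzSchedulerThread.run() (the method is long, so it is not pasted in full here), its job is roughly this: loop, find the jobs that need to run, hand them to the thread pool, and let the thread pool hand them to worker threads.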
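One pass of such a loop can be pictured as follows. This is a deliberately simplified sketch under several assumptions: the in-memory `store` list stands in for the JobStore, dispatched entries are simply removed instead of being rescheduled, and `SchedulerLoopSketch`, `Pending` and `dispatchDue` are hypothetical names that do not exist in Quartz.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;

class SchedulerLoopSketch {
    /** Hypothetical stand-in for a stored trigger and its job. */
    static final class Pending {
        final long nextFireTime;
        final Runnable job;

        Pending(long nextFireTime, Runnable job) {
            this.nextFireTime = nextFireTime;
            this.job = job;
        }
    }

    /**
     * One pass of the loop: pick the entries that are due at `now`
     * (at most `batchLimit` of them) and hand each one to the pool.
     * Returns how many were dispatched.
     */
    static int dispatchDue(List<Pending> store, long now, int batchLimit, Executor pool) {
        int dispatched = 0;
        Iterator<Pending> it = store.iterator();
        while (it.hasNext() && dispatched < batchLimit) {
            Pending p = it.next();
            if (p.nextFireTime <= now) {
                it.remove();          // simplification: real triggers are rescheduled
                pool.execute(p.job);  // the pool passes it on to a worker thread
                dispatched++;
            }
        }
        return dispatched;
    }

    /** Runs one pass over three entries, two of which are due. */
    static int[] demo() {
        int[] ran = {0};
        List<Pending> store = new ArrayList<>();
        store.add(new Pending(100, () -> ran[0]++));
        store.add(new Pending(200, () -> ran[0]++));
        store.add(new Pending(900, () -> ran[0]++));
        // Runnable::run is a direct executor: it runs each job on the calling thread.
        int dispatched = dispatchDue(store, 250, 5, Runnable::run);
        return new int[] {dispatched, ran[0], store.size()};
    }
}
```

In `demo()`, two of the three entries are due at time 250, so two jobs are dispatched and one entry stays in the store.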
Some of the key code:
1. The code that finds the jobs that need to run
    try {
        // This is where the database is queried for the triggers that are about to fire
        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
        acquiresFailed = 0;
        if (log.isDebugEnabled())
            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
    } catch (JobPersistenceException jpe) {
        if (acquiresFailed == 0) {
            qs.notifySchedulerListenersError(
                "An error occurred while scanning for the next triggers to fire.",
                jpe);
        }
        if (acquiresFailed < Integer.MAX_VALUE)
            acquiresFailed++;
        continue;
    } catch (RuntimeException e) {
        if (acquiresFailed == 0) {
            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                    +e.getMessage(), e);
        }
        if (acquiresFailed < Integer.MAX_VALUE)
            acquiresFailed++;
        continue;
    }
The key part is the call marked by the comment, the method acquireNextTriggers. Stepping into it with the debugger leads to the query SQL:
    // StdJDBCDelegate.class
    public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);

            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();

            while (rs.next() && nextTriggers.size() <= maxCount) {
                nextTriggers.add(triggerKey(
                        rs.getString(COL_TRIGGER_NAME),
                        rs.getString(COL_TRIGGER_GROUP)));
            }

            return nextTriggers;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
        }
    }
With the live parameter values seen in the debugger substituted in, the SQL is:
    SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY
    FROM qrtz_TRIGGERS
    WHERE SCHED_NAME = 'schedulerFactoryBean'
      AND TRIGGER_STATE = 'WAITING'
      AND NEXT_FIRE_TIME <= (now + idleWaitTime)
      AND (
            MISFIRE_INSTR = -1
            OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= (now - misfireThreshold))
          )
    ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
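Here, now is the current system time; idleWaitTime is how long the scheduler thread idles, 30s by default; and misfireThreshold is a configuration parameter, the largest misfire delay the system will tolerate, 60s by default (our system is configured with 60s as well; until now I never knew what this value meant). The SQL makes things clear. It selects triggers that are due to fire within the next 30s and whose MISFIRE_INSTR is -1 (MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY), or whose MISFIRE_INSTR is not -1 but whose NEXT_FIRE_TIME has been missed by no more than the 60s threshold. That settles the question: the other parameter that affects misfire handling is misfireThreshold, set in quartz.properties as org.quartz.jobStore.misfireThreshold: 60000, in milliseconds. In other words, if the missed time is no more than 60s, it is not treated as a misfire at all: the misfire policy is not applied, and the missed fire times are executed one after another. If the missed time exceeds 60s, the misfire policy is applied.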
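The WHERE clause can be read as a simple predicate over a trigger row. The sketch below mirrors that predicate in plain Java and reads the threshold from a properties snippet; `AcquireWindowSketch`, `misfireThresholdFrom` and `isAcquirable` are hypothetical names introduced here, and only the property key and the default values come from the text above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class AcquireWindowSketch {
    static final int IGNORE_MISFIRE_POLICY = -1;

    /** Reads org.quartz.jobStore.misfireThreshold (ms), falling back to 60000. */
    static long misfireThresholdFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Long.parseLong(
                props.getProperty("org.quartz.jobStore.misfireThreshold", "60000").trim());
    }

    /**
     * Mirrors the WHERE clause: the trigger is due within the idle window,
     * and either ignores misfires or is not overdue by more than the threshold.
     * All times are in milliseconds.
     */
    static boolean isAcquirable(int misfireInstr, long nextFireTime, long now,
                                long idleWaitTime, long misfireThreshold) {
        boolean dueSoon = nextFireTime <= now + idleWaitTime;
        boolean withinThreshold = nextFireTime >= now - misfireThreshold;
        return dueSoon && (misfireInstr == IGNORE_MISFIRE_POLICY || withinThreshold);
    }
}
```

With a 60000 ms threshold, a trigger that is 25s overdue is still picked up as a normal firing, whereas one that is 90s overdue is skipped by this query unless its instruction is -1.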
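Based on this conclusion I repeated the experiment with the job paused for more than 60s, and this time the result matched the documentation.
In addition, tracing the code that resumes the job leads to the method that handles the misfire: org.quartz.impl.triggers.CronTriggerImpl.updateAfterMisfire(Calendar)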
    @Override
    public void updateAfterMisfire(org.quartz.Calendar cal) {
        int instr = getMisfireInstruction();

        if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
            return;

        if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
            instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
        }

        if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
            Date newFireTime = getFireTimeAfter(new Date());
            while (newFireTime != null && cal != null
                    && !cal.isTimeIncluded(newFireTime.getTime())) {
                newFireTime = getFireTimeAfter(newFireTime);
            }
            setNextFireTime(newFireTime);
        } else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
            setNextFireTime(new Date());
        }
    }
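The misfire handling logic is plain to see here: the ignore policy leaves the trigger untouched, the smart policy is mapped to FIRE_ONCE_NOW, DO_NOTHING moves the next fire time to the next scheduled point after the current time, and FIRE_ONCE_NOW sets the next fire time to the current time.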
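The same branching can be replayed on plain numbers to see what next fire time each instruction produces. This is a sketch under stated assumptions: the schedule is a fixed period aligned to time 0 rather than a cron expression, the calendar exclusion check is left out, and `NextFireTimeSketch` with its methods is hypothetical.

```java
class NextFireTimeSketch {
    static final int IGNORE_MISFIRE_POLICY = -1;
    static final int SMART_POLICY = 0;
    static final int FIRE_ONCE_NOW = 1;
    static final int DO_NOTHING = 2;

    /** Next point strictly after `after` on a grid of multiples of `periodMs`. */
    static long fireTimeAfter(long after, long periodMs) {
        return (Math.floorDiv(after, periodMs) + 1) * periodMs;
    }

    /**
     * Returns the next fire time after misfire handling, following the
     * same branches as the quoted method.
     */
    static long nextFireTimeAfterMisfire(int instr, long staleNextFireTime, long now, long periodMs) {
        if (instr == IGNORE_MISFIRE_POLICY) {
            return staleNextFireTime;            // left untouched
        }
        if (instr == SMART_POLICY) {
            instr = FIRE_ONCE_NOW;
        }
        if (instr == DO_NOTHING) {
            return fireTimeAfter(now, periodMs); // skip ahead to the next scheduled point
        } else if (instr == FIRE_ONCE_NOW) {
            return now;                          // fire immediately
        }
        return staleNextFireTime;                // unknown instruction: no change
    }
}
```

For a 10s period, a stale fire time of 10000 ms and a current time of 95000 ms, DO_NOTHING yields 100000, FIRE_ONCE_NOW and the smart policy yield 95000, and the ignore policy leaves 10000 in place.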
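While reading the source I also looked into a few other things that had puzzled me, such as how Quartz's job execution mechanism is implemented. Questions like these are easy to answer by reading the source, and anyone interested can go through the code themselves.
Of course, the cause of this problem can also be found by searching online. Personally, though, I feel that tracking it down in the source gives a more thorough understanding of the problem, and it teaches you something about how Quartz is implemented along the way. I encourage everyone to read the framework's source when they hit a problem; it is not as complicated as you might imagine.
(If anything above is wrong, please point it out. Comments are welcome.)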