前言
相信大家對Quartz這框架並不陌生,日常工作經常會接觸到,我們團隊也在使用。但是我發現大家在工作中對其僅停留在簡單配置使用層面,很多時候發生問題,並不知道它問題root cause
是什么,配置參數也是隨便在網上copy回來亂用,並不是基於項目實際情況。自從前幾年開始做技術管理后,工作期間也沒多少時間可以在一線擼碼,剛好趁周末時間重新把源碼看了一遍整理下,希望對大家有幫助!PS:本文基於Quartz2.3.0,不會介紹如何使用Quartz,完全沒有接觸過Quartz的朋友建議先閱讀官方文檔。
常見問題
-
Quartz的核心組件?
-
Quartz的核心運行機制?
-
Quartz的線程模型
-
Quartz集群進程間如何通信?
-
Quartz集群如何保證高並發下不重復跑?
-
Quartz如何保證不漏跑
-
Quartz默認任務鎖機制?
-
Quartz常見問題
Quartz的核心組件
JobDetail
我們創建一個實現 Job
接口的類,使用 JobBuilder
包裝成 JobDetail
,它可以攜帶 KV 的數據,方面用戶可以擴展自己任務要用的參數。
Trigger
定義任務的觸發規則,使用 TriggerBuilder
來構建。
為什么JobDetail和Trigger是一對多的關系
因為通常我們一個任務實際上是有多種觸發規則的,例如:我想我的跑批任務周一9點跑一次,周三5點跑一起,它實際上是屬於同一個Job,只是不同的觸發規則,這時候我們就可以定義多個Trigger組合起來用。
Set<Trigger> triggersForJob = new HashSet();
triggersForJob.add(trigger);
triggersForJob.add(trigger1);
// 綁定關系是1:N
scheduler.scheduleJob(jobDetail, triggersForJob,true);
常見的Tigger類型
接口 | 描述 | 特點 |
---|---|---|
SimpleTrigger | 簡單觸發器 | SimpleTrigger 可以定義固定時刻或者固定時間間隔的調度規則(精確到毫秒) 例如:每天 9 點鍾運行;每隔 30 分鍾運行一次 |
CalendarIntervalTrigger | 基於日歷的觸發器 | CalendarIntervalTrigger 可以定義更多時間單位的調度需求,精確到秒 好處是不需要去計算時間間隔,比如 1 個小時等於多少毫秒 例如每年、每個月、每周、每天、每小時、每分鍾、每秒 每年的月數和每個月的天數不是固定的,這種情況也適用 |
DailyTimeIntervalTrigger | 基於日期的觸發器 | 每天的某個時間段 例如:每天早上 9 點到晚上 9 點,每隔半個小時執行一次,並且只在周一到周六執行。 |
CronTrigger | 基於 Cron 表達式的觸發器 | 可以支持任意時間(推薦) 如:0/10 * * * * ? |
怎么排除掉一些日期不觸發
比較常見的需求是周末不計息、節假日不觸發郵件通知
如果要在觸發器的基礎上,排除一些時間區間不執行任務,就要用到 Quartz 的 Calendar
類(注意不是 JDK 的 Calendar
)。可以按年、月、周、日、特定日期、Cron 表達式
排除
使用方法
-
調用調度器的
addCalendar()
方法注冊排除規則 -
調用
Trigger
的modifiedByCalendar()
添加到觸發器中
//排除營業時間
scheduler.addCalendar("workingHours",new CronCalendar("* * 0-7,18-23?* *”"),false,false);
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.modifiedByCalendar("workingHours") //排除時間段
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
Scheduler
調度器,是 Quartz 的指揮官,由
StdSchedulerFactory
產生,它是單例的,並且是 Quartz 中最重要的 API,默認是實現類是StdScheduler
,里面包含了一個QuartzScheduler
。QuartzScheduler
里面又包含了一個QuartzSchedulerThread
。
Scheduler 中的方法主要分為三大類:
-
操作調度器本身,例如調度器的啟動
start()
、調度器的關閉shutdown()
。 -
操作
Trigger
,例如pauseTriggers()
、resumeTrigger()
。 -
操作
Job
,例如scheduleJob()
、unscheduleJob()
、rescheduleJob()
這些方法非常重要,可以實現任務的動態調度。
Listener
事件監聽器。Quartz框架采用觀察者模式設計,可以無入侵式地讓用戶可以收到對應的通知。提供三種類型監聽器,分別是
SchedulerListener
(監聽 Scheduler 的),TriggerListener
(監聽 Trigger 的),JobListener
(監聽 Job 的)
場景
- 任務完成了,發郵件給對應的人。例如:跑批完成了,我想系統自動給我發一個郵件通知
- 監控任務整個生命周期。例如:作為一個中央分布式調度器需要通過
Webhook
或者MQ
觸發多個服務,想監控每個任務的執行情況,是否有遺漏
工具類:ListenerManager
,用於添加、獲取、移除監聽器
工具類:Matcher
,主要是基於 groupName
和 keyName
進行匹配。
JobStore
Jobstore 用來存儲任務和觸發器相關的信息,例如所有任務的名稱、數量、狀態等等。Quartz 中有兩種存儲任務的方式,一種在在內存,一種是在數據庫。
RAMJobStore
Quartz 默認
的 JobStore
是 RAMJobstore
,也就是把任務和觸發器信息運行的信息存儲在內存中,用到了 HashMap
、TreeSet
、HashSet
等等數據結構。
如果程序崩潰或重啟,所有存儲在內存中的數據都會丟失。所以我們需要把這些數 據持久化到磁盤。
JDBCJobStore
JDBCJobStore
可以通過 JDBC 接口,將任務運行數據保存在數據庫中。
JDBC 的實現方式有兩種,JobStoreSupport
類的兩個子類:
-
JobStoreTX
:在獨立的程序中使用,自己管理事務,不參與外部事務。 -
JobStoreCMT
:(Container Managed Transactions (CMT),如果需要容器管理事 務時,使用它。
Quartz的核心運行機制
以上只是梳理了Quartz的核心流程,列舉了一些核心組件,通過一下幾個方法作為源碼入口:
// Scheduler
Scheduler scheduler = factory.getScheduler();
// 綁定關系是1:N
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
從上圖可以看到,Quartz的核心流程大致分為三個階段:
- 獲取調度實例階段
- 通過
getScheduler
方法根據配置文件加載配置和初始化,創建線程池ThreadPool
(默認是SimpleThreadPool
,用來執行Quartz
調度任務),創建調度器QuartzScheduler
,創建調度線程QuartzSchedulerThread
,並將調度線程初始狀態設置為暫停狀態。
- 通過
- 綁定
JobDetail
和Trigger
階段Scheduler
將任務添加到JobStore
中,如果是使用數據庫存儲信息,這時候會把任務持久化到Quartz
核心表中,同時也會對實現JobListener
的監聽者通知任務已添加
- 啟動調度器階段
Scheduler
會調用QuartzScheduler
的Start()
方法,這時候會把調度線程從暫停切為啟動狀態,通知QuartzSchedulerThread
正式干活。QuartzSchedulerThread
會從SimpleThreadPool
查看下有多少可用工作線程,然后找JobStore
去拿下一批符合條件的待觸發的Trigger
任務列表,包裝成FiredTriggerBundle
。通過JobRunShellFactory
創建FiredTriggerBundle
的執行線程實例JobRunShell
,然后把JobRunShell
實例交給SimpleThreadPool
的工作線程去執行。SimpleThreadPool
會從可用線程隊列拿出對應數量的線程,去調用JobRunShell
的run()
方法,此時會執行任務類的execute
方法 :job.execute(JobExecutionContext context)
。
獲取調度實例階段
加載配置和初始化調度器
StdSchedulerFactory.getScheduler
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
//加載quartz.properties 配置文件
initialize();
}
//調度倉庫里維護着一個HashMap<String, Scheduler>,這里使用單例是為了全局共享
SchedulerRepository schedRep = SchedulerRepository.getInstance();
//實際上是從HashMap<String, Scheduler>里查找Scheduler,保證了調度器名稱必須是唯一
Scheduler sched = schedRep.lookup(getSchedulerName());
//如果調度器已經存在
if (sched != null) {
if (sched.isShutdown()) {
//假如調度器是關閉狀態,則從調度倉庫的HashMap移除
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
//調度器不存在則要進行初始化
sched = instantiate();
return sched;
}
StdSchedulerFactory.instantiate
對調度器進行初始化工作
private Scheduler instantiate() throws SchedulerException {
//...省略...
//存儲任務信息的 JobStore
JobStore js = null;
//線程池,默認是SimpleThreadPool
ThreadPool tp = null;
//核心調度器
QuartzScheduler qs = null;
//數據庫連接器
DBConnectionManager dbMgr = null;
//ID生成器,用來自動生成唯一的instance id
String instanceIdGeneratorClass = null;
//線程執行器,默認為 DefaultThreadExecutor
ThreadExecutor threadExecutor;
//...省略...
創建線程池(SimpleThreadPool)
StdSchedulerFactory.instantiate
這里創建了線程池,默認是配置文件指定的SimpleThreadPool
//從配置中獲取線程池類名,如果沒,默認選用SimpleThreadPool作為線程池
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
initException = new SchedulerException(
"ThreadPool class not specified. ");
throw initException;
}
try {
//反射創建線程池
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' could not be instantiated.", e);
throw initException;
}
SimpleThreadPool
此時SimpleThreadPool在創建過程中,會初始化三個列表:
workers
(總工作線程隊列):存放所有的工作線程availWorkers
(可用工作線程隊列) :存放可用於做任務的工作線程busyWorkers
(繁忙工作線程隊列):存放已經占用的工作線程
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
初始化線程池
StdSchedulerFactory.instantiate
在該方法下面有一行對該線程池進行初始化
if(tp instanceof SimpleThreadPool) {
if(threadsInheritInitalizersClassLoader)
((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
//調用線程池初始化方法
tp.initialize();
SimpleThreadPool.initialize
在該方法里,會開始創建工作線程(WorkerThread),用於后面的任務執行,真正執行任務的是WorkerThread
的run()
方法
//根據用戶配置文件設置的線程數,來創建對應數量的工作線程
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
//激活每個工作線程
wt.start();
//放在可用線程隊列等待被使用
availWorkers.add(wt);
}
創建核心調度器QuartzScheduler
StdSchedulerFactory.instantiate
這里創建核心調度器
//這里創建核心調度器,並且把QuartzSchedulerResources調度資源信息和idleWaitTime(調度器空閑等待的時間量)傳進去,默認30秒
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
QuartzScheduler.QuartzScheduler
創建調度器時,會對調度器的成員變量進行初始化,這里還會創建調度線程QuartzSchedulerThread
,它會負責把任務分配給線程池里的工作線程執行
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
//...省略...
//創建調度線程,resouces 里面有線程名稱
this.schedThread = new QuartzSchedulerThread(this, resources);
//創建線程執行器 ,默認是DefaultThreadExecutor
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//這里線程執行器會調用QuartzSchedulerThread的run()方法
schedThreadExecutor.execute(this.schedThread);
//...省略...
}
QuartzSchedulerThread.QuartzSchedulerThread
調度線程在實例化的時候,會把調度線程控制變量paused=ture
,是把調度線程暫停處理任務,halted=false
是要把調度線程開始監聽調度器控制變量paused
,就是讓調度線程開始運行但是不處理任務,等待被喚醒,下一步會提到
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
//...省略...
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}
QuartzSchedulerThread.run
上面提到,調度線程會被schedThreadExecutor
執行,此時由於halted
被設置為false
,paused
設置為true,此時調度線程run()方法並不會向下處理任務,等待被激活,這里會等到后面Scheduler
調用start()
才會真正被激活
public void run() {
int acquiresFailed = 0;
//這里!halted.get() = true,因此會向下執行
while (!halted.get()) {
try {
//sigLock是調度線程內的一個成員變量,用於控制線程並發
synchronized (sigLock) {
// 檢查是否為暫停狀態,此時paused && !halted.get() =false,會在這里循環等待,不會往下執行
while (paused && !halted.get()) {
try {
//暫停狀態時,嘗試去獲得信號鎖,使當前線程等待直到另一個線程調用,超時時間是1秒
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// 暫停時重置失敗計數器,這樣我們就不會取消暫停后再次等待
acquiresFailed = 0;
}
//這里為false,因此會直接跳出循環,不會向后執行任務
if (halted.get()) {
break;
}
//...省略...
}
綁定JobDetail和Trigger階段
執行作業調度
StdScheduler.scheduleJob
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
//這里實際調用的是QuartzScheduler
return sched.scheduleJob(jobDetail, trigger);
}
QuartzScheduler.scheduleJob
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
//...省略...
//持久化JobDetail和trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
//通知scheduler監聽者
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}
啟動調度器階段
調用調度器啟動方法
StdScheduler.start
StdScheduler
只是代理類,實際上還是調用QuartzScheduler
public void start() throws SchedulerException {
//調用QuartzScheduler.start()方法
sched.start();
}
通知調度線程開始干活
QuartzScheduler.start
public void start() throws SchedulerException {
//...省略...
//通知Scheduler監聽者任務開始啟動
notifySchedulerListenersStarting();
//第一次啟動,這里initialStart為空
if (initialStart == null) {
initialStart = new Date();
//這里將恢復任何失敗或誤觸發的作業並根據需要清理數據存儲,錯過的任務會在這里重跑
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
//如果initialStart不為空,意味着之前已經做過初始化,則把調度器狀態恢復成運行中
resources.getJobStore().schedulerResumed();
}
//這里實際上讓調度線程QuartzSchedulerThread開始執行任務,前面有提到調度線程雖然已經激活,但是由於Pause為true,因此它沒辦法處理任務,實際處於停止狀態
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
//通知Scheduler監聽者任務已經啟動
notifySchedulerListenersStarted();
}
QuartzSchedulerThread.togglePause
//切換暫停狀態
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
//如果暫停,這里是要中斷任何可能發生的睡眠,等待着被喚醒
signalSchedulingChange(0);
} else {
//喚醒在此對象監視器上等待的所有線程。
sigLock.notifyAll();
}
}
}
調度線程正式開始執行任務
QuartzSchedulerThread.run
這里由於上面一步已經把pause切換成false,因此調度線程的run()方法可以開始處理任務
//...省略...
//由於pause已經被切換成flase,這里會跳出循環,線程會往下繼續執行
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
acquiresFailed = 0;
}
//...省略...
// 獲取線程池可用線程數量
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
//可用線程數量>0才往下執行
if(availThreadCount > 0) {
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 獲取需要下次執行的 triggers
// idleWaitTime: 默認 30s
// availThreadCount:獲取可用(空閑)的工作線程數量,總會大於 1,因為該方法會一直阻塞, 直到有工作線程空閑下來。
// maxBatchSize:一次拉取 trigger 的最大數量,默認是 1
// batchTimeWindow:時間窗口調節參數,默認是 0
// misfireThreshold: 超過這個時間還未觸發的 trigger,被認為發生了 misfire,默認 60s
// 調度線程一次會拉取 NEXT_FIRETIME 小於(now + idleWaitTime +batchTimeWindow),大 於(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個 triggers,默認情況下,會拉取未來 30s、 過去 60s 之間還未 fire 的 1 個 trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...省略...
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
// 觸發 Trigger,把 ACQUIRED 狀態改成 EXECUTING
// 如果這個 trigger 的 NEXTFIRETIME 為空,也就是未來不再觸發,就將其狀態改為 COMPLETE // 如果 trigger 不允許並發執行(即 Job 的實現類標注了@DisallowConcurrentExecution), 則將狀態變為 BLOCKED,否則就將狀態改為 WAITING
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
//...省略...
continue;
}
}
//循環處理trigger
for (int i = 0; i < bndles.size(); i++) {
//從trigger任務集合取出一個
TriggerFiredResult result = bndles.get(i);
//把trigger任務包裝成TriggerFiredBundle
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
//...省略...
JobRunShell shell = null;
try {
// 根據 trigger 信息實例化 JobRunShell(implements Runnable),同時依據 JOB_CLASS_NAME 實例化 Job,隨后我們將 JobRunShell 實例丟入工作線。
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//調用線程池的runInThread方法,實際上是調用JobRunShell的run()方法
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
//...省略...
SimpleThreadPool.runInThread
這里線程池開始從可用線程隊列分配工作線程去處理JobRunShell
的run()方法
public boolean runInThread(Runnable runnable) {
//...省略...
//假如線程沒有關閉
if (!isShutdown) {
//從可用工作線程隊列移除一條工作線程
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
//把工作線程加入到繁忙工作線程隊列
busyWorkers.add(wt);
//執行JobRunShell的run方法
wt.run(runnable);
} else {
//加入線程池准備要關閉,開啟一個線程池里沒有的新工作線程
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
//加入到繁忙工作線程隊列
busyWorkers.add(wt);
//工作線程隊列加入該新工作線程
workers.add(wt);
//執行JobRunShell的run方法
wt.start();
}
//...省略...
return true;
}
JobRunShell
用來為 Job
提供安全的運行環境的,執行 Job
中所有的作業,捕獲運行中的異常,在任務執行完畢的
時候更新 Trigger
狀態,等等。
JobRunShell
實例是用 JobRunShellFactory
為 QuartzSchedulerThread
創建的,在調度器決定一個 Job
被觸發的時候,它從線程池中取出一個線程來執行任務。
Quartz線程模型
SimpleThreadPool
:包工頭,管理所有WorkerThread
WorkerThread
:工人,把Job
包裝成JobRunShell
執行QuartSchedulerThread
:項目經理,獲取即將觸發的Trigger
,從問包工頭拿一個空閑的worker
,執行Trigger
綁定的任務
Quartz集群進程間如何通信
Quartz集群之間是通過數據庫幾張核心的Quartz表進行通信
表名 | 作用 |
---|---|
QRTZ_BLOB_TRIGGERS | Trigger 作為 Blob 類型存儲 |
QRTZ_CALENDARS | 存儲 Quartz 的 Calendar 信息 |
QRTZ_CRON_TRIGGERS | 存儲 CronTrigger,包括 Cron 表達式和時區信息 |
QRTZ_FIRED_TRIGGERS | 存儲與已觸發的 Trigger 相關的狀態信息,以及相關 Job 的執行信息 |
QRTZ_JOB_DETAILS | 存儲每一個已配置的 Job 的詳細信息 |
QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的 Trigger 組的信息 |
QRTZ_SCHEDULER_STATE | 存儲少量的有關 Scheduler 的狀態信息,和別的 Scheduler 實例 |
QRTZ_SIMPLE_TRIGGERS | 存儲 SimpleTrigger 的信息,包括重復次數、間隔、以及已觸的次數 |
QRTZ_SIMPROP_TRIGGERS | 存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 兩種類型的觸發器 |
QRTZ_TRIGGERS | 存儲已配置的 Trigger 的信息 |
Quartz集群如何保證高並發下不重復跑
Quartz有多個節點同時在運行,而任務是共享的,這時候肯定存在資源競爭問題,容易造成並發問題,Quartz節點之間是否存在分布式鎖去控制?
Quartz
是通過數據庫去作為分布式鎖來控制多進程並發問題,Quartz
加鎖的地方很多,Quartz
是使用悲觀鎖的方式進行加鎖,讓在各個instance操作Trigger
任務期間串行,這里挑選核心的代碼來看看它是符合利用數據庫防止並發的。
使用數據庫鎖需要在quartz.properties
中加以下配置,讓集群生效Quartz才會對多個instance進行並發控制
org.quartz.jobStore.isClustered = true
QRTZ_LOCKS
表,它會為每個調度器創建兩行數據,獲取 Trigger 和觸發 Trigger 是兩把鎖,加鎖入口在JobStoreSupport
類中,Quartz提供的鎖表,為多個節點調度提供分布式鎖,實現分布式調度,默認有2個鎖
SCHED_NAME | LOCK_NAME |
---|---|
Myscheduler | STATE_ACCESS |
Myscheduler | TRIGGER_ACCESS |
STATE_ACCESS
主要用在scheduler定期檢查是否失效的時候,保證只有一個節點去處理已經失效的scheduler;
TRIGGER_ACCESS
主要用在TRIGGER被調度的時候,保證只有一個節點去執行調度
QuartzSchedulerThread.run
調度線程在獲取下一個Trigger任務的時候,會在Quartz表加行級鎖,入口在這
//...省略...
//
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...省略...
JobStoreSupport.acquireNextTriggers
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
//...省略...
//這里會進入加鎖控制,lockName是鎖的key
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
//...省略...
JobStoreSupport.executeInNonManagedTXLock
這里會進入非托管事務,加入lockName不為空,需要先獲取鎖才能執行事務回調方法和事務校驗方法
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
//只要作為鎖的key不為空,在這里就會調用JobStoreTx獲取數據庫連接
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
//真正加鎖的入口,通過LockHandler去調用DBSemaphore操作數據庫獲取鎖
transOwner = getLockHandler().obtainLock(conn, lockName);
}
//...省略...
DBSemaphore.obtainLock
這里會通過執行兩條SQL
去向調用線程授予對已識別資源的鎖定(阻塞)直到可用
public boolean obtainLock(Connection conn, String lockName)
throws LockException {
//...省略...
//判斷當前調用線程是否對標識的資源持有鎖,加入已經持有該鎖,則直接跳過
if (!isLockOwner(lockName)) {
//通過調用StdRowLockSemaphore的executeSQL方法對expandedSQL, expandedInsertSQL對lockName進行加鎖控制
executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
//...省略...
}
StdRowLockSemaphore.executeSQL
如果已經有lockName
代表的行,直接加鎖,如果沒有插入。但是在加鎖時或插入時有可能失敗,失敗則重試,重試如果超過一定次數就會直接拋出異常。這里是使用悲觀鎖的方式進行加鎖
protected void executeSQL(Connection conn, final String lockName, final String expandedSQL, final String expandedInsertSQL) throws LockException {
//...省略...
ps = conn.prepareStatement(expandedSQL);
//...省略...
ps.setString(1, lockName);
//先執行查詢,看看表里是否已經有該存在
rs = ps.executeQuery();
//...省略...
// 如果查詢結果不為空
if (!rs.next()) {
ps.setString(1, lockName);
//
int res = ps.executeUpdate();
//...省略...
return; // obtained lock, go
}
這兩條SQL是在DBSemaphore
初始化的時候塞進來的
public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
this.tablePrefix = tablePrefix;
this.schedName = schedName;
setSQL(defaultSQL);
setInsertSQL(defaultInsertSQL);
}
再看看調用鏈會發現,這兩條SQL是在StdRowLockSemaphore
初始化的時候調用父類DBSemaphore
構造方法傳進來,分別是selectWithLockSQL
和SELECT_FOR_LOCK
public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) {
super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK);
}
兩條SQL分別是:
public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
+ SCHED_NAME_SUBST + ", ?)";
把參數替換進去就比較清晰可以看到,Quartz通過在qrtz_LOCKS
表對當前schedule job
加兩個行級鎖
expandedSQL:select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
expandedInsertSQL:INSERT INTO qrtz_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('MySchedule', 'TRIGGER_ACCESS')
Quartz集群如何保證高並發下不漏跑
有時候Quartz
可能會錯過我們的調度任務:
- 服務重啟,沒能及時執行任務,就會misfire
- 工作線程去運行優先級更高的任務,就會misfire
- 任務的上一次運行還沒結束,下一次觸發時間到達,就會misfire
Quartz
可提供了一些補償機制應對misfire
情況,用戶可以根據需要選擇對應的策略,這里挑選常用的cronTrigger
作為示例
-
withMisfireHandlingInstructionDoNothing
- 不觸發立即執行
- 等待下次Cron觸發頻率到達時刻開始按照Cron頻率依次執行
-
withMisfireHandlingInstructionIgnoreMisfires
- 以錯過的第一個頻率時間立刻開始執行
- 重做錯過的所有頻率周期后當下一次觸發頻率發生時間大於當前時間后,再按照正常的Cron頻率依次執行
-
withMisfireHandlingInstructionFireAndProceed
(默認)- 以當前時間為觸發頻率立刻觸發一次執行,然后按照Cron頻率依次執行
假如用戶沒有設置Misfire
指令,Quartz
默認指定MISFIRE_INSTRUCTION_SMART_POLICY
作為默認策略,在Trigger
接口的getMisfireInstruction
源碼可以看到:
/**
* Get the instruction the <code>Scheduler</code> should be given for
* handling misfire situations for this <code>Trigger</code>- the
* concrete <code>Trigger</code> type that you are using will have
* defined a set of additional <code>MISFIRE_INSTRUCTION_XXX</code>
* constants that may be set as this property's value.
*
* <p>
* If not explicitly set, the default value is <code>MISFIRE_INSTRUCTION_SMART_POLICY</code>.
* </p>
*
* @see #MISFIRE_INSTRUCTION_SMART_POLICY
* @see SimpleTrigger
* @see CronTrigger
*/
public int getMisfireInstruction();
這里繼續以CronTrigger
舉例,其他類型Trigger
也類似 。如果是默認策略MISFIRE_INSTRUCTION_SMART_POLICY
,在CronTrigger
會選用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
,該策略的特點是立刻執行一次,然后后面的任務就按照正常的計划執行。
@Override
public void updateAfterMisfire(org.quartz.Calendar cal) {
int instr = getMisfireInstruction();
if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
return;
if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
}
if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
}
setNextFireTime(newFireTime);
} else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
setNextFireTime(new Date());
}
}
Quartz對於misfire任務大致處理流程
-
QuartzScheduler.start()
啟動調度 -
JobStoreSupport.schedulerStarted()
執行啟動調度方法 -
創建和初始化
misfireHandler
-
異步執行
misfireHandler.run
方法處理misfire
任務 -
MisfileHandler
通過JobStoreSupport
去查詢有沒有misfire
的任務,查詢條件是當前狀態是waiting
,下一次trigger時間
<當前時間-misfire預設閾值
(默認1分鍾)
int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
getDelegate().countMisfiredTriggersInState(
conn, STATE_WAITING, getMisfireTime()) :
Integer.MAX_VALUE;
String COUNT_MISFIRED_TRIGGERS_IN_STATE = "SELECT COUNT("
+ COL_TRIGGER_NAME + ") FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND NOT ("
+ COL_MISFIRE_INSTRUCTION + " = " + Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY + ") AND "
+ COL_NEXT_FIRE_TIME + " < ? "
+ "AND " + COL_TRIGGER_STATE + " = ?";
protected long getMisfireTime() {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
//當前時間減去misfire預設閾值,閾值默認一分鍾
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
-
JobStoreSupport
通過StdRowLockSemaphore
去獲取TRIGGER_ACCESS
鎖 -
查詢所有
misfire
任務,查詢條件:status=waiting,current_time-next_fire_time>misfireThreshold
(可配置,默認1分鍾)【即實際觸發時間-預計觸發時間大於容忍度時間】,獲取misfired的trigger,maxToRecoverAtATime
默認一個事務中只能最大有20
個misfired trigger(可配置) -
通過
updateAfterMisfired
方法獲取misfired的策略(默認是MISFIRE_INSTRUCTION_SMART_POLICY
該策略在CronTrigger
中為MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
),根據策略設置nexFireTime
。 -
將
nextFireTime
等更新或者插入到trigger
表; -
提交事務,釋放鎖
Quartz默認任務鎖機制
Quartz是否一定會加鎖?什么情況下不會加鎖?應該怎么避免並發問題?
什么情況下不會加鎖?
回到JobStoreSupport
的 acquireNextTriggers()
方法,可以看到當isAcquireTriggersWithinLock()
為true
或者maxCount>1
才會加鎖,否則lockName
為空
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
//..省略..
}
});
}
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
//..省略...
}
Quartz
加鎖的條件有以下兩個:
- 如 果
acquireTriggersWithinLock=true
或 者batchTriggerAcquisitionMaxCount>1
時 ,lockName
賦 值 為
LOCK_TRIGGER_ACCESS
,此時獲取 Trigger
會加鎖。
- 否則,如果
isAcquireTriggersWithinLock()
值是false
並且maxCount=1
的話,lockName
賦值為null
,這種情況獲取Trigger
下不加鎖。
那這兩個參數的默認值是什么?
acquireTriggersWithinLock
變量默認是 false
private boolean acquireTriggersWithinLock = false;
maxCount
來自 QuartzSchedulerThread
triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
getMaxBatchSize()
來自 QuartzSchedulerResources
,代表 Scheduler
一次拉取
trigger
的最大數量,默認是 1
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
什么情況下需要加鎖?
QuartzSchedulerThread
的 triggersFired()
方法
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
調用了 JobStoreSupport
的 triggersFired()
方法,接着又調用了triggerFired(Connection conn, OperableTrigger trigger)
方法:
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
//觸發
TriggerFiredBundle bundle = triggerFired(conn, trigger);
result = new TriggerFiredResult(bundle);
//...省略...
protected TriggerFiredBundle triggerFired(Connection conn,
OperableTrigger trigger)
throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
try { // if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
//...省略...
如果 Trigger
的狀態不是 ACQUIRED
,也就是說被其他的線程 fire
了,返回空。但是這種樂觀鎖的檢查在高並發下難免會出現 ABA
的問題,比如線程 A 拿到的時候還是 ACQUIRED
狀態,但是剛准備執行的時候已經變成了 EXECUTING
狀態,這個時候就會 出現重復執行的問題。
把執行步驟拆解下,比較容易看到該問題:
推薦
如果設置的數量為 1(默認值),並且使用 JDBC JobStore(RAMJobStore 不支持 分 布 式 , 只 有 一 個 調 度 器 實 例 , 所 以 不 加 鎖 ) , 則 屬 性 org.quartz.jobStore.acquireTriggersWithinLock
應設置為 true
。否則不加鎖可能會導致任務重復執行。
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1 org.quartz.jobStore.acquireTriggersWithinLock=true
Quartz常見問題
服務器始終不一致問題
常見異常:
This scheduler instance (SchedulerName) is still active but was recovered by another instance in the cluster
解決:
同步所有集群節點的時間然后重啟服務
Quartz集群負載不均衡
Quartz集群是采用搶占式加鎖方式去處理任務,因此你會看到每個節點的任務處理日志並不是均衡分配的,很可能一個節點會搶占大量任務導致負載過重,但是這一點官方並沒有解決。
錯過預定觸發時間
常見異常:
Handling 1 trigger(s) that missed their scheduled fire-time
解決:
很可能是你線程數設置太少,而任務執行時間太長,超過的misfire
閾值,導致線程池沒有可用線程而錯過了觸發事件。嘗試把配置文件線程數調大org.quartz.threadPool.threadCount
或者把misfire
閾值調大org.quartz.jobStore.misfireThreshold