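1. Creating the components: SchedulerFactoryBean, CronTriggerFactoryBean and JobDetailFactoryBean all implement Spring's FactoryBean<T> (for example, CronTriggerFactoryBean implements FactoryBean<CronTrigger>), BeanNameAware and InitializingBean, and each uses them to build its own instance.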

1.1. JobDetailFactoryBean builds the JobDetail instance with JobDetailImpl, the implementation class of JobDetail.
public void afterPropertiesSet() {
    if (this.name == null) {
        this.name = this.beanName;
    }
    if (this.group == null) {
        this.group = Scheduler.DEFAULT_GROUP;
    }
    if (this.applicationContextJobDataKey != null) {
        if (this.applicationContext == null) {
            throw new IllegalStateException(
                    "JobDetailBean needs to be set up in an ApplicationContext " +
                    "to be able to handle an 'applicationContextJobDataKey'");
        }
        getJobDataMap().put(this.applicationContextJobDataKey, this.applicationContext);
    }

    JobDetailImpl jdi = new JobDetailImpl();
    jdi.setName(this.name);
    jdi.setGroup(this.group);
    jdi.setJobClass((Class) this.jobClass);
    jdi.setJobDataMap(this.jobDataMap);
    jdi.setDurability(this.durability);
    jdi.setRequestsRecovery(this.requestsRecovery);
    jdi.setDescription(this.description);
    this.jobDetail = jdi;
}
1.2. CronTriggerFactoryBean builds the Trigger instance with CronTriggerImpl, the implementation class of Trigger.
public void afterPropertiesSet() throws ParseException {
    if (this.name == null) {
        this.name = this.beanName;
    }
    if (this.group == null) {
        this.group = Scheduler.DEFAULT_GROUP;
    }
    if (this.jobDetail != null) {
        this.jobDataMap.put("jobDetail", this.jobDetail);
    }
    if (this.startDelay > 0 || this.startTime == null) {
        this.startTime = new Date(System.currentTimeMillis() + this.startDelay);
    }
    if (this.timeZone == null) {
        this.timeZone = TimeZone.getDefault();
    }

    CronTriggerImpl cti = new CronTriggerImpl();
    cti.setName(this.name);
    cti.setGroup(this.group);
    if (this.jobDetail != null) {
        cti.setJobKey(this.jobDetail.getKey());
    }
    cti.setJobDataMap(this.jobDataMap);
    cti.setStartTime(this.startTime);
    cti.setCronExpression(this.cronExpression);
    cti.setTimeZone(this.timeZone);
    cti.setCalendarName(this.calendarName);
    cti.setPriority(this.priority);
    cti.setMisfireInstruction(this.misfireInstruction);
    cti.setDescription(this.description);
    this.cronTrigger = cti;
}
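One detail of the method above is easy to miss: because the condition is startDelay > 0 || startTime == null, a positive startDelay replaces an explicitly configured startTime. The following is a minimal sketch of just that rule, not Spring's code. The class StartTimeSketch, the method resolveStartTime and the nowMillis parameter (passed in so the result is reproducible) are names made up for illustration.

```java
import java.util.Date;

// Illustrative model of the start-time rule in CronTriggerFactoryBean.afterPropertiesSet().
// StartTimeSketch and resolveStartTime are hypothetical names, not part of Spring or Quartz.
final class StartTimeSketch {

    // Returns the effective start time; nowMillis stands in for System.currentTimeMillis().
    static Date resolveStartTime(Date configuredStartTime, long startDelayMillis, long nowMillis) {
        if (startDelayMillis > 0 || configuredStartTime == null) {
            // Same branch as the original: "now + delay" wins, even over an explicit start time.
            return new Date(nowMillis + startDelayMillis);
        }
        return configuredStartTime;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Date explicit = new Date(now + 60_000L);
        System.out.println("no delay, explicit start : " + resolveStartTime(explicit, 0L, now));
        System.out.println("5s delay, explicit start : " + resolveStartTime(explicit, 5_000L, now));
        System.out.println("no delay, no start       : " + resolveStartTime(null, 0L, now));
    }
}
```

So if both properties are configured, only the delay takes effect.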
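1.3. SchedulerFactoryBean uses StdSchedulerFactory, which sets the Scheduler's parameters from a configuration file. (The alternative, DirectSchedulerFactory, is configured mainly through hard-coded calls and is not covered here.)
It creates StdScheduler, the Scheduler implementation, which delegates everything to QuartzScheduler; all of the actual work is done by the QuartzScheduler instance.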
public void afterPropertiesSet() throws Exception {
    ...
    // Create SchedulerFactory instance...
    SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
    initSchedulerFactory(schedulerFactory);
    ...
    // Get Scheduler instance from SchedulerFactory.
    try {
        this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
        populateSchedulerContext();

        if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {
            // Use AdaptableJobFactory as default for a local Scheduler, unless when
            // explicitly given a null value through the "jobFactory" bean property.
            this.jobFactory = new AdaptableJobFactory();
        }
        if (this.jobFactory != null) {
            if (this.jobFactory instanceof SchedulerContextAware) {
                ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());
            }
            this.scheduler.setJobFactory(this.jobFactory);
        }
    }
    ...
    // Register the listeners
    registerListeners();
    // Register the jobs and triggers (inserted into or updated in the database)
    registerJobsAndTriggers();
}
1.4 Startup: QuartzScheduler.start() is triggered from the refresh() method of Spring's AbstractApplicationContext.
2. Task execution flow

2.1 When the QuartzScheduler is created, it also creates the scheduler's main thread and starts it.
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
    throws SchedulerException {
    ....
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    ...
}
2.2 The QuartzSchedulerThread main loop
public void run() {
    boolean lastAcquireFailed = false;

    while (!halted.get()) {
        try {
            // check if we're supposed to pause...
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // Wait until togglePause(false) is called, which happens once QuartzScheduler.start() has been invoked
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                }

                if (halted.get()) {
                    break;
                }
            }

            // Ask the thread pool how many worker threads are free; this blocks while none is available
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                List<OperableTrigger> triggers = null;

                long now = System.currentTimeMillis();

                clearSignaledSchedulingChange();
                try {
                    // 1. Fetch from the JobStore the set of triggers that are due to fire next.
                    // idleWaitTime == 30L * 1000L: how long the scheduler waits before checking again
                    // when it finds no trigger that is currently due.
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    lastAcquireFailed = false;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                } catch (JobPersistenceException jpe) {
                    if(!lastAcquireFailed) {
                        qs.notifySchedulerListenersError(
                            "An error occurred while scanning for the next triggers to fire.",
                            jpe);
                    }
                    lastAcquireFailed = true;
                    continue;
                } catch (RuntimeException e) {
                    if(!lastAcquireFailed) {
                        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                +e.getMessage(), e);
                    }
                    lastAcquireFailed = true;
                    continue;
                }

                // If any triggers were returned, fire them
                if (triggers != null && !triggers.isEmpty()) {

                    now = System.currentTimeMillis();
                    long triggerTime = triggers.get(0).getNextFireTime().getTime();
                    long timeUntilTrigger = triggerTime - now;
                    // Loop until no more than 2 ms remain before the fire time
                    while(timeUntilTrigger > 2) {
                        synchronized (sigLock) {
                            if (halted.get()) {
                                break;
                            }
                            if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                try {
                                    // we could have blocked a long while
                                    // on 'synchronize', so we must recompute
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                    if(timeUntilTrigger >= 1)
                                        sigLock.wait(timeUntilTrigger);
                                } catch (InterruptedException ignore) {
                                }
                            }
                        }
                        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                            break;
                        }
                        now = System.currentTimeMillis();
                        timeUntilTrigger = triggerTime - now;
                    }

                    // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                    if(triggers.isEmpty())
                        continue;

                    // set triggers to 'executing'
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                    boolean goAhead = true;
                    synchronized(sigLock) {
                        goAhead = !halted.get();
                    }
                    if(goAhead) {
                        try {
                            // 2. Tell the JobStore that the triggers are firing; it takes the lock and updates their state.
                            // The returned results are assigned to bndles.
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if(res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError(
                                    "An error occurred while firing triggers '"
                                            + triggers + "'", se);
                            //QTZ-179 : a problem occurred interacting with the triggers from the db
                            //we release them and loop again
                            for (int i = 0; i < triggers.size(); i++) {
                                // On error (here and below) the ACQUIRED state in the database is set
                                // back to WAITING, so the trigger is picked up again on a later loop
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            }
                            continue;
                        }

                    }

                    // At this point all Quartz tables in the database have been updated
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result = bndles.get(i);
                        TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                        Exception exception = result.getException();

                        if (exception instanceof RuntimeException) {
                            getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        // it's possible to get 'null' if the triggers was paused,
                        // blocked, or other similar occurrences that prevent it being
                        // fired at this time... or if the scheduler was shutdown (halted)
                        if (bndle == null) {
                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                            continue;
                        }

                        JobRunShell shell = null;
                        try {
                            // 3. Create the JobRunShell and initialize it
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }

                        // Hand the job over to the thread pool
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }

                    }

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue; // while (!halted)
            }

            // Finally the scheduler thread picks a randomized wait time and sleeps briefly, which gives
            // the schedulers on other nodes a chance to obtain the database resources.
            // This is how Quartz achieves load balancing.
            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock) {
                try {
                    if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                            sigLock.wait(timeUntilContinue);
                        }
                    }
                } catch (InterruptedException ignore) {
                }
            }

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    } // while (!halted)

    // drop references to scheduler stuff to aid garbage collection...
    qs = null;
    qsRsrcs = null;
}
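The pause check at the top of run() explains why nothing fires between the creation of the scheduler thread (2.1) and the call to start() (1.4): the thread sits in sigLock.wait() until the paused flag is cleared. Below is a minimal sketch of that handshake using only the JDK. PauseGateSketch and awaitRunnable are hypothetical names; togglePause and halt mimic the roles of the Quartz methods of the same name but are simplified stand-ins.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the paused/halted handshake around sigLock.
// PauseGateSketch is a hypothetical class, not Quartz code.
final class PauseGateSketch {
    private final Object sigLock = new Object();
    private boolean paused = true;                      // guarded by sigLock; a new thread starts paused
    private final AtomicBoolean halted = new AtomicBoolean(false);

    // Called by the "scheduler" side: start() would pass false, standby() would pass true.
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();                        // wake the loop so it re-checks the flag
        }
    }

    // Called on shutdown.
    void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    // Blocks while paused. Returns true if the loop may continue, false if it was halted.
    boolean awaitRunnable() {
        synchronized (sigLock) {
            while (paused && !halted.get()) {
                try {
                    sigLock.wait(1000L);                // timed wait, as in the original loop
                } catch (InterruptedException ignore) {
                    // like the original, just re-check the flags
                }
            }
            return !halted.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseGateSketch gate = new PauseGateSketch();
        Thread loop = new Thread(() ->
                System.out.println("loop may proceed: " + gate.awaitRunnable()));
        loop.start();
        gate.togglePause(false);                        // plays the role of QuartzScheduler.start()
        loop.join();
    }
}
```

The timed wait is only a safety net. In the normal case the notifyAll() in togglePause wakes the loop immediately.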
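2.2.1. Acquiring the next batch of triggers to fire
The flow is: find the triggers that are due --> lock the TRIGGER_ACCESS row of the LOCKS table in the database --> read the JobDetail information --> read the trigger from the triggers table and mark it as "acquired" --> commit the transaction and release the lock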
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
    throws JobPersistenceException {

    String lockName;
    // isAcquireTriggersWithinLock() checks the acquireTriggersWithinLock property, which defaults to false;
    // maxCount defaults to 1, so by default no lock is taken here.
    // Setting org.quartz.jobStore.acquireTriggersWithinLock=true in the configuration file
    // makes every acquisition take the lock.
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    // Returns the triggers that need to fire next
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
    throws JobPersistenceException {
    if (timeWindow < 0) {
        throw new IllegalArgumentException();
    }

    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        currentLoopCount ++;
        try {
            // SQL: SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS
            //      WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ?
            //      AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?))
            //      ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

            // No trigger is ready to fire yet.
            if (keys == null || keys.size() == 0)
                return acquiredTriggers;

            long batchEnd = noLaterThan;

            for(TriggerKey triggerKey: keys) {
                // If our trigger is no longer available, try a new one.
                // SQL: SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
                OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                if(nextTrigger == null) {
                    continue; // next trigger
                }

                // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                // put it back into the timeTriggers set and continue to search for next trigger.
                JobKey jobKey = nextTrigger.getJobKey();
                JobDetail job;
                try {
                    // SQL: SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
                    job = retrieveJob(conn, jobKey);
                } catch (JobPersistenceException jpe) {
                    try {
                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                        getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                    } catch (SQLException sqle) {
                        getLog().error("Unable to set trigger state to ERROR.", sqle);
                    }
                    continue;
                }

                // Stateful job: the class is annotated with @DisallowConcurrentExecution or implements StatefulJob.
                // If one run takes so long that the next fire time arrives, the next run is blocked instead of fired.
                if (job.isConcurrentExectionDisallowed()) {
                    // A trigger of this job is already in the batch, so skip this one
                    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                        continue; // next trigger
                    } else {
                        acquiredJobKeysForNoConcurrentExec.add(jobKey);
                    }
                }

                if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                    break;
                }
                // We now have a acquired trigger, let's add to return list.
                // If our trigger was no longer in the expected state, try a new one.
                // Move the trigger that was selected in WAITING state to ACQUIRED.
                // SQL: UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ?
                //      AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?
                int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                if (rowsUpdated <= 0) {
                    continue; // next trigger
                }
                nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                // SQL: INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME,
                //      FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY)
                //      VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                if(acquiredTriggers.isEmpty()) {
                    batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                }
                // Add the trigger to the acquiredTriggers list
                acquiredTriggers.add(nextTrigger);
            }

            // if we didn't end up with any trigger to fire from that first
            // batch, try again for another batch. We allow with a max retry count.
            // (If the list is still empty, the loop runs up to 3 times in total.)
            if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                continue;
            }

            // We are done with the while loop.
            break;
        } catch (Exception e) {
            throw new JobPersistenceException(
                    "Couldn't acquire next trigger: " + e.getMessage(), e);
        }
    } while (true);

    // Return the acquired trigger list
    return acquiredTriggers;
}
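Two ideas carry most of acquireNextTrigger(): the conditional update (WAITING --> ACQUIRED only if the row is still WAITING, reported through rowsUpdated) and the set that keeps a second trigger of the same non-concurrent job out of the batch. The sketch below models both in memory, under the assumption that a ConcurrentHashMap can stand in for the TRIGGERS table and Map.replace(key, expected, updated) for the UPDATE ... WHERE TRIGGER_STATE = ? statement. AcquireSketch and Candidate are hypothetical names; no database or Quartz class is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory model of the acquisition step. AcquireSketch is hypothetical, not Quartz code.
final class AcquireSketch {

    // Hypothetical stand-in for one candidate row returned by the SELECT.
    static final class Candidate {
        final String triggerKey;
        final String jobKey;
        final boolean concurrentExecutionDisallowed;

        Candidate(String triggerKey, String jobKey, boolean concurrentExecutionDisallowed) {
            this.triggerKey = triggerKey;
            this.jobKey = jobKey;
            this.concurrentExecutionDisallowed = concurrentExecutionDisallowed;
        }
    }

    // Stand-in for the TRIGGER_STATE column, keyed by trigger.
    private final Map<String, String> triggerState = new ConcurrentHashMap<>();

    void store(String triggerKey, String state) {
        triggerState.put(triggerKey, state);
    }

    String stateOf(String triggerKey) {
        return triggerState.get(triggerKey);
    }

    List<String> acquire(List<Candidate> candidates) {
        List<String> acquired = new ArrayList<>();
        Set<String> nonConcurrentJobsSeen = new HashSet<>();
        for (Candidate c : candidates) {
            // Set.add returns false if the job key was already present: skip that trigger.
            if (c.concurrentExecutionDisallowed && !nonConcurrentJobsSeen.add(c.jobKey)) {
                continue;
            }
            // Conditional update: succeeds only if the state is still WAITING
            // (the analogue of rowsUpdated > 0).
            if (!triggerState.replace(c.triggerKey, "WAITING", "ACQUIRED")) {
                continue;
            }
            acquired.add(c.triggerKey);
        }
        return acquired;
    }

    public static void main(String[] args) {
        AcquireSketch store = new AcquireSketch();
        store.store("t1", "WAITING");
        store.store("t2", "WAITING");
        store.store("t3", "PAUSED");
        List<String> batch = store.acquire(Arrays.asList(
                new Candidate("t1", "jobA", true),
                new Candidate("t2", "jobA", true),    // same non-concurrent job: skipped
                new Candidate("t3", "jobB", false))); // not WAITING: conditional update fails
        System.out.println(batch);
    }
}
```

Because the update is conditional, two nodes that select the same row cannot both acquire it even without the row lock: the second update matches zero rows.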
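2.2.2. Firing the triggers
The flow is similar to the one above, but here the lock is always taken: lock the TRIGGER_ACCESS row of the LOCKS table in the database --> confirm the trigger's state --> read the trigger's JobDetail information --> read the trigger's Calendar information --> update the trigger --> commit the transaction and release the lock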
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                            // The main work happens in triggerFired()
                            TriggerFiredBundle bundle = triggerFired(conn, trigger);
                            result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch(RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }

                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                @Override
                public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> executingTriggers = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {
                                executingTriggers.add(ft.getFireInstanceId());
                            }
                        }
                        for (TriggerFiredResult tr : result) {
                            if (tr.getTriggerFiredBundle() != null && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)
    throws JobPersistenceException {
    JobDetail job;
    Calendar cal = null;

    // Make sure trigger wasn't deleted, paused, or completed...
    try { // if trigger was deleted, state will be STATE_DELETED
        // Check the state; skip the trigger unless it is ACQUIRED
        // SQL: SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
        String state = getDelegate().selectTriggerState(conn, trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e);
    }

    try {
        // SQL: SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) {
            return null;
        }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    // If a Calendar (special dates) is associated with the cron trigger
    if (trigger.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) {
            return null;
        }
    }

    try {
        // Update the record of the trigger that is now executing
        // SQL: UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?,
        //      JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: " + e.getMessage(), e);
    }

    Date prevFireTime = trigger.getPreviousFireTime();

    // call triggered - to update the trigger's next-fire-time state...
    // In effect:
    //   previousFireTime = nextFireTime;
    //   nextFireTime = getFireTimeAfter(nextFireTime);
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;

    // If the job is stateful, move the states of its triggers:
    // STATE_WAITING --> STATE_BLOCKED, STATE_ACQUIRED --> STATE_BLOCKED, STATE_PAUSED --> STATE_PAUSED_BLOCKED
    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: " + e.getMessage(), e);
        }
    }

    // Check whether the trigger will fire again
    if (trigger.getNextFireTime() == null) {
        state = STATE_COMPLETE;
        force = true;
    }

    // Insert or update the trigger
    storeTrigger(conn, trigger, job, true, state, force, false);

    job.getJobDataMap().clearDirtyFlag();

    // Create the TriggerFiredBundle object
    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
}
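The state handling in triggerFired() can be read as two small pure functions: one for the state stored for the trigger that just fired, and one for the other triggers of the same job when that job disallows concurrent execution. The sketch below restates only the branches visible in the method above. FiredStateSketch and its method names are made up for illustration, and plain strings stand in for the STATE_* constants.

```java
// Restates the state decisions of triggerFired() as pure functions.
// FiredStateSketch is a hypothetical class, not Quartz code.
final class FiredStateSketch {

    // State written back for the trigger that has just fired.
    static String stateAfterFiring(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        String state = "WAITING";
        if (concurrentExecutionDisallowed) {
            state = "BLOCKED";          // stays blocked until the running job completes
        }
        if (!hasNextFireTime) {
            state = "COMPLETE";         // checked last, so it overrides BLOCKED
        }
        return state;
    }

    // State of the job's triggers while a non-concurrent job is running
    // (the three updateTriggerStatesForJobFromOtherState calls).
    static String stateWhileJobRuns(String current) {
        switch (current) {
            case "WAITING":
            case "ACQUIRED":
                return "BLOCKED";
            case "PAUSED":
                return "PAUSED_BLOCKED";
            default:
                return current;         // other states are left untouched
        }
    }

    public static void main(String[] args) {
        System.out.println(stateAfterFiring(false, true));
        System.out.println(stateAfterFiring(true, true));
        System.out.println(stateAfterFiring(true, false));
        System.out.println(stateWhileJobRuns("PAUSED"));
    }
}
```

Note the order of the checks: a trigger with no further fire time ends up COMPLETE even when its job is non-concurrent.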
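2.2.3. Instantiating and executing the Job
A runnable JobRunShell is created for each Job and handed to the thread pool to run. The first method below is the thread pool's runInThread(); the second is JobRunShell's run().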
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }

    synchronized (nextRunnableLock) {

        handoffPending = true;

        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
            busyWorkers.add(wt);
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the pool).
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}
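The handoff in runInThread() is a bounded, blocking one: the caller waits on nextRunnableLock until a worker is free instead of queueing the task. Here is a minimal JDK-only sketch of that idea. HandoffPoolSketch and awaitIdle are hypothetical names, and as a simplification the sketch counts free slots and starts a fresh thread per task rather than reusing WorkerThread objects as the pool above does.

```java
// Simplified model of a blocking handoff to a fixed number of workers.
// HandoffPoolSketch is a hypothetical class, not Quartz's thread pool.
final class HandoffPoolSketch {
    private final Object nextRunnableLock = new Object();
    private final int size;
    private int availWorkers;                       // guarded by nextRunnableLock

    HandoffPoolSketch(int size) {
        this.size = size;
        this.availWorkers = size;
    }

    // Blocks until a worker slot is free, then runs the task on its own thread.
    boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        synchronized (nextRunnableLock) {
            while (availWorkers < 1) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                    // re-check the condition, as the original loop does
                }
            }
            availWorkers--;
        }
        Thread worker = new Thread(() -> {
            try {
                runnable.run();
            } finally {
                synchronized (nextRunnableLock) {
                    availWorkers++;                 // give the slot back
                    nextRunnableLock.notifyAll();   // wake a caller blocked in runInThread
                }
            }
        });
        worker.start();
        return true;
    }

    // Blocks until every slot has been returned.
    void awaitIdle() {
        synchronized (nextRunnableLock) {
            while (availWorkers < size) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                    // keep waiting
                }
            }
        }
    }

    public static void main(String[] args) {
        HandoffPoolSketch pool = new HandoffPoolSketch(2);
        for (int i = 0; i < 5; i++) {
            final int n = i;
            pool.runInThread(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.awaitIdle();
    }
}
```

In the main loop of 2.2 the scheduler thread has already called blockForAvailableThreads() and limited the batch to that count, so the wait inside runInThread() is normally short.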
public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {

            JobExecutionException jobExEx = null;
            Job job = jec.getJobInstance();

            try {
                begin();
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't begin execution.", se);
                break;
            }

            // notify job & trigger listeners...
            try {
                if (!notifyListenersBeginning(jec)) {
                    break;
                }
            } catch(VetoedException ve) {
                try {
                    CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                    qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);

                    // QTZ-205
                    // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                    if (jec.getTrigger().getNextFireTime() == null) {
                        qs.notifySchedulerListenersFinalized(jec.getTrigger());
                    }

                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error during veto of Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                break;
            }

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                // Run the Job's execute(). In the test setup this is QuartzJobBean.execute(),
                // which calls the subclass's executeInternal() method.
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                // If execute() throws a JobExecutionException, the exception itself records
                // whether the job should be retried or stopped.
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // notify all job listeners
            if (!notifyJobListenersComplete(jec, jobExEx)) {
                break;
            }

            CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

            // update the trigger
            try {
                // Choose the instruction code according to the outcome
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // If this happens, there's a bug in the trigger...
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.",
                        se);
            }

            // notify all trigger listeners
            if (!notifyTriggerListenersComplete(jec, instCode)) {
                break;
            }

            // update job/trigger or re-execute job
            if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }

            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
                continue;
            }

            // The job is done. This call updates the states according to instCode. If the job class is
            // annotated with @DisallowConcurrentExecution, its triggers go from STATE_BLOCKED back to STATE_WAITING.
            // If it is annotated with @PersistJobDataAfterExecution, the JobDataMap is persisted to the
            // JOB_DETAILS table so that the next execution can see the data.
            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
            break;
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}
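The do/while (true) in run() exists for one case: when the instruction code is RE_EXECUTE_JOB, the loop increments the refire count and runs the job again; otherwise it breaks. The sketch below isolates that control flow. RefireLoopSketch, SketchJob and SketchJobException are hypothetical names, and the assumption that the refire request travels on the exception mirrors the comment about JobExecutionException above rather than reproducing the Quartz classes.

```java
// Isolates the re-execute loop of the run shell. All names here are hypothetical.
final class RefireLoopSketch {

    enum Instruction { NOOP, RE_EXECUTE_JOB }

    // Stand-in for JobExecutionException: carries the "run again right away" flag.
    static final class SketchJobException extends Exception {
        final boolean refireImmediately;

        SketchJobException(boolean refireImmediately) {
            super("job failed");
            this.refireImmediately = refireImmediately;
        }
    }

    interface SketchJob {
        void execute(int refireCount) throws SketchJobException;
    }

    // Stand-in for trigger.executionComplete(): maps the outcome to an instruction.
    static Instruction executionComplete(SketchJobException failure) {
        return (failure != null && failure.refireImmediately)
                ? Instruction.RE_EXECUTE_JOB
                : Instruction.NOOP;
    }

    // Runs the job until no refire is requested; returns how many times it was refired.
    static int run(SketchJob job) {
        int refireCount = 0;
        do {
            SketchJobException failure = null;
            try {
                job.execute(refireCount);
            } catch (SketchJobException e) {
                failure = e;
            }
            if (executionComplete(failure) == Instruction.RE_EXECUTE_JOB) {
                refireCount++;      // same role as jec.incrementRefireCount()
                continue;           // go around the loop and execute again
            }
            break;
        } while (true);
        return refireCount;
    }

    public static void main(String[] args) {
        // Fails with a refire request on the first two attempts, then succeeds.
        int refires = run(attempt -> {
            if (attempt < 2) {
                throw new SketchJobException(true);
            }
        });
        System.out.println("refired " + refires + " times");
    }
}
```

Nothing in the loop bounds the number of refires, so a job that always asks to be refired would spin. In this sketch the job itself stops asking after two attempts.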
