Using Quartz (Part 2: Basic Process)


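1. Creating the elements: SchedulerFactoryBean, CronTriggerFactoryBean and JobDetailFactoryBean each implement Spring's FactoryBean<T> (for example, CronTriggerFactoryBean implements FactoryBean<CronTrigger>), BeanNameAware and InitializingBean, and use them to build their respective instances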
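
The shared pattern can be sketched without Spring on the classpath. Everything in this block is a hypothetical stand-in (`MiniFactoryBean`, `MiniInitializingBean`, `JobInfo`, `JobInfoFactoryBean`), not Spring's or Quartz's real types; only the shape is meant to match: setters collect configuration, afterPropertiesSet() fills in defaults and builds the product, and getObject() hands it out.

```java
// Hypothetical stand-ins for Spring's FactoryBean<T> and InitializingBean.
interface MiniFactoryBean<T> {
    T getObject();
}

interface MiniInitializingBean {
    void afterPropertiesSet();
}

// Hypothetical product type, playing the role of JobDetail or CronTrigger.
final class JobInfo {
    final String name;
    final String group;

    JobInfo(String name, String group) {
        this.name = name;
        this.group = group;
    }
}

final class JobInfoFactoryBean implements MiniFactoryBean<JobInfo>, MiniInitializingBean {
    // Assumed to match the value of Quartz's Scheduler.DEFAULT_GROUP.
    static final String DEFAULT_GROUP = "DEFAULT";

    private String name;
    private String group;
    private String beanName;
    private JobInfo product;

    void setName(String name) { this.name = name; }
    void setGroup(String group) { this.group = group; }
    // In Spring this would be called by the container (BeanNameAware).
    void setBeanName(String beanName) { this.beanName = beanName; }

    @Override
    public void afterPropertiesSet() {
        // Same defaulting rules as in the listings: bean name and default group.
        if (name == null) {
            name = beanName;
        }
        if (group == null) {
            group = DEFAULT_GROUP;
        }
        product = new JobInfo(name, group);
    }

    @Override
    public JobInfo getObject() {
        return product;
    }
}
```

A caller (here, the container) sets properties, calls afterPropertiesSet() once, and then reads the finished object from getObject().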

1.1. JobDetailFactoryBean builds the JobDetail instance using JobDetailImpl, the implementation class of JobDetail

public void afterPropertiesSet() {
                if (this.name == null) {
                    this.name = this.beanName;
                }
                if (this.group == null) {
                    this.group = Scheduler.DEFAULT_GROUP;
                }
                if (this.applicationContextJobDataKey != null) {
                    if (this.applicationContext == null) {
                        throw new IllegalStateException(
                            "JobDetailBean needs to be set up in an ApplicationContext " +
                            "to be able to handle an 'applicationContextJobDataKey'");
                    }
                    getJobDataMap().put(this.applicationContextJobDataKey, this.applicationContext);
                }

                JobDetailImpl jdi = new JobDetailImpl();
                jdi.setName(this.name);
                jdi.setGroup(this.group);
                jdi.setJobClass((Class) this.jobClass);
                jdi.setJobDataMap(this.jobDataMap);
                jdi.setDurability(this.durability);
                jdi.setRequestsRecovery(this.requestsRecovery);
                jdi.setDescription(this.description);
                this.jobDetail = jdi;
            }

1.2. CronTriggerFactoryBean builds the Trigger instance using CronTriggerImpl, an implementation class of Trigger

public void afterPropertiesSet() throws ParseException {
                if (this.name == null) {
                    this.name = this.beanName;
                }
                if (this.group == null) {
                    this.group = Scheduler.DEFAULT_GROUP;
                }
                if (this.jobDetail != null) {
                    this.jobDataMap.put("jobDetail", this.jobDetail);
                }
                if (this.startDelay > 0 || this.startTime == null) {
                    this.startTime = new Date(System.currentTimeMillis() + this.startDelay);
                }
                if (this.timeZone == null) {
                    this.timeZone = TimeZone.getDefault();
                }

                CronTriggerImpl cti = new CronTriggerImpl();
                cti.setName(this.name);
                cti.setGroup(this.group);
                if (this.jobDetail != null) {
                    cti.setJobKey(this.jobDetail.getKey());
                }
                cti.setJobDataMap(this.jobDataMap);
                cti.setStartTime(this.startTime);
                cti.setCronExpression(this.cronExpression);
                cti.setTimeZone(this.timeZone);
                cti.setCalendarName(this.calendarName);
                cti.setPriority(this.priority);
                cti.setMisfireInstruction(this.misfireInstruction);
                cti.setDescription(this.description);
                this.cronTrigger = cti;
            }
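
One detail in the listing above is easy to miss: a positive startDelay replaces an explicitly configured startTime. The rule can be isolated as a small function. This is a minimal sketch; `StartTimeRule` and its `nowMillis` parameter are hypothetical, introduced only so the rule can be checked without reading the system clock.

```java
import java.util.Date;

final class StartTimeRule {
    // Mirrors the condition in afterPropertiesSet(): a positive delay, or a
    // missing start time, yields nowMillis + startDelayMillis; otherwise the
    // explicitly configured start time is kept.
    static Date resolve(Date startTime, long startDelayMillis, long nowMillis) {
        if (startDelayMillis > 0 || startTime == null) {
            return new Date(nowMillis + startDelayMillis);
        }
        return startTime;
    }
}
```

With `nowMillis = 1000`, an explicit start time of 5000 ms is kept when the delay is 0, but becomes 1200 ms when the delay is 200.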

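1.3. SchedulerFactoryBean uses StdSchedulerFactory (which configures the Scheduler's parameters from a configuration file; the alternative, DirectSchedulerFactory, is configured mainly in code and is not covered here)
to create StdScheduler, the Scheduler implementation. StdScheduler delegates everything to QuartzScheduler, so all functionality is actually implemented by the QuartzScheduler instance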

public void afterPropertiesSet() throws Exception {
                ...

                // Create SchedulerFactory instance...
                SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
                initSchedulerFactory(schedulerFactory);

                ...

                // Get Scheduler instance from SchedulerFactory.
                try {
                    this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
                    populateSchedulerContext();

                    if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {
                        // Use AdaptableJobFactory as default for a local Scheduler, unless when
                        // explicitly given a null value through the "jobFactory" bean property.
                        this.jobFactory = new AdaptableJobFactory();
                    }
                    if (this.jobFactory != null) {
                        if (this.jobFactory instanceof SchedulerContextAware) {
                            ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());
                        }
                        this.scheduler.setJobFactory(this.jobFactory);
                    }
                }

                ...

          // Register listeners
          registerListeners();
          // Register jobs and triggers (update or insert them in the database)
          registerJobsAndTriggers();


            }

1.4 Startup: QuartzScheduler.start() is invoked through the refresh() method of Spring's AbstractApplicationContext

2. How a job runs

2.1 Constructing QuartzScheduler creates the scheduler's main thread and starts it

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
                throws SchedulerException {
                
                ....

                this.schedThread = new QuartzSchedulerThread(this, resources);
                ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
                schedThreadExecutor.execute(this.schedThread);
                
                ...
            }

2.2 The QuartzSchedulerThread run loop

public void run() {
                boolean lastAcquireFailed = false;

                while (!halted.get()) {
                    try {
                        // check if we're supposed to pause...
                        synchronized (sigLock) {
                            while (paused && !halted.get()) {
                                try {
                                    // Wait until togglePause(false) is called, which happens once QuartzScheduler.start() has been invoked
                                    sigLock.wait(1000L);
                                } catch (InterruptedException ignore) {
                                }
                            }

                            if (halted.get()) {
                                break;
                            }
                        }
                        // Get the thread pool and its number of available threads; blocks while no thread is available
                        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                        if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                            List<OperableTrigger> triggers = null;

                            long now = System.currentTimeMillis();

                            clearSignaledSchedulingChange();
                            try {
                                // 1. Fetch the next batch of triggers to fire from the JobStore.
                                // idleWaitTime == 30L * 1000L: how long the scheduler waits before checking again when no trigger is currently due
                                triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                        now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                                lastAcquireFailed = false;
                                if (log.isDebugEnabled()) 
                                    log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                            } catch (JobPersistenceException jpe) {
                                if(!lastAcquireFailed) {
                                    qs.notifySchedulerListenersError(
                                        "An error occurred while scanning for the next triggers to fire.",
                                        jpe);
                                }
                                lastAcquireFailed = true;
                                continue;
                            } catch (RuntimeException e) {
                                if(!lastAcquireFailed) {
                                    getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                            +e.getMessage(), e);
                                }
                                lastAcquireFailed = true;
                                continue;
                            }
                            // If any triggers were found, fire them
                            if (triggers != null && !triggers.isEmpty()) {

                                now = System.currentTimeMillis();
                                long triggerTime = triggers.get(0).getNextFireTime().getTime();
                                long timeUntilTrigger = triggerTime - now;
                                // Loop until 2 ms or less remain before the fire time
                                while(timeUntilTrigger > 2) {
                                    synchronized (sigLock) {
                                        if (halted.get()) {
                                            break;
                                        }
                                        if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                            try {
                                                // we could have blocked a long while
                                                // on 'synchronize', so we must recompute
                                                now = System.currentTimeMillis();
                                                timeUntilTrigger = triggerTime - now;
                                                if(timeUntilTrigger >= 1)
                                                    sigLock.wait(timeUntilTrigger);
                                            } catch (InterruptedException ignore) {
                                            }
                                        }
                                    }
                                    if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                        break;
                                    }
                                    now = System.currentTimeMillis();
                                    timeUntilTrigger = triggerTime - now;
                                }

                                // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                                if(triggers.isEmpty())
                                    continue;

                                // set triggers to 'executing'
                                List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                                boolean goAhead = true;
                                synchronized(sigLock) {
                                    goAhead = !halted.get();
                                }
                                if(goAhead) {
                                    try {
                                        // 2. Tell the JobStore the triggers are firing; it takes the lock and updates their state
                                        List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                        if(res != null)
                                            // Keep the returned results in bndles
                                            bndles = res;
                                    } catch (SchedulerException se) {
                                        qs.notifySchedulerListenersError(
                                                "An error occurred while firing triggers '"
                                                        + triggers + "'", se);
                                        //QTZ-179 : a problem occurred interacting with the triggers from the db
                                        //we release them and loop again
                                        for (int i = 0; i < triggers.size(); i++) {
                                            // On error (same below): set the ACQUIRED state in the database back to WAITING, then loop again
                                            qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                        }
                                        continue;
                                    }

                                }
                                // At this point all Quartz tables in the database have been updated
                                for (int i = 0; i < bndles.size(); i++) {
                                    TriggerFiredResult result =  bndles.get(i);
                                    TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                                    Exception exception = result.getException();

                                    if (exception instanceof RuntimeException) {
                                        getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                        
                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                        continue;
                                    }

                                    // it's possible to get 'null' if the triggers was paused,
                                    // blocked, or other similar occurrences that prevent it being
                                    // fired at this time...  or if the scheduler was shutdown (halted)
                                    if (bndle == null) {
                                        
                                        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                        continue;
                                    }

                                    JobRunShell shell = null;
                                    try {
                                        // 3. Create the JobRunShell and initialize it
                                        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                        shell.initialize(qs);
                                    } catch (SchedulerException se) {
                                        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                        continue;
                                    }
                                    // Hand the job over to the thread pool
                                    if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                        // this case should never happen, as it is indicative of the
                                        // scheduler being shutdown or a bug in the thread pool or
                                        // a thread pool being used concurrently - which the docs
                                        // say not to do...
                                        getLog().error("ThreadPool.runInThread() return false!");
                                        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                    }

                                }

                                continue; // while (!halted)
                            }
                        } else { // if(availThreadCount > 0)
                            // should never happen, if threadPool.blockForAvailableThreads() follows contract
                            continue; // while (!halted)
                        }
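                        // Finally, the scheduler thread waits for a randomized idle time. This short wait gives the schedulers on other nodes a chance to acquire the database resources, which is how Quartz balances load across nodes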
                        long now = System.currentTimeMillis();
                        long waitTime = now + getRandomizedIdleWaitTime();
                        long timeUntilContinue = waitTime - now;
                        synchronized(sigLock) {
                            try {
                              if(!halted.get()) {
                                // QTZ-336 A job might have been completed in the mean time and we might have
                                // missed the scheduled changed signal by not waiting for the notify() yet
                                // Check that before waiting for too long in case this very job needs to be
                                // scheduled very soon
                                if (!isScheduleChanged()) {
                                  sigLock.wait(timeUntilContinue);
                                }
                              }
                            } catch (InterruptedException ignore) {
                            }
                        }

                    } catch(RuntimeException re) {
                        getLog().error("Runtime error occurred in main trigger firing loop.", re);
                    }
                } // while (!halted)

                // drop references to scheduler stuff to aid garbage collection...
                qs = null;
                qsRsrcs = null;
            }
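
The first two steps of each loop iteration (pick a batch, then decide how long to sleep before firing it) can be isolated in a small in-memory sketch. `LoopSketch` and `Trig` are hypothetical names, and the list stands in for the JobStore; the batchTimeWindow argument, locking and signaling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

final class LoopSketch {
    // Hypothetical trigger record: a name plus its next fire time in millis.
    static final class Trig {
        final String name;
        final long nextFireTime;

        Trig(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    // Step 1: pick triggers due no later than now + idleWaitTime, earliest
    // first, capped at min(available worker threads, max batch size).
    static List<Trig> acquire(List<Trig> store, long now, long idleWaitTime,
                              int availThreads, int maxBatchSize) {
        long noLaterThan = now + idleWaitTime;
        int limit = Math.min(availThreads, maxBatchSize);
        List<Trig> due = new ArrayList<>();
        for (Trig t : store) {
            if (t.nextFireTime <= noLaterThan) {
                due.add(t);
            }
        }
        due.sort((a, b) -> Long.compare(a.nextFireTime, b.nextFireTime));
        return new ArrayList<>(due.subList(0, Math.min(limit, due.size())));
    }

    // Step 2: time to sleep before firing the first acquired trigger. As in
    // the listing, waiting stops once 2 ms or less remain.
    static long waitBeforeFiring(List<Trig> acquired, long now) {
        long timeUntilTrigger = acquired.get(0).nextFireTime - now;
        return timeUntilTrigger > 2 ? timeUntilTrigger : 0;
    }
}
```

Because the batch is acquired up to idleWaitTime ahead of time, the thread usually holds its triggers for a while before firing them, which is why the real loop keeps checking whether the schedule changed while it waits.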
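2.2.1. Acquiring the next batch of triggers to fire

The process is: lock the TRIGGER_ACCESS row of the LOCKS table (only when acquireTriggersWithinLock is true or maxCount > 1) --> select the keys of the triggers that are due --> read each trigger and its JobDetail --> mark the trigger as "acquired" in the TRIGGERS table and record it in FIRED_TRIGGERS --> commit the transaction, releasing the lock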

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
                throws JobPersistenceException {
                
                String lockName;
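                // isAcquireTriggersWithinLock() checks the acquireTriggersWithinLock property, which defaults to false; maxCount defaults to 1, so by default no lock is taken
                // Set org.quartz.jobStore.acquireTriggersWithinLock=true in the configuration file to take the lock on every acquisition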
                if(isAcquireTriggersWithinLock() || maxCount > 1) { 
                    lockName = LOCK_TRIGGER_ACCESS;
                } else {
                    lockName = null;
                }
                return executeInNonManagedTXLock(lockName, 
                        new TransactionCallback<List<OperableTrigger>>() {
                            public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                                // Return the triggers that should run next
                                return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                            }
                        },
                        new TransactionValidator<List<OperableTrigger>>() {
                            public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                                try {
                                    List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                                    Set<String> fireInstanceIds = new HashSet<String>();
                                    for (FiredTriggerRecord ft : acquired) {
                                        fireInstanceIds.add(ft.getFireInstanceId());
                                    }
                                    for (OperableTrigger tr : result) {
                                        if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                            return true;
                                        }
                                    }
                                    return false;
                                } catch (SQLException e) {
                                    throw new JobPersistenceException("error validating trigger acquisition", e);
                                }
                            }
                        });
            }
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
                throws JobPersistenceException {
                if (timeWindow < 0) {
                  throw new IllegalArgumentException();
                }
                
                List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
                Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
                final int MAX_DO_LOOP_RETRY = 3;
                int currentLoopCount = 0;
                do {
                    currentLoopCount ++;
                    try {
                        //sql:SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS 
                        //WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
                        List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
                        
                        // No trigger is ready to fire yet.
                        if (keys == null || keys.size() == 0)
                            return acquiredTriggers;

                        long batchEnd = noLaterThan;

                        for(TriggerKey triggerKey: keys) {
                            // If our trigger is no longer available, try a new one.
                            //sql:SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
                            OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                            if(nextTrigger == null) {
                                continue; // next trigger
                            }
                            
                            // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
                            // put it back into the timeTriggers set and continue to search for next trigger.
                            
                            JobKey jobKey = nextTrigger.getJobKey();
                            JobDetail job;
                            try {
                                //sql:SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
                                job = retrieveJob(conn, jobKey);
                            } catch (JobPersistenceException jpe) {
                                try {
                                    getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                                    getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                                } catch (SQLException sqle) {
                                    getLog().error("Unable to set trigger state to ERROR.", sqle);
                                }
                                continue;
                            }
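                            // Non-concurrent ("stateful") job: the class is annotated with @DisallowConcurrentExecution or implements StatefulJob.
                            // If one run is still executing when the next fire time arrives, the trigger is blocked instead of fired.
                            if (job.isConcurrentExectionDisallowed()) {
                                // Skip if a trigger for this job is already in the batch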
                                if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                                    continue; // next trigger
                                } else {
                                    acquiredJobKeysForNoConcurrentExec.add(jobKey);
                                }
                            }
                            
                            if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                              break;
                            }
                            // We now have a acquired trigger, let's add to return list.
                            // If our trigger was no longer in the expected state, try a new one.
                            // Move the selected trigger from WAITING to ACQUIRED
                            //sql:UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?
                            int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                            if (rowsUpdated <= 0) {
                                continue; // next trigger
                            }
                            nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                            //sql:INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) 
                            //VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        
                            getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                            if(acquiredTriggers.isEmpty()) {
                                batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                            }
                            // Add the trigger to the acquiredTriggers list
                            acquiredTriggers.add(nextTrigger);
                        }

                        // if we didn't end up with any trigger to fire from that first
                        // batch, try again for another batch. We allow with a max retry count.
                        // If nothing was acquired, try again; the loop runs at most 3 times
                        if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                            continue;
                        }
                        
                        // We are done with the while loop.
                        break;
                    } catch (Exception e) {
                        throw new JobPersistenceException(
                                  "Couldn't acquire next trigger: " + e.getMessage(), e);
                    }
                } while (true);
                
                // Return the acquired trigger list
                return acquiredTriggers;
            }
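
The step that makes acquisition safe without a lock is the conditional update: the row only moves to ACQUIRED if it is still WAITING, and the caller skips the trigger when zero rows were updated. A minimal in-memory analogue is sketched below. `TriggerStateTable` is a hypothetical stand-in for the TRIGGERS table, using ConcurrentMap.replace(key, oldValue, newValue) in place of the SQL UPDATE ... WHERE TRIGGER_STATE = ?.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TriggerStateTable {
    // In-memory stand-in for the TRIGGERS table: trigger name -> state.
    private final ConcurrentMap<String, String> states = new ConcurrentHashMap<>();

    void put(String trigger, String state) {
        states.put(trigger, state);
    }

    String stateOf(String trigger) {
        return states.get(trigger);
    }

    // Analogue of updateTriggerStateFromOtherState: the change is applied
    // only if the trigger is still in oldState. Returns the number of
    // "rows" updated (0 or 1), like the JDBC call in the listing.
    int updateFromOtherState(String trigger, String newState, String oldState) {
        return states.replace(trigger, oldState, newState) ? 1 : 0;
    }
}
```

If two scheduler instances race for the same trigger, only the first conditional update succeeds; the second sees 0 rows updated and moves on to the next candidate.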
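2.2.2. Firing the triggers

The process is similar to the one above, except that the lock is always taken here: lock the TRIGGER_ACCESS row of the LOCKS table --> confirm the trigger's state --> read the trigger's JobDetail --> read the trigger's Calendar --> update the trigger --> commit the transaction, releasing the lock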

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
                return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
                        new TransactionCallback<List<TriggerFiredResult>>() {
                            public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                                List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                                TriggerFiredResult result;
                                for (OperableTrigger trigger : triggers) {
                                    try {
                                        // The main work happens here
                                      TriggerFiredBundle bundle = triggerFired(conn, trigger);
                                      result = new TriggerFiredResult(bundle);
                                    } catch (JobPersistenceException jpe) {
                                        result = new TriggerFiredResult(jpe);
                                    } catch(RuntimeException re) {
                                        result = new TriggerFiredResult(re);
                                    }
                                    results.add(result);
                                }

                                return results;
                            }
                        },
                        new TransactionValidator<List<TriggerFiredResult>>() {
                            @Override
                            public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                                try {
                                    List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                                    Set<String> executingTriggers = new HashSet<String>();
                                    for (FiredTriggerRecord ft : acquired) {
                                        if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {
                                            executingTriggers.add(ft.getFireInstanceId());
                                        }
                                    }
                                    for (TriggerFiredResult tr : result) {
                                        if (tr.getTriggerFiredBundle() != null && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {
                                            return true;
                                        }
                                    }
                                    return false;
                                } catch (SQLException e) {
                                    throw new JobPersistenceException("error validating trigger acquisition", e);
                                }
                            }
                        });
            }
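
Note how triggersFired() never lets one bad trigger abort the batch: each trigger gets its own result object that carries either a bundle or the exception. A minimal sketch of that pattern follows; `FireResults`, `Result` and `fireAll` are hypothetical names, not Quartz types, and only RuntimeException is caught here for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FireResults {
    // Holds either a value or the exception raised while producing it.
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }
    }

    // Applies fireOne to every input; a failure is recorded in that input's
    // Result and the loop carries on with the remaining inputs.
    static <I, O> List<Result<O>> fireAll(List<I> inputs, Function<I, O> fireOne) {
        List<Result<O>> results = new ArrayList<>();
        for (I input : inputs) {
            try {
                results.add(new Result<>(fireOne.apply(input), null));
            } catch (RuntimeException e) {
                results.add(new Result<>(null, e));
            }
        }
        return results;
    }
}
```

The caller then inspects each result by index, which is what the scheduler thread does when it walks the returned list and releases the triggers whose result holds an exception or a null bundle.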
protected TriggerFiredBundle triggerFired(Connection conn,
            OperableTrigger trigger)
                throws JobPersistenceException {
                JobDetail job;
                Calendar cal = null;

                // Make sure trigger wasn't deleted, paused, or completed...
                try { // if trigger was deleted, state will be STATE_DELETED
                    // Check the state; skip the trigger if it is not ACQUIRED
                    //sql:SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
                    String state = getDelegate().selectTriggerState(conn,
                            trigger.getKey());
                    if (!state.equals(STATE_ACQUIRED)) {
                        return null;
                    }
                } catch (SQLException e) {
                    throw new JobPersistenceException("Couldn't select trigger state: "
                            + e.getMessage(), e);
                }

                try {
                    //sql:SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
                    job = retrieveJob(conn, trigger.getJobKey());
                    if (job == null) { return null; }
                } catch (JobPersistenceException jpe) {
                    try {
                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                        getDelegate().updateTriggerState(conn, trigger.getKey(),
                                STATE_ERROR);
                    } catch (SQLException sqle) {
                        getLog().error("Unable to set trigger state to ERROR.", sqle);
                    }
                    throw jpe;
                }
                // If a Calendar (special dates) is associated with the trigger, load it
                if (trigger.getCalendarName() != null) {
                    cal = retrieveCalendar(conn, trigger.getCalendarName());
                    if (cal == null) { return null; }
                }

                try {
                    // Update the record of the trigger that is now executing
                    //sql:UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?
                    getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
                } catch (SQLException e) {
                    throw new JobPersistenceException("Couldn't insert fired trigger: "
                            + e.getMessage(), e);
                }

                Date prevFireTime = trigger.getPreviousFireTime();

                // call triggered - to update the trigger's next-fire-time state...
                // Advance the trigger's fire times:
                //previousFireTime = nextFireTime;
                //nextFireTime = getFireTimeAfter(nextFireTime);
                trigger.triggered(cal);

                String state = STATE_WAITING;
                boolean force = true;
                // If the job disallows concurrent execution, move its triggers: WAITING --> BLOCKED, ACQUIRED --> BLOCKED, PAUSED --> PAUSED_BLOCKED
                if (job.isConcurrentExectionDisallowed()) {
                    state = STATE_BLOCKED;
                    force = false;
                    try {
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                                STATE_BLOCKED, STATE_WAITING);
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                                STATE_BLOCKED, STATE_ACQUIRED);
                        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                                STATE_PAUSED_BLOCKED, STATE_PAUSED);
                    } catch (SQLException e) {
                        throw new JobPersistenceException(
                                "Couldn't update states of blocked triggers: "
                                        + e.getMessage(), e);
                    }
                } 
                //Check whether the trigger will fire again
                if (trigger.getNextFireTime() == null) {
                    state = STATE_COMPLETE;
                    force = true;
                }
                //Insert or update the trigger
                storeTrigger(conn, trigger, job, true, state, force, false);

                job.getJobDataMap().clearDirtyFlag();
                //Create a TriggerFiredBundle object
                return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                        .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                        .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
            }
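
The state handling in the method above can be condensed into a small model. This is only a sketch for illustration: TriggerStateSketch, blockOnFire and stateToStore are hypothetical names, and the enum stands in for the STATE_* string constants. It is not Quartz code.

```java
// Hypothetical in-memory model of the state changes made in triggerFired; not Quartz code.
class TriggerStateSketch {
    enum State { WAITING, ACQUIRED, PAUSED, BLOCKED, PAUSED_BLOCKED, COMPLETE }

    // New state for a trigger of the same job once a non-concurrent job starts firing.
    static State blockOnFire(State current) {
        switch (current) {
            case WAITING:
            case ACQUIRED:
                return State.BLOCKED;
            case PAUSED:
                return State.PAUSED_BLOCKED;
            default:
                return current; // other states are left untouched
        }
    }

    // State passed to storeTrigger for the trigger that just fired.
    static State stateToStore(boolean concurrentExecutionDisallowed, boolean hasNextFireTime) {
        if (!hasNextFireTime) {
            return State.COMPLETE; // no further fire time: the trigger is done
        }
        return concurrentExecutionDisallowed ? State.BLOCKED : State.WAITING;
    }

    public static void main(String[] args) {
        System.out.println(blockOnFire(State.PAUSED));
        System.out.println(stateToStore(true, true));
        System.out.println(stateToStore(true, false));
    }
}
```

Note that the "no next fire time" check comes last, so a trigger of a non-concurrent job that has no further fire time is stored as COMPLETE rather than BLOCKED, mirroring the order of the two if blocks in the listing.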
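2.2.3.Instantiating and executing the Job

A runnable JobRunShell is created for each Job and handed to the thread pool to run. The runInThread method below has the shape of the one in SimpleThreadPool.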

public boolean runInThread(Runnable runnable) {
                if (runnable == null) {
                    return false;
                }

                synchronized (nextRunnableLock) {

                    handoffPending = true;

                    // Wait until a worker thread is available
                    while ((availWorkers.size() < 1) && !isShutdown) {
                        try {
                            nextRunnableLock.wait(500);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (!isShutdown) {
                        WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                        busyWorkers.add(wt);
                        wt.run(runnable);
                    } else {
                        // If the thread pool is going down, execute the Runnable
                        // within a new additional worker thread (no thread from the pool).
                        WorkerThread wt = new WorkerThread(this, threadGroup,
                                "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                        busyWorkers.add(wt);
                        workers.add(wt);
                        wt.start();
                    }
                    nextRunnableLock.notifyAll();
                    handoffPending = false;
                }

                return true;
            }
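
The hand-off in runInThread relies on plain wait/notify on a shared lock. The following is a minimal sketch of that pattern, assuming a fixed set of daemon workers and leaving out the shutdown branch entirely. HandoffPoolSketch, Worker, assign and runAll are hypothetical names chosen for this example; only the names nextRunnableLock and availWorkers are borrowed from the listing.

```java
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified hand-off pool; shutdown handling is deliberately left out.
class HandoffPoolSketch {
    private final Object nextRunnableLock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();

    HandoffPoolSketch(int size) {
        synchronized (nextRunnableLock) {
            for (int i = 0; i < size; i++) {
                Worker w = new Worker();
                w.setDaemon(true); // let the JVM exit even though workers loop forever
                availWorkers.add(w);
                w.start();
            }
        }
    }

    // Blocks until a worker is idle, then hands the task to it.
    boolean runInThread(Runnable task) {
        if (task == null) {
            return false;
        }
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty()) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                    // same choice as the listing above: keep waiting
                }
            }
            availWorkers.removeFirst().assign(task);
        }
        return true;
    }

    private final class Worker extends Thread {
        private Runnable task;

        synchronized void assign(Runnable r) {
            task = r;
            notifyAll(); // wake this worker, which waits on its own monitor
        }

        @Override
        public void run() {
            while (true) {
                Runnable r;
                synchronized (this) {
                    while (task == null) {
                        try {
                            wait();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    r = task;
                    task = null;
                }
                try {
                    r.run();
                } catch (RuntimeException e) {
                    // a failing task must not kill the worker
                }
                synchronized (nextRunnableLock) {
                    availWorkers.add(this); // back on the idle list
                    nextRunnableLock.notifyAll();
                }
            }
        }
    }

    // Submits `tasks` counting tasks to `workers` threads and returns how many completed.
    static int runAll(int workers, int tasks) {
        HandoffPoolSketch pool = new HandoffPoolSketch(workers);
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.runInThread(() -> {
                ran.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }
}
```

Calling HandoffPoolSketch.runAll(2, 5) submits five tasks to two workers; the third submission blocks inside runInThread until one of the workers puts itself back on the idle list and notifies the lock.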

The worker thread then invokes the run() method of the JobRunShell:

public void run() {
                qs.addInternalSchedulerListener(this);

                try {
                    OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
                    JobDetail jobDetail = jec.getJobDetail();

                    do {

                        JobExecutionException jobExEx = null;
                        Job job = jec.getJobInstance();

                        try {
                            begin();
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error executing Job ("
                                    + jec.getJobDetail().getKey()
                                    + ": couldn't begin execution.", se);
                            break;
                        }

                        // notify job & trigger listeners...
                        try {
                            if (!notifyListenersBeginning(jec)) {
                                break;
                            }
                        } catch(VetoedException ve) {
                            try {
                                CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                                qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
                                
                                // QTZ-205
                                // Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
                                if (jec.getTrigger().getNextFireTime() == null) {
                                    qs.notifySchedulerListenersFinalized(jec.getTrigger());
                                }

                                complete(true);
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError("Error during veto of Job ("
                                        + jec.getJobDetail().getKey()
                                        + ": couldn't finalize execution.", se);
                            }
                            break;
                        }

                        long startTime = System.currentTimeMillis();
                        long endTime = startTime;

                        // execute the job
                        try {
                            log.debug("Calling execute on job " + jobDetail.getKey());
                            //Call the job's execute(); in this test it is QuartzJobBean.execute(), which in turn calls the subclass's executeInternal()
                            job.execute(jec);
                            endTime = System.currentTimeMillis();
                        } catch (JobExecutionException jee) {
                            //If execute() throws a JobExecutionException, the exception itself carries the information on whether to retry or to stop
                            endTime = System.currentTimeMillis();
                            jobExEx = jee;
                            getLog().info("Job " + jobDetail.getKey() +
                                    " threw a JobExecutionException: ", jobExEx);
                        } catch (Throwable e) {
                            endTime = System.currentTimeMillis();
                            getLog().error("Job " + jobDetail.getKey() +
                                    " threw an unhandled Exception: ", e);
                            SchedulerException se = new SchedulerException(
                                    "Job threw an unhandled exception.", e);
                            qs.notifySchedulerListenersError("Job ("
                                    + jec.getJobDetail().getKey()
                                    + " threw an exception.", se);
                            jobExEx = new JobExecutionException(se, false);
                        }

                        jec.setJobRunTime(endTime - startTime);

                        // notify all job listeners
                        if (!notifyJobListenersComplete(jec, jobExEx)) {
                            break;
                        }

                        CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

                        // update the trigger
                        try {
                            //Pick the instruction code according to the outcome of the execution
                            instCode = trigger.executionComplete(jec, jobExEx);
                        } catch (Exception e) {
                            // If this happens, there's a bug in the trigger...
                            SchedulerException se = new SchedulerException(
                                    "Trigger threw an unhandled exception.", e);
                            qs.notifySchedulerListenersError(
                                    "Please report this error to the Quartz developers.",
                                    se);
                        }

                        // notify all trigger listeners
                        if (!notifyTriggerListenersComplete(jec, instCode)) {
                            break;
                        }

                        // update job/trigger or re-execute job
                        if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                            jec.incrementRefireCount();
                            try {
                                complete(false);
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError("Error executing Job ("
                                        + jec.getJobDetail().getKey()
                                        + ": couldn't finalize execution.", se);
                            }
                            continue;
                        }

                        try {
                            complete(true);
                        } catch (SchedulerException se) {
                            qs.notifySchedulerListenersError("Error executing Job ("
                                    + jec.getJobDetail().getKey()
                                    + ": couldn't finalize execution.", se);
                            continue;
                        }
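                        //The job is done; this call updates trigger states according to instCode. If the job is annotated with @DisallowConcurrentExecution, its triggers go back STATE_BLOCKED-->STATE_WAITING and STATE_PAUSED_BLOCKED-->STATE_PAUSED
                        //If it is annotated with @PersistJobDataAfterExecution, the JobDataMap is persisted to the job_details table so that the next execution can share it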
                        qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                        break;
                    } while (true);

                } finally {
                    qs.removeInternalSchedulerListener(this);
                }
            }
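
The do/while loop in run() is what makes an immediate refire possible: when the trigger answers RE_EXECUTE_JOB, the refire count is incremented and the loop body runs again. A stripped-down model of that control flow is sketched below. RefireLoopSketch and runWithRefire are hypothetical names, and the maxRefires cap is an assumption added so the example always terminates; the listing above has no such cap.

```java
import java.util.function.IntPredicate;

// Hypothetical model of the refire loop in run(); not Quartz code.
class RefireLoopSketch {
    enum Instruction { NOOP, RE_EXECUTE_JOB }

    // wantsRefire receives the current refire count and says whether this
    // attempt fails and asks to be run again. Returns the number of executions.
    static int runWithRefire(IntPredicate wantsRefire, int maxRefires) {
        int refireCount = 0;
        do {
            boolean refireRequested = wantsRefire.test(refireCount); // "execute the job"
            Instruction instCode = (refireRequested && refireCount < maxRefires)
                    ? Instruction.RE_EXECUTE_JOB
                    : Instruction.NOOP;
            if (instCode == Instruction.RE_EXECUTE_JOB) {
                refireCount++; // corresponds to jec.incrementRefireCount()
                continue;      // run the loop body again
            }
            break;             // normal completion
        } while (true);
        return refireCount + 1;
    }

    public static void main(String[] args) {
        // The job asks for a refire on its first two attempts, then succeeds.
        System.out.println(runWithRefire(n -> n < 2, 10));
    }
}
```

With the predicate n -> n < 2 the job runs three times in total: two attempts that ask for a refire, followed by one that completes normally.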

 

