quartz2.x源碼分析——啟動過程



title: quartz2.x源碼分析——啟動過程
date: 2017-04-13 14:59:01
categories: quartz
tags: [quartz, 源碼分析]

先簡單介紹一下quartz,Quartz是一個功能豐富的開源作業調度庫,可以集成到幾乎任何Java應用程序中 - 從最小的獨立應用程序到最大的電子商務系統。quartz可用於創建執行數十,數百甚至數十萬個作業的簡單或復雜的計划; 任務定義為標准Java組件的任務,可以執行任何可以對其進行編程的任何內容。Quartz Scheduler包含許多企業級功能,例如支持JTA事務和集群。
以上內容來自quartz官網

我們會針對quartz的幾個主要類進行分析,看一下quartz是如何實現定時調度功能

1.1測試Demo

來自官方實例的example1

public class SimpleExample {

  public void run() throws Exception {
    Logger log = LoggerFactory.getLogger(SimpleExample.class);

    log.info("------- Initializing ----------------------");

    // First we must get a reference to a scheduler
    SchedulerFactory sf = new StdSchedulerFactory();
    Scheduler sched = sf.getScheduler();

    log.info("------- Initialization Complete -----------");

    // computer a time that is on the next round minute
    Date runTime = evenMinuteDate(new Date());

    log.info("------- Scheduling Job  -------------------");

    // define the job and tie it to our HelloJob class
    JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();

    // Trigger the job to run on the next round minute
    Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();

    // Tell quartz to schedule the job using our trigger
    sched.scheduleJob(job, trigger);
    log.info(job.getKey() + " will run at: " + runTime);

    // Start up the scheduler (nothing can actually run until the
    // scheduler has been started)
    sched.start();

    log.info("------- Started Scheduler -----------------");

    // wait long enough so that the scheduler as an opportunity to
    // run the job!
    log.info("------- Waiting 65 seconds... -------------");
    try {
      // wait 65 seconds to show job
      Thread.sleep(65L * 1000L);
      // executing...
    } catch (Exception e) {
      //
    }

    // shut down the scheduler
    log.info("------- Shutting Down ---------------------");
    sched.shutdown(true);
    log.info("------- Shutdown Complete -----------------");
  }

  public static void main(String[] args) throws Exception {

    SimpleExample example = new SimpleExample();
    example.run();

  }

}

public class HelloJob implements Job {

    private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
    public HelloJob() {
    }
    public void execute(JobExecutionContext context)
        throws JobExecutionException {
        _log.info("Hello World! - " + new Date());
    }

}

1.2 quartz關鍵API

從上述example中我們可以看到quartz主要接口和類

  • Scheduler - 進行作業調度的主要接口.
  • Job - 作業接口,編寫自己的作業需要實現,如example中的HelloJob
  • JobDetail - 作業的詳細信息,除了包含作業本身,還包含一些額外的數據。
  • Trigger - 作業計划的組件-作業何時執行,執行次數,頻率等。
  • JobBuilder - 建造者模式創建 JobDetail實例.
  • TriggerBuilder - 建造者模式創建 Trigger 實例.
  • QuartzSchedulerThread 繼承Thread 主要的執行任務線程

從上面的幾個接口,可以看到quartz設計非常精妙,將作業和觸發器分開設計,同時調度器完成對作業的調度。
了解了幾個關鍵類和接口作用,下面我們來分析整個執行過程。

1.3 執行過程分析

1.3.1 從StdSchedulerFactory獲取scheduler


    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }
  1. cfg變量為PropertiesParser實例————是quartz的配置信息(主要是quartz.properties),如果為空,就初始化讀取quartz的配置信息。
  2. SchedulerRepository是一個HashMap,用於存儲Scheduler。如果有重名的,判斷是否已經停止,是從hashMap刪掉,否直接返回已保存實例
  3. SchedulerRepository未找到,實例化一個scheduler
private Scheduler instantiate() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        if (initException != null) {
            throw initException;
        }

        .....
        .....

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        // Get Scheduler Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME,
                "QuartzScheduler");

        String threadName = cfg.getStringProperty(PROP_SCHED_THREAD_NAME,
                schedName + "_QuartzSchedulerThread");

        .....
        .....

        String managementRESTServiceHostAndPort = cfg.getStringProperty(MANAGEMENT_REST_SERVICE_HOST_PORT, "0.0.0.0:9889");

        Properties schedCtxtProps = cfg.getPropertyGroup(PROP_SCHED_CONTEXT_PREFIX, true);

        // If Proxying to remote scheduler, short-circuit here...
        // ~~~~~~~~~~~~~~~~~~
        if (rmiProxy) {

           ....
           ....

            schedRep.bind(remoteScheduler);

            return remoteScheduler;
        }


        // Create class load helper
        ClassLoadHelper loadHelper = null;
        try {
            loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass)
                    .newInstance();
        } catch (Exception e) {
            throw new SchedulerConfigException(
                    "Unable to instantiate class load helper class: "
                            + e.getMessage(), e);
        }
        loadHelper.initialize();

        // If Proxying to remote JMX scheduler, short-circuit here...
        // ~~~~~~~~~~~~~~~~~~
        if (jmxProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

           ....
           ....

            jmxScheduler.initialize();

            schedRep.bind(jmxScheduler);

            return jmxScheduler;
        }

        
        JobFactory jobFactory = null;
        if(jobFactoryClass != null) {
           ....
           ....
        }

        InstanceIdGenerator instanceIdGenerator = null;
        if(instanceIdGeneratorClass != null) {
           .....
           .....
        }

        // Get ThreadPool Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
        ....
        ....

        // Get JobStore Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

        if (jsClass == null) {
        }
        ....
        ....
        ....

        // Set up any DataSources
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
        for (int i = 0; i < dsNames.length; i++) {
            PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                    PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

            String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

            // custom connectionProvider...
           ....
           ....
           ....

        }

        // Set up any SchedulerPlugins
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
        SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
        for (int i = 0; i < pluginNames.length; i++) {
            Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                    + pluginNames[i], true);

            ....
            ....

            plugins[i] = plugin;
        }

        // Set up any JobListeners
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        Class<?>[] strArg = new Class[] { String.class };
        String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
        JobListener[] jobListeners = new JobListener[jobListenerNames.length];
        for (int i = 0; i < jobListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                    + jobListenerNames[i], true);

            .....
            .....

            jobListeners[i] = listener;
        }

        // Set up any TriggerListeners
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
        TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
        for (int i = 0; i < triggerListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                    + triggerListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

           .....
           .....
           .....

            triggerListeners[i] = listener;
        }

        boolean tpInited = false;
        boolean qsInited = false;


        // Get ThreadExecutor Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
        if (threadExecutorClass != null) {
            tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
           .....
           .....
        }



        // Fire everything up
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        try {
                
    
            JobRunShellFactory jrsf = null; // Create correct run-shell factory...
    
            ......
           ......
    
            schedRep.bind(scheduler);
            return scheduler;
        }
        catch(SchedulerException e) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw e;
        }
        catch(RuntimeException re) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw re;
        }
        catch(Error re) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw re;
        }
    }

instantiate()是一個比較重要的方法,主要從上一段代碼介紹的PropertiesParser獲取Scheduler配置信息

  1. 獲取Scheduler Properties
  2. 如果是rmi代理sheduler,則創建RemoteScheduler,並通過scheRep.bind放入SchedulerRepository中,返回scheduler,結束
  3. 創建class load helper 加載類提供幫助
  4. 如果是jmx scheduler 則進行對應操作,並通過scheRep.bind放入SchedulerRepository中,返回scheduler,結束

往下是本地調度,不是遠程調度,因此需要獲取和本地調度相關的信息

6.獲取線程池配置
7.獲取Job存儲設置,(分為內存存儲和數據庫存儲作業還有一個不太了解的方式,默認是內存存儲)
8.設置數據庫連接池
9.安裝Scheduler插件
10.安裝JobListener ,監聽作業啟動前,作業唄否決執行,作業已經執行,參見[JobListener.class]
11.安裝TriggerListener,監聽觸發器被觸發工作將要執行時,觸發錯過,否決工作執行,工作完成,參見[TriggerListener.class]
12.獲取ThreadExecutor配置
13.初始化JobRunShellFactory,這個工廠類很重要,后面會介紹
14.創建標准StdScheduler,返回。

1.3.2 創建JobDetail

JobDeatil包含jobDataMap和jobClass,以及一些描述,名稱等等,采用Build模式建造,jobDataMap主要存儲一些額外信息。

1.3.3 創建Trigger

Trigger主要包含了一些計划信息,詳細可參考接口

准備工作做完了,開始進行調度

1.3.3 scheduler.scheduleJob()

public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException {
        return sched.scheduleJob(jobDetail, trigger);
    }

sched是StdScheduler的一個成員,是QuartzScheduler的實例,在上面的instantiate()方法中實例化過並且用它構造StdScheduler;看一下QuartzScheduler的scheduleJob

    public Date scheduleJob(JobDetail jobDetail,
            Trigger trigger) throws SchedulerException {
        validateState();

        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }
        
        if (trigger == null) {
            throw new SchedulerException("Trigger cannot be null");
        }
        
        if (jobDetail.getKey() == null) {
            throw new SchedulerException("Job's key cannot be null");
        }

        if (jobDetail.getJobClass() == null) {
            throw new SchedulerException("Job's class cannot be null");
        }
        
        OperableTrigger trig = (OperableTrigger)trigger;

        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                "Trigger does not reference given job!");
        }

        trig.validate();

        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        Date ft = trig.computeFirstFireTime(cal);

        if (ft == null) {
            throw new SchedulerException(
                    "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
        }

        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        notifySchedulerListenersJobAdded(jobDetail);
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        notifySchedulerListenersSchduled(trigger);

        return ft;
    }

讓我們來看一下schedule()方法

  1. 一些校驗,jobDetail,trigger,jobClass判空
  2. 計算第一次執行的日期,計算日期為空,則拋出異常,job永遠不會被調度
  3. 如果上述通過,通過jobStore存儲jobDetail和trigger
  4. 通知監聽器程序:工作加入通知、
   public void notifySchedulerListenersJobAdded(JobDetail jobDetail) {
        // build a list of all scheduler listeners that are to be notified...
        List<SchedulerListener> schedListeners = buildSchedulerListenerList();

        // notify all scheduler listeners
        for(SchedulerListener sl: schedListeners) {
            try {
                sl.jobAdded(jobDetail);
            } catch (Exception e) {
                getLog().error(
                        "Error while notifying SchedulerListener of JobAdded.",
                        e);
            }
        }
    }

5.通知監聽器程序:通知正在休眠(工作可能都執行完,主線程sigLock.wait())的主執行線程,有工作加入,喚醒主線程

 protected void notifySchedulerThread(long candidateNewNextFireTime) {
        if (isSignalOnSchedulingChange()) {
            signaler.signalSchedulingChange(candidateNewNextFireTime);
        }
    }

  public void signalSchedulingChange(long candidateNewNextFireTime) {
        synchronized(sigLock) {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }

6.通知調度器job被調度了

    public void notifySchedulerListenersSchduled(Trigger trigger) {
        // build a list of all scheduler listeners that are to be notified...
        List<SchedulerListener> schedListeners = buildSchedulerListenerList();

        // notify all scheduler listeners
        for(SchedulerListener sl: schedListeners) {
            try {
                sl.jobScheduled(trigger);
            } catch (Exception e) {
                getLog().error(
                        "Error while notifying SchedulerListener of scheduled job."
                                + "  Triger=" + trigger.getKey(), e);
            }
        }
    }

此時scheduler.scheduleJob()執行完畢

1.3.4 scheduler.start() 未執行此函數,沒有什么會真正執行,主線程loop循環一直被wait。

public void start() throws SchedulerException {
        sched.start();
    }

//sched.start()
public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) {
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }

        schedThread.togglePause(false);

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        
        notifySchedulerListenersStarted();
    }

  1. 判斷調度器是否關掉或停止(因為可能開始之前已經將調度器停止或關閉),是則直接拋異常
  2. 通知監聽器,馬上就要執行
  3. 啟動一些通知jobStore和一些插件開始執行,如果initial不為空,說明曾經啟動過,則重新恢復
  4. 切換線程開始執行togglePause(false)。此時開始執行, 下面的run()方法選取了一部分,未設置開始之前,while里面還有一個while (paused && !halted.get()) 一直等待變量改變,否則,就一直wait這也就是sche.scheduleJob()未真正啟動工作的原因
 void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }
    }

  public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }
}

5.通知任務已經開始執行

   public void notifySchedulerListenersStarted() {
        // build a list of all scheduler listeners that are to be notified...
        List<SchedulerListener> schedListeners = buildSchedulerListenerList();

        // notify all scheduler listeners
        for(SchedulerListener sl: schedListeners) {
            try {
                sl.schedulerStarted();
            } catch (Exception e) {
                getLog().error(
                        "Error while notifying SchedulerListener of startup.",
                        e);
            }
        }
    }

至此,整個流程大約完成了。
具體執行的(run)細節看看有空再講一下。

參考文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM