Quartz Source Code Analysis (Part 1)


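This article grew out of a Quartz exception: a program that ran fine on Windows Server 2003 threw an exception once it was moved to a Linux environment. Tracking down the exception did not take me long, but it did deepen my understanding of Quartz. As the old saying goes, "break your arm three times and you become a good doctor", which shows how much experience matters in day-to-day development.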

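Quartz is a job scheduling framework whose internals are usually transparent to developers. If you are not familiar with those internals, you can easily be at a loss when something goes wrong. In the spirit of openness, I will lay out my own understanding of Quartz below.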

I use Spring's wrapper around Quartz. Spring's org.springframework.scheduling.quartz.SchedulerFactoryBean class initializes the Scheduler object and starts the Scheduler's main thread (by implementing Spring's InitializingBean and SmartLifecycle interfaces).
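To make the two-phase lifecycle concrete, here is a minimal sketch that models it with plain Java. InitCallback, StartCallback, SchedulerFactoryBeanModel and LifecycleContainerModel are hypothetical stand-ins I made up for illustration; they are not Spring classes, and the real container does far more than this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the InitializingBean and SmartLifecycle callbacks.
interface InitCallback { void afterPropertiesSet(); }
interface StartCallback { void start(); boolean isRunning(); }

// Simplified model of a factory bean that builds its scheduler in one phase
// and starts it in a later one.
class SchedulerFactoryBeanModel implements InitCallback, StartCallback {
    final List<String> events = new ArrayList<>();
    private boolean initialized;
    private boolean running;

    @Override
    public void afterPropertiesSet() {
        // Phase 1: create and configure the scheduler, but do not start it yet.
        events.add("create scheduler");
        events.add("register listeners, jobs and triggers");
        initialized = true;
    }

    @Override
    public void start() {
        // Phase 2: start the scheduler only after initialization has finished.
        if (!initialized) {
            throw new IllegalStateException("start() called before afterPropertiesSet()");
        }
        events.add("start scheduler");
        running = true;
    }

    @Override
    public boolean isRunning() { return running; }
}

// Toy container: initializes the bean first, then starts it.
class LifecycleContainerModel {
    static SchedulerFactoryBeanModel refresh() {
        SchedulerFactoryBeanModel bean = new SchedulerFactoryBeanModel();
        bean.afterPropertiesSet();
        bean.start();
        return bean;
    }
}
```

Calling LifecycleContainerModel.refresh() records the three events in order, which is the ordering the rest of this article walks through: build first, start afterwards.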

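The Scheduler object is initialized in SchedulerFactoryBean's void afterPropertiesSet() method.

Steps:

1 Create the SchedulerFactory object and initialize it

2 Use the Scheduler factory object from step 1 to create the scheduler object and initialize it

3 Register the listeners, triggers, and so on from the configuration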

//---------------------------------------------------------------------    
    // Implementation of InitializingBean interface
    //---------------------------------------------------------------------

    public void afterPropertiesSet() throws Exception {
        // some code omitted here
        // Create SchedulerFactory instance.
        SchedulerFactory schedulerFactory = (SchedulerFactory)
                BeanUtils.instantiateClass(this.schedulerFactoryClass);
        // initialize the configuration properties
        initSchedulerFactory(schedulerFactory);

        // some code omitted here

        // Get Scheduler instance from SchedulerFactory.
        try {
            // instantiate the scheduler object
            this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
            // initialize the scheduler object's context
            populateSchedulerContext();

            // some code omitted here
        }

        finally {
            // release resources
        }

        registerListeners();
        registerJobsAndTriggers();
    }

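Step one creates and initializes the Scheduler factory (the SchedulerFactory class defaults to Class<?> schedulerFactoryClass = StdSchedulerFactory.class).

The initSchedulerFactory(schedulerFactory) method initializes StdSchedulerFactory's configuration properties (these properties are used in the next step to create the Scheduler object).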

/**
     * Initializes the configuration properties.
     * Load and/or apply Quartz properties to the given SchedulerFactory.
     * @param schedulerFactory the SchedulerFactory to initialize
     */
    private void initSchedulerFactory(SchedulerFactory schedulerFactory)
            throws SchedulerException, IOException {

        // some code omitted here

        Properties mergedProps = new Properties();

        if (this.resourceLoader != null) {
            mergedProps.setProperty(StdSchedulerFactory.PROP_SCHED_CLASS_LOAD_HELPER_CLASS,
                    ResourceLoaderClassLoadHelper.class.getName());
        }

        if (this.taskExecutor != null) {
            mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
                    LocalTaskExecutorThreadPool.class.getName());
        }
        else {
            // Set necessary default properties here, as Quartz will not apply
            // its default configuration when explicitly given properties.
            mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
            mergedProps.setProperty(PROP_THREAD_COUNT, Integer.toString(DEFAULT_THREAD_COUNT));
        }
        
        if (this.configLocation != null) {
            if (logger.isInfoEnabled()) {
                logger.info("Loading Quartz config from [" + this.configLocation + "]");
            }
            PropertiesLoaderUtils.fillProperties(mergedProps, this.configLocation);
        }

        CollectionUtils.mergePropertiesIntoMap(this.quartzProperties, mergedProps);
        

        if (this.dataSource != null) {
            mergedProps.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, LocalDataSourceJobStore.class.getName());
        }

        // Make sure to set the scheduler name as configured in the Spring configuration.
        if (this.schedulerName != null) {
            mergedProps.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME, this.schedulerName);
        }        

        ((StdSchedulerFactory) schedulerFactory).initialize(mergedProps);
    }
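The order in which initSchedulerFactory fills mergedProps decides which setting wins. The following is a minimal sketch of that precedence using only java.util.Properties. QuartzPropsMergeSketch and its merge method are hypothetical names, the key strings and the default of 10 threads are illustrative values I assume here, and the sketch only models the branch where no taskExecutor or dataSource is set.

```java
import java.util.Properties;

class QuartzPropsMergeSketch {
    // Illustrative key names standing in for the PROP_* constants used above.
    static final String THREAD_POOL_CLASS = "org.quartz.threadPool.class";
    static final String THREAD_COUNT = "org.quartz.threadPool.threadCount";
    static final String INSTANCE_NAME = "org.quartz.scheduler.instanceName";

    static Properties merge(Properties fromConfigFile, Properties quartzProperties,
                            String schedulerName) {
        Properties merged = new Properties();
        // 1. Defaults go in first, so anything loaded later can override them.
        merged.setProperty(THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
        merged.setProperty(THREAD_COUNT, "10");
        // 2. Values from the config file override the defaults.
        if (fromConfigFile != null) {
            merged.putAll(fromConfigFile);
        }
        // 3. Locally configured quartzProperties override the config file.
        if (quartzProperties != null) {
            merged.putAll(quartzProperties);
        }
        // 4. The scheduler name from the Spring configuration is applied last.
        if (schedulerName != null) {
            merged.setProperty(INSTANCE_NAME, schedulerName);
        }
        return merged;
    }
}
```

With this ordering, a thread count set in quartzProperties beats the one in the config file, and an instance name in either place is replaced by the Spring-configured schedulerName.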

Step two creates and initializes the scheduler object. The method that creates the scheduler object is as follows:

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
            throws SchedulerException {

        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
        Thread currentThread = Thread.currentThread();
        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
        boolean overrideClassLoader = (this.resourceLoader != null &&
                !this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
        if (overrideClassLoader) {
            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
        }
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                if (!this.exposeSchedulerInRepository) {
                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
                }
                return newScheduler;
            }
        }
        finally {
            if (overrideClassLoader) {
                // Reset original thread context ClassLoader.
                currentThread.setContextClassLoader(threadContextClassLoader);
            }
        }
    }
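The swap-and-restore handling of the thread context ClassLoader in createScheduler is a general pattern. Here is a minimal standalone sketch of it; ContextClassLoaderSketch and callWith are hypothetical names, and the Supplier stands in for the call to the factory.

```java
import java.util.function.Supplier;

class ContextClassLoaderSketch {
    // Runs the action with the given loader as the thread context ClassLoader,
    // then restores the previous loader even if the action throws.
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // Only override when the loader actually differs from the current one.
        boolean override = (loader != null && !loader.equals(original));
        if (override) {
            current.setContextClassLoader(loader);
        }
        try {
            return action.get();
        } finally {
            if (override) {
                // Reset the original thread context ClassLoader.
                current.setContextClassLoader(original);
            }
        }
    }
}
```

The finally block is what matters: code that runs later on the same thread sees the original loader again, no matter how the action ended.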

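The createScheduler method first sets the current thread's context ClassLoader and then calls schedulerFactory.getScheduler() to have the factory instantiate the scheduler object: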

 /**
     * <p>
     * Returns a handle to the Scheduler produced by this factory.
     * </p>
     *
     * <p>
     * If one of the <code>initialize</code> methods has not be previously
     * called, then the default (no-arg) <code>initialize()</code> method
     * will be called by this method.
     * </p>
     */
    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }
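The lookup logic in getScheduler() can be modeled without Quartz. The sketch below is a simplified stand-in under my own assumptions: SchedulerModel and SchedulerRepositoryModel are hypothetical classes, and creating the scheduler is reduced to a constructor call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal scheduler: only a name and a shutdown flag.
class SchedulerModel {
    final String name;
    private boolean shutdown;
    SchedulerModel(String name) { this.name = name; }
    void shutdown() { shutdown = true; }
    boolean isShutdown() { return shutdown; }
}

// Simplified stand-in for the repository plus the lookup logic of getScheduler().
class SchedulerRepositoryModel {
    private final Map<String, SchedulerModel> schedulers = new HashMap<>();

    synchronized SchedulerModel getScheduler(String name) {
        SchedulerModel existing = schedulers.get(name);
        if (existing != null) {
            if (existing.isShutdown()) {
                // A shut-down scheduler is discarded so a fresh one can be built.
                schedulers.remove(name);
            } else {
                // A live scheduler with the same name is reused as-is.
                return existing;
            }
        }
        SchedulerModel created = new SchedulerModel(name);
        // Corresponds to schedRep.bind(scheduler) at the end of instantiate().
        schedulers.put(name, created);
        return created;
    }
}
```

This reuse is why Spring's createScheduler compares newScheduler with existingScheduler: if the factory hands back an instance that was already registered under that name, Spring refuses to manage it and throws IllegalStateException.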

The Scheduler instantiate() method instantiates the Scheduler object. The method is very long; the key parts are as follows:

        // Get ThreadPool Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        // instantiate the thread pool object
        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, null);

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ",
                    SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }

        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            initException
                    .setErrorCode(SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }
        tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
        try {
            setBeanProps(tp, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' props could not be configured.", e);
            initException
                    .setErrorCode(SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }

        // Get JobStore Properties
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        // instantiate the JobStore object
        String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

        if (jsClass == null) {
            initException = new SchedulerException(
                    "JobStore class not specified. ",
                    SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }

        try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            initException
                    .setErrorCode(SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            initException
                    .setErrorCode(SchedulerException.ERR_BAD_CONFIGURATION);
            throw initException;
        }
             // initialize DBConnectionManager dbMgr

                try {
                        PoolingConnectionProvider cp = new PoolingConnectionProvider(
                                dsDriver, dsURL, dsUser, dsPass, dsCnt,
                                dsValidation);
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);
                    } catch (SQLException sqle) {
                        initException = new SchedulerException(
                                "Could not initialize DataSource: " + dsNames[i],
                                sqle);
                        throw initException;
                    }

            // create the QuartzSchedulerResources rsrcs object
            QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
            rsrcs.setName(schedName);
            rsrcs.setThreadName(threadName);
            rsrcs.setInstanceId(schedInstId);
            // set the JobRunShellFactory
            rsrcs.setJobRunShellFactory(jrsf);
            rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
            rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
            rsrcs.setRunUpdateCheck(!skipUpdateCheck);
            rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
            rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
            rsrcs.setJMXExport(jmxExport);
            rsrcs.setJMXObjectName(jmxObjectName);
    
            
            SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
            // set the ThreadPool
            rsrcs.setThreadPool(tp);
            // create and start the worker threads
            tp.initialize();
            tpInited = true;
            // set the JobStore
            rsrcs.setJobStore(js);
    
            schedCtxt = new SchedulingContext();
            schedCtxt.setInstanceId(rsrcs.getInstanceId());
    
            qs = new QuartzScheduler(rsrcs, schedCtxt, idleWaitTime, dbFailureRetry);
            qsInited = true;
    
            // Create Scheduler ref...
            Scheduler scheduler = instantiate(rsrcs, qs);

            // fire up job store, and runshell factory
    
            js.setInstanceId(schedInstId);
            js.setInstanceName(schedName);
            js.initialize(loadHelper, qs.getSchedulerSignaler());
            
            jrsf.initialize(scheduler, schedCtxt);
            
            qs.initialize();    
        
    
            // prevents the repository from being garbage collected
            qs.addNoGCObject(schedRep);
            // prevents the db manager from being garbage collected
            if (dbMgr != null) {
                qs.addNoGCObject(dbMgr);
            }
    
            schedRep.bind(scheduler);
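The ThreadPool and JobStore sections above follow the same recipe: read a class name from the properties, instantiate it reflectively, then push the remaining properties of that group into the instance through setters. Here is a minimal sketch of that recipe with plain reflection. ThreadPoolModel and ReflectiveConfigSketch are hypothetical, the sketch only handles int setters, and skipping the "class" key is a choice made in this sketch.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical thread pool with a bean-style setter.
class ThreadPoolModel {
    private int threadCount;
    public void setThreadCount(int threadCount) { this.threadCount = threadCount; }
    public int getThreadCount() { return threadCount; }
}

class ReflectiveConfigSketch {
    // Keeps only the keys under the prefix and strips the prefix from them.
    static Properties propertyGroup(Properties all, String prefix) {
        Properties group = new Properties();
        String dotted = prefix + ".";
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                group.setProperty(key.substring(dotted.length()), all.getProperty(key));
            }
        }
        return group;
    }

    // Instantiates the class by name and applies each property through a setter.
    static Object instantiate(String className, Properties props) {
        try {
            Object bean = Class.forName(className).getDeclaredConstructor().newInstance();
            for (String name : props.stringPropertyNames()) {
                if (name.equals("class")) {
                    continue; // names the type itself, not a bean property
                }
                String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
                // Only int setters are supported in this sketch.
                Method m = bean.getClass().getMethod(setter, int.class);
                m.invoke(bean, Integer.parseInt(props.getProperty(name)));
            }
            return bean;
        } catch (Exception e) {
            // Wrap every failure in one configuration error, as the code above does.
            throw new IllegalStateException("Class '" + className + "' could not be configured.", e);
        }
    }
}
```

Given the keys org.quartz.threadPool.class and org.quartz.threadPool.threadCount, propertyGroup yields class and threadCount, and instantiate turns the latter into a setThreadCount call.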

The instantiate() method finally creates the Scheduler object by calling Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs):

protected Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs) {
        SchedulingContext schedCtxt = new SchedulingContext();
        schedCtxt.setInstanceId(rsrcs.getInstanceId());

        Scheduler scheduler = new StdScheduler(qs, schedCtxt);
        return scheduler;
    }

The QuartzScheduler sched and SchedulingContext schedCtxt arguments are passed in through the StdScheduler constructor.

The key point here is that the StdScheduler object acts as a proxy for the QuartzScheduler sched argument, which is the real object: every method we call on the StdScheduler object is executed indirectly by the corresponding method of QuartzScheduler sched.
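The delegation relationship can be shown in a few lines. This is only a sketch: CoreSchedulerModel, SchedulerFacade and StdSchedulerModel are hypothetical, heavily reduced stand-ins for QuartzScheduler, the Scheduler interface and StdScheduler, with two methods instead of the full API.

```java
// Hypothetical stand-in for QuartzScheduler: the object that does the real work.
class CoreSchedulerModel {
    private boolean started;
    void start() { started = true; }
    boolean isStarted() { return started; }
}

// Hypothetical stand-in for the Scheduler interface that client code sees.
interface SchedulerFacade {
    void start();
    boolean isStarted();
}

// Stand-in for StdScheduler: keeps a reference to the core object
// and forwards every call to it.
class StdSchedulerModel implements SchedulerFacade {
    private final CoreSchedulerModel sched;

    StdSchedulerModel(CoreSchedulerModel sched) {
        this.sched = sched;
    }

    @Override
    public void start() {
        sched.start();
    }

    @Override
    public boolean isStarted() {
        return sched.isStarted();
    }
}
```

Calling start() on a StdSchedulerModel changes the state of the wrapped CoreSchedulerModel, which is the sense in which the wrapper holds no scheduling logic of its own.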

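The QuartzSchedulerResources resources parameter of the QuartzScheduler constructor holds references to the related resources. The detailed logic of QuartzScheduler will be analyzed in a later article.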

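Now back to SchedulerFactoryBean's void afterPropertiesSet() method. populateSchedulerContext() initializes the scheduler object's context: it stores references to related objects so that job execution methods can conveniently call methods on them.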

/**
     * Expose the specified context attributes and/or the current
     * ApplicationContext in the Quartz SchedulerContext.
     */
    private void populateSchedulerContext() throws SchedulerException {
        // Put specified objects into Scheduler context.
        if (this.schedulerContextMap != null) {
            this.scheduler.getContext().putAll(this.schedulerContextMap);
        }

        // Register ApplicationContext in Scheduler context.
        if (this.applicationContextSchedulerContextKey != null) {
            if (this.applicationContext == null) {
                throw new IllegalStateException(
                    "SchedulerFactoryBean needs to be set up in an ApplicationContext " +
                    "to be able to handle an 'applicationContextSchedulerContextKey'");
            }
            this.scheduler.getContext().put(this.applicationContextSchedulerContextKey, this.applicationContext);
        }
    }
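To illustrate why the context is populated, here is a minimal sketch of a job pulling a collaborator out of it at execution time. A plain Map stands in for the scheduler context, and ReportServiceModel, ReportJobModel, SchedulerContextSketch and the "reportService" key are all hypothetical names chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical service that a job needs at execution time.
class ReportServiceModel {
    String buildReport() { return "report"; }
}

// Stand-in for a job: it receives the scheduler context and looks up
// what it needs by key.
class ReportJobModel {
    String execute(Map<String, Object> schedulerContext) {
        ReportServiceModel service = (ReportServiceModel) schedulerContext.get("reportService");
        if (service == null) {
            throw new IllegalStateException("reportService missing from scheduler context");
        }
        return service.buildReport();
    }
}

class SchedulerContextSketch {
    // Mirrors the first half of populateSchedulerContext():
    // copy the configured entries into the context.
    static Map<String, Object> populate(Map<String, ?> schedulerContextMap) {
        Map<String, Object> context = new HashMap<>();
        if (schedulerContextMap != null) {
            context.putAll(schedulerContextMap);
        }
        return context;
    }
}
```

The job never constructs its collaborators; it only needs to know the key under which they were stored when the scheduler was set up.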

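Step three registers the listeners and triggers, which I will not analyze in detail here. Once the Scheduler object has been created, the next thing is to call its start method (the main thread method):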

/**
     * Start the Quartz Scheduler, respecting the "startupDelay" setting.
     * @param scheduler the Scheduler to start
     * @param startupDelay the number of seconds to wait before starting
     * the Scheduler asynchronously
     */
    protected void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException {
        if (startupDelay <= 0) {
            logger.info("Starting Quartz Scheduler now");
            scheduler.start();
        }
        else {
            if (logger.isInfoEnabled()) {
                logger.info("Will start Quartz Scheduler [" + scheduler.getSchedulerName() +
                        "] in " + startupDelay + " seconds");
            }
            Thread schedulerThread = new Thread() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(startupDelay * 1000);
                    }
                    catch (InterruptedException ex) {
                        // simply proceed
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Starting Quartz Scheduler now, after delay of " + startupDelay + " seconds");
                    }
                    try {
                        scheduler.start();
                    }
                    catch (SchedulerException ex) {
                        throw new SchedulingException("Could not start Quartz Scheduler after delay", ex);
                    }
                }
            };
            schedulerThread.setName("Quartz Scheduler [" + scheduler.getSchedulerName() + "]");
            schedulerThread.setDaemon(true);
            schedulerThread.start();
        }
    }

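If a startup delay is configured, a separate thread first sleeps for the specified time and then calls the scheduler's start method; otherwise the scheduler is started immediately.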
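The same start-now-or-start-later pattern can be tried out in isolation. The sketch below is an assumption-laden reduction: DelayedStartSketch is a hypothetical class, the Runnable stands in for scheduler.start(), and the delay is in milliseconds rather than seconds so that it is quick to experiment with.

```java
class DelayedStartSketch {
    // Runs startAction immediately when delayMillis <= 0 and returns null;
    // otherwise returns the daemon thread that runs it after the delay.
    static Thread start(Runnable startAction, long delayMillis) {
        if (delayMillis <= 0) {
            startAction.run();
            return null;
        }
        Thread starter = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException ex) {
                // simply proceed with the start
            }
            startAction.run();
        });
        starter.setName("delayed-start");
        // A daemon thread does not keep the JVM alive while it is waiting.
        starter.setDaemon(true);
        starter.start();
        return starter;
    }
}
```

Because the waiting thread is a daemon, an application that exits before the delay elapses is not held open by it; the price is that the start action may then never run.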

--------------------------------------------------------------------------- 

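This quartz source code analysis series is my original work.

Author: 刺蝟的溫馴, 博客園 (cnblogs)

Email: chenying998179 (crawlers keep out) 163.com

Link to this article: http://www.cnblogs.com/chenying99/p/3151850.html

Copyright of this article belongs to the author. Reproduction or commercial distribution without the author's consent is strictly prohibited and will be pursued under the law.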

