An Analysis of Quartz's Thread Pool


【Class diagram for org.quartz.core】

The diagram shows that the core class is QuartzScheduler.

【QuartzScheduler constructor】

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
    throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }

    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    if (idleWaitTime > 0) {
        this.schedThread.setIdleWaitTime(idleWaitTime);
    }

    jobMgr = new ExecutingJobsManager();
    addInternalJobListener(jobMgr);
    errLogger = new ErrorLogger();
    addInternalSchedulerListener(errLogger);

    signaler = new SchedulerSignalerImpl(this, this.schedThread);

    if (shouldRunUpdateCheck())
        updateTimer = scheduleUpdateCheck();
    else
        updateTimer = null;

    getLog().info("Quartz Scheduler v." + getVersion() + " created.");
}


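The constructor creates a QuartzSchedulerThread and runs it in a ThreadExecutor (DefaultThreadExecutor by default). This ThreadExecutor is not the thread pool we are interested in, so let's move on to the run method of QuartzSchedulerThread.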
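The hand-off described above can be sketched with the JDK alone. `SketchThreadExecutor` and `SketchDefaultThreadExecutor` are hypothetical stand-ins, not Quartz's classes, and the sketch assumes that the default executor does nothing more than start the thread it is given.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a ThreadExecutor: its only job is to launch one thread. */
interface SketchThreadExecutor {
    void execute(Thread thread);
}

/** Assumed default behaviour: simply start the thread that is handed in. */
class SketchDefaultThreadExecutor implements SketchThreadExecutor {
    @Override
    public void execute(Thread thread) {
        thread.start();
    }
}

class SchedulerThreadStartDemo {
    /** Starts a stand-in "scheduler thread" through the executor and waits for it to end. */
    static boolean startAndJoin() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Thread schedThread = new Thread(() -> ran.set(true), "sketch-scheduler-thread");
        SketchThreadExecutor executor = new SketchDefaultThreadExecutor();
        executor.execute(schedThread);
        try {
            schedThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("scheduler thread ran: " + startAndJoin());
    }
}
```

The point of the sketch is that this executor only launches the single scheduling thread; the jobs themselves run elsewhere, in the ThreadPool.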

【QuartzSchedulerThread#run()】

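The run method is fairly long. In essence, it uses the Triggers to find the jobs that are due and then executes them in the thread pool. Below is an excerpt from the run method: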

JobRunShell shell = null;
try {
    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    shell.initialize(qs);
} catch (SchedulerException se) {
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
    continue;
}

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    // this case should never happen, as it is indicative of the
    // scheduler being shutdown or a bug in the thread pool or
    // a thread pool being used concurrently - which the docs
    // say not to do...
    getLog().error("ThreadPool.runInThread() return false!");
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}


The ThreadPool used here must implement the org.quartz.spi.ThreadPool interface.

【ThreadPool implementations】

The default ThreadPool implementation is org.quartz.simpl.SimpleThreadPool.

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

(from org.quartz.impl.StdSchedulerFactory#instantiate())

This thread pool is very simple; it does not even use a BlockingQueue. Instead, it keeps its worker threads in two lists:

private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); // field of SimpleThreadPool
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); // field of SimpleThreadPool
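To make the two-list idea concrete, here is a minimal sketch of a pool that tracks idle and busy workers in two LinkedLists and hands each task directly to an idle worker, with no task queue. It is not the SimpleThreadPool source: `TwoListPool`, its `Worker` class, and `shutdownAndWait` are hypothetical names, and the blocking and shutdown behaviour are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative pool: idle and busy workers live in two lists; there is no task queue. */
class TwoListPool {
    private final Object lock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();
    private final LinkedList<Worker> busyWorkers = new LinkedList<>();
    private final List<Worker> allWorkers = new ArrayList<>();
    private boolean shutdown = false; // guarded by lock

    TwoListPool(int size) {
        synchronized (lock) {
            for (int i = 0; i < size; i++) {
                Worker w = new Worker("sketch-worker-" + i);
                allWorkers.add(w);
                availWorkers.add(w);
                w.start();
            }
        }
    }

    /** Blocks until a worker is idle, then hands the task directly to it. */
    boolean runInThread(Runnable task) {
        synchronized (lock) {
            while (availWorkers.isEmpty() && !shutdown) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            if (shutdown) {
                return false; // a stopped pool refuses new work
            }
            Worker w = availWorkers.removeFirst();
            busyWorkers.add(w);
            w.next = task;
            lock.notifyAll(); // wake the chosen worker
            return true;
        }
    }

    /** Stops accepting tasks and waits for workers to finish what they already hold. */
    void shutdownAndWait() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll();
        }
        for (Worker w : allWorkers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private final class Worker extends Thread {
        private Runnable next; // guarded by lock

        Worker(String name) {
            super(name);
            setDaemon(true);
        }

        @Override
        public void run() {
            while (true) {
                Runnable task;
                synchronized (lock) {
                    while (next == null && !shutdown) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    if (next == null) {
                        return; // shut down with nothing assigned
                    }
                    task = next;
                    next = null;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // In this sketch a failing task must not kill the worker.
                }
                synchronized (lock) {
                    busyWorkers.remove(this);
                    availWorkers.add(this);
                    lock.notifyAll(); // wake callers waiting for an idle worker
                }
            }
        }
    }
}

class TwoListPoolDemo {
    /** Runs the given number of counting tasks and returns how many completed. */
    static int runTasks(int workers, int tasks) {
        AtomicInteger done = new AtomicInteger();
        TwoListPool pool = new TwoListPool(workers);
        for (int i = 0; i < tasks; i++) {
            pool.runInThread(done::incrementAndGet);
        }
        pool.shutdownAndWait();
        return done.get();
    }
}
```

Because there is no queue, a caller of `runInThread` in this sketch waits whenever every worker is busy, so the pool size directly bounds how many tasks are in flight.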

 

When Quartz is integrated with Spring through org.springframework.scheduling.quartz.SchedulerFactoryBean and a taskExecutor (an implementation of java.util.concurrent.Executor) is configured, org.springframework.scheduling.quartz.LocalTaskExecutorThreadPool is used instead.

if (this.taskExecutor != null) {
    mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
            LocalTaskExecutorThreadPool.class.getName());
}

(from org.springframework.scheduling.quartz.SchedulerFactoryBean#initSchedulerFactory)
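The general shape of such an adapter can be sketched with the JDK alone: wrap a java.util.concurrent.Executor and turn a rejected task into the `false` return value that the run() excerpt earlier checks for. `ExecutorBackedPool` is a hypothetical class written for this illustration; it is not Spring's LocalTaskExecutorThreadPool and does not implement the real org.quartz.spi.ThreadPool interface.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical adapter that offers a boolean runInThread on top of any Executor. */
class ExecutorBackedPool {
    private final Executor executor;

    ExecutorBackedPool(Executor executor) {
        this.executor = executor;
    }

    /** Returns false instead of throwing when the executor refuses the task. */
    boolean runInThread(Runnable task) {
        if (task == null) {
            return false;
        }
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }
}

class ExecutorBackedPoolDemo {
    /** Returns {accepted while running, task actually ran, accepted after shutdown}. */
    static boolean[] demo() {
        ExecutorService taskExecutor = Executors.newFixedThreadPool(2);
        ExecutorBackedPool pool = new ExecutorBackedPool(taskExecutor);
        CountDownLatch ran = new CountDownLatch(1);

        boolean acceptedBefore = pool.runInThread(ran::countDown);
        boolean taskRan = false;
        try {
            taskRan = ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        taskExecutor.shutdown(); // from here on the executor rejects new tasks
        boolean acceptedAfter = pool.runInThread(() -> { });
        return new boolean[] { acceptedBefore, taskRan, acceptedAfter };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

With this design, sizing and queueing are decided entirely by the wrapped Executor, which is the practical consequence of configuring a taskExecutor.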

 

You can also write your own implementation and point Quartz at it by setting the org.quartz.threadPool.class property in quartzProperties.
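The lookup-with-default behaviour can be sketched with a plain java.util.Properties object. This is a simplified illustration, not the StdSchedulerFactory code: `ThreadPoolClassLookup` and `resolve` are hypothetical names, and `com.example.MyThreadPool` is a made-up class name standing in for a custom implementation.

```java
import java.util.Properties;

class ThreadPoolClassLookup {
    static final String PROP_THREAD_POOL_CLASS = "org.quartz.threadPool.class";
    static final String DEFAULT_THREAD_POOL = "org.quartz.simpl.SimpleThreadPool";

    /** Returns the configured thread pool class name, or the default when none is set. */
    static String resolve(Properties quartzProperties) {
        return quartzProperties.getProperty(PROP_THREAD_POOL_CLASS, DEFAULT_THREAD_POOL);
    }

    public static void main(String[] args) {
        Properties quartzProperties = new Properties();
        System.out.println(resolve(quartzProperties)); // nothing configured: the default

        // com.example.MyThreadPool is a hypothetical custom implementation.
        quartzProperties.setProperty(PROP_THREAD_POOL_CLASS, "com.example.MyThreadPool");
        System.out.println(resolve(quartzProperties)); // the configured class name
    }
}
```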

