通過源碼分析Java開源任務調度框架Quartz的主要流程


通過源碼分析Java開源任務調度框架Quartz的主要流程

從使用效果、調用鏈路跟蹤、E-R圖、循環調度邏輯幾個方面分析Quartz。

github項目地址: https://github.com/tanliwei/spring-quartz-cluster-sample , 補充了SQL輸出

 

系統說明:

IDE: IntelliJ

JDK:1.8

Quartz:2.2.1

 

使用效果

1.從github項目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取項目到本地,導入IDEA。

    相信讀者都有一定工作經驗,這些細節不贅述。

2.本文采用Mysql數據庫。

    請執行 resources/scripts/tables_mysql_innodb.sql

3.修改jdbc.properties中數據庫配置

 

4.通過IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

 

 

暴露的Restful 接口 /say-hello.do 以及添加好任務后的調用效果:


 

 

添加任務

在tomcat啟動成功后,在首頁點擊“添加任務”,添加如下任務:


 

 

代碼執行邏輯在SyncJobFactory類中,從Output中可以看到執行的輸出信息,

調用鏈跟蹤的最后會回到這個類來。

 

現在開始跟蹤調用鏈路。 

 

IDEA 快捷鍵:
進入方法:  Ctrl + 鼠標左鍵
光標前進/后退: Ctrl + Shirt + 右方向鍵/左方向鍵
 
 
一、 調用鏈路跟蹤

從配置文件applicationContext.xml配置中找到任務調度核心類SchedulerFactoryBean

 resources/applicationContext.xml

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
...
</bean>

 

使用IDEA快捷鍵,點擊進入SchedulerFactoryBean類,它實現了InitializingBean接口,

在Spring中凡是實現了InitializingBean接口的Bean,都會在Bean屬性都設置完成后調用afterPropertiesSet()方法.

 SchedulerFactoryBean.java

//---------------------------------------------------------------------
// Implementation of InitializingBean interface
// 實現 InitializingBean 接口
//---------------------------------------------------------------------
public void afterPropertiesSet() throws Exception {
    //...
    // Create SchedulerFactory instance.
    // 創建 SchedulerFactory 調度器工廠實例
    SchedulerFactory schedulerFactory = (SchedulerFactory)
            BeanUtils.instantiateClass(this.schedulerFactoryClass);
    initSchedulerFactory(schedulerFactory);
    //...
    // Get Scheduler instance from SchedulerFactory.
    // 通過調度器工廠 獲取 調度器實例
    try {
        this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
    //...
}

 

 SchedulerFactoryBean.java

/**
 * Create the Scheduler instance for the given factory and scheduler name.
 * 通過制定工廠和調度器名稱創建調度器實例
 * Called by {@link #afterPropertiesSet}.
 * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code>
 * method. Can be overridden for custom Scheduler creation.
 */
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
        throws SchedulerException {
    //...
    try {
        SchedulerRepository repository = SchedulerRepository.getInstance();
        synchronized (repository) {
            Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
            Scheduler newScheduler = schedulerFactory.getScheduler();
            if (newScheduler == existingScheduler) {
                throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                        "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
            }
            //...
}

 

 這個項目走的邏輯是 StdSchedulerFactory.getScheduler()方法,可自行debug。

 StdSchedulerFactory.java

/**
 * Returns a handle to the Scheduler produced by this factory.
 * 返回該工廠創造的調度器的句柄
 */
public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        initialize();
    }

    SchedulerRepository schedRep = SchedulerRepository.getInstance();

    Scheduler sched = schedRep.lookup(getSchedulerName());
    //...
    sched = instantiate();
    return sched;
}

 

StdSchedulerFactory.java

private Scheduler instantiate() throws SchedulerException {
    //...
    //大量的配置初始化、實例化代碼
    //...
    //第1298行代碼
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    //...
}

 

QuartzScheduler.java

/**
 * Create a <code>QuartzScheduler</code> with the given configuration
 * 根據給定的配置 創建Quartz調度器
 */
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }
        //private QuartzSchedulerThread schedThread;
        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        //通過線程池執行 Quartz調度器線程
        schedThreadExecutor.execute(this.schedThread);
        //...
}

 

 QuartzSchedulerThread.java

/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * Quartz調度器線程的主循環邏輯
 * </p>
 */
@Override
public void run() {
    //while循環執行,只要調度器為被暫停
    while(!halted.get()){
                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        }
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false){}

    }
}

 

 JobRunShell.java

public void run() {
        //...
        Job job = jec.getJobInstance();
        //...
        try {
            log.debug("Calling execute on job " + jobDetail.getKey());
            //執行
            job.execute(jec);
            endTime = System.currentTimeMillis();
        }
        //...
        //更新Trigger觸發器狀態,刪除FIRED_TRIGGERS觸發記錄
        instCode = trigger.executionComplete(jec, jobExEx);
        //...
}

 

QuartzJobBean.java

/**
 * This implementation applies the passed-in job data map as bean property
 * values, and delegates to <code>executeInternal</code> afterwards.
 * 這個實現 把傳入的map數據作為bean屬性值,然后委托給 executeInternal 方法
 */
public final void execute(JobExecutionContext context) throws JobExecutionException {
    try {
    //執行
 executeInternal(context);
}

 

  SyncJobFactory.java

//回到了我們的業務類SyncJobFactory的executeInternal方法,
//里面執行我們的業務代碼
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
    try {
        LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
    }
    //...
    System.out.println("jobName:" + scheduleJob.getJobName() + "  " + scheduleJob);
    //...
}

 

 

 二、E-R圖

梳理6張主要的Quartz表:

 

 
QRTZ_TRIGGERS 觸發器表

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。 聯合主鍵,QRTZ_JOB_DETAILS表SCHED_NAME外鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_JOB_DETAILS表JOB_NAME外鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_JOB_DETAILS表JOB_GROUP外鍵

    TRIGGER_STATE,觸發器狀態: WAITING , ACQUIRED, BLOCKING

    NEXT_FIRE_TIME, 下次觸發時間:

    MISFIRE_INSTR,執行失敗后的指令,

        非失敗策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 

        失敗策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

    TRIGGER_TYPE, 觸發器類型,例如CRON,cron表達式類型的觸發器

    PRIORITY,優先級

 

QRTZ_CRON_TRIGGERS cron類型觸發器表

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。 聯合主鍵,QRTZ_TRIGGERS表SCHED_NAME外鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵,QRTZ_TRIGGERS表JOB_NAME外鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵,QRTZ_TRIGGERS表JOB_GROUP外鍵

    CRON_EXPRESSION, cron表達式, 例如每30秒執行一次, 0/30 * * * * ?

 

QRTZ_JOB_DETAILS 任務詳細表

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。聯合主鍵

    JOB_NAME,任務名。自定義值。 聯合主鍵

    JOB_GROUP,任務組。 自定義值。聯合主鍵

    JOB_DATA,blob類型,任務參數

 

QRTZ_FIRED_TRIGGERS 任務觸發表

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。聯合主鍵

    ENTRY_ID,entry id,聯合主鍵

    JOB_NAME,任務名。自定義值。 

    JOB_GROUP,任務組。 自定義值。

    FIRED_TIME, 任務觸發時間

    STATE,狀態

    INSTANCE_NAME, 服務器實例名

    PRIORITY,優先級

 

QRTZ_SCHEDULER_STATE 

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。聯合主鍵

    INSTANCE_NAME,服務器實例名。聯合主鍵

    LAST_CHECKIN_TIME,上次檢查時間

    CHECKIN_INTERVAL,檢查間隔

 

QRTZ_LOCKS 全局鎖

    SCHED_NAME,調度器名稱,集群時為常量值:“ClusterScheduler”。聯合主鍵

    LOCK_NAME,鎖名稱,例如,TRIGGER_ACCESS。聯合主鍵

   

 

三、循環調度邏輯

    主要流程如下:

 

    源碼如下:

QuartzSchedulerThread.java

 public void run() {
        //...
        while (!halted.get()) {
            try {
                //合理休眠
                //...
                        //獲取接下來的觸發器
                        //1.狀態為WAITING
                        //2.觸發時間在30秒內
                        //3.不是錯過執行的或者錯過了但是時間不超過兩分鍾
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                       
                                //... 
                                //觸發任務
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                //...
                            JobRunShell shell = null;
                            //...
                            //執行代碼
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            //...
        } // while (!halted)
        //..
    } 

 

 JobRunShell.java

    protected QuartzScheduler qs = null;
    
    public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            //...
            do {
                Job job = jec.getJobInstance();
                // execute the job
                try {
                    //執行任務代碼
                    job.execute(jec);
                //更新觸發器,刪除觸發記錄
                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);
        } 
    //...
    }

 

 

四、擴展

 

除了對主線程 QuartzSchedulerThread 的分析

繼續分析JobStoreSupport類的兩個線程 ClusterManager 和 MisfireHandler 的分析, 它們維護觸發器的MISFIRE_INSTR狀態,和調度器狀態QRTZ_SCHEDULER_STATE。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM