冷飯新炒 | 深入Quartz核心運行機制


目錄

前言
相信大家對Quartz這框架並不陌生,日常工作經常會接觸到,我們團隊也在使用。但是我發現大家在工作中對其僅停留在簡單配置使用層面,很多時候發生問題,並不知道它問題root cause是什么,配置參數也是隨便在網上copy回來亂用,並不是基於項目實際情況。自從前幾年開始做技術管理后,工作期間也沒多少時間可以在一線擼碼,剛好趁周末時間重新把源碼看了一遍整理下,希望對大家有幫助!PS:本文基於Quartz2.3.0,不會介紹如何使用Quartz,完全沒有接觸過Quartz的朋友建議先閱讀官方文檔。

常見問題

  • Quartz的核心組件?

  • Quartz的核心運行機制?

  • Quartz的線程模型

  • Quartz集群進程間如何通信?

  • Quartz集群如何保證高並發下不重復跑?

  • Quartz如何保證不漏跑

  • Quartz默認任務鎖機制?

  • Quartz常見問題

Quartz的核心組件

JobDetail

我們創建一個實現 Job 接口的類,使用 JobBuilder包裝成 JobDetail,它可以攜帶 KV 的數據,方面用戶可以擴展自己任務要用的參數。

Trigger

定義任務的觸發規則,使用 TriggerBuilder 來構建。

為什么JobDetail和Trigger是一對多的關系

因為通常我們一個任務實際上是有多種觸發規則的,例如:我想我的跑批任務周一9點跑一次,周三5點跑一起,它實際上是屬於同一個Job,只是不同的觸發規則,這時候我們就可以定義多個Trigger組合起來用。

Set<Trigger> triggersForJob = new HashSet();
		triggersForJob.add(trigger);
		triggersForJob.add(trigger1);

// 綁定關系是1:N
scheduler.scheduleJob(jobDetail, triggersForJob,true);

常見的Tigger類型

接口 描述 特點
SimpleTrigger 簡單觸發器 SimpleTrigger 可以定義固定時刻或者固定時間間隔的調度規則(精確到毫秒)

例如:每天 9 點鍾運行;每隔 30 分鍾運行一次
CalendarIntervalTrigger 基於日歷的觸發器 CalendarIntervalTrigger 可以定義更多時間單位的調度需求,精確到秒

好處是不需要去計算時間間隔,比如 1 個小時等於多少毫秒

例如每年、每個月、每周、每天、每小時、每分鍾、每秒

每年的月數和每個月的天數不是固定的,這種情況也適用
DailyTimeIntervalTrigger 基於日期的觸發器 每天的某個時間段

例如:每天早上 9 點到晚上 9 點,每隔半個小時執行一次,並且只在周一到周六執行。
CronTrigger 基於 Cron 表達式的觸發器 可以支持任意時間(推薦) 如:0/10 * * * * ?

怎么排除掉一些日期不觸發

比較常見的需求是周末不計息、節假日不觸發郵件通知

如果要在觸發器的基礎上,排除一些時間區間不執行任務,就要用到 Quartz 的 Calendar 類(注意不是 JDK 的 Calendar)。可以按年、月、周、日、特定日期、Cron 表達式排除

使用方法

  • 調用調度器的 addCalendar()方法注冊排除規則

  • 調用 Trigger modifiedByCalendar()添加到觸發器中

//排除營業時間
scheduler.addCalendar("workingHours",new CronCalendar("* * 0-7,18-23?* *”"),false,false);

Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger1", "group1")
                .startNow()
                .modifiedByCalendar("workingHours") //排除時間段
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withIntervalInSeconds(2)
                        .repeatForever())
                .build();

Scheduler

調度器,是 Quartz 的指揮官,由 StdSchedulerFactory 產生,它是單例的,並且是 Quartz 中最重要的 API,默認是實現類是 StdScheduler,里面包含了一個 QuartzSchedulerQuartzScheduler 里面又包含了一個 QuartzSchedulerThread

Scheduler 中的方法主要分為三大類:

  • 操作調度器本身,例如調度器的啟動 start()、調度器的關閉 shutdown()

  • 操作 Trigger,例如 pauseTriggers()resumeTrigger()

  • 操作 Job,例如 scheduleJob()unscheduleJob()rescheduleJob()

這些方法非常重要,可以實現任務的動態調度。

Listener

事件監聽器。Quartz框架采用觀察者模式設計,可以無入侵式地讓用戶可以收到對應的通知。提供三種類型監聽器,分別是SchedulerListener(監聽 Scheduler 的),TriggerListener(監聽 Trigger 的),JobListener(監聽 Job 的)

場景

  • 任務完成了,發郵件給對應的人。例如:跑批完成了,我想系統自動給我發一個郵件通知
  • 監控任務整個生命周期。例如:作為一個中央分布式調度器需要通過Webhook或者MQ觸發多個服務,想監控每個任務的執行情況,是否有遺漏

工具類:ListenerManager,用於添加、獲取、移除監聽器

工具類:Matcher,主要是基於 groupNamekeyName 進行匹配。

JobStore

Jobstore 用來存儲任務和觸發器相關的信息,例如所有任務的名稱、數量、狀態等等。Quartz 中有兩種存儲任務的方式,一種在在內存,一種是在數據庫。

RAMJobStore

Quartz 默認JobStoreRAMJobstore,也就是把任務和觸發器信息運行的信息存儲在內存中,用到了 HashMapTreeSetHashSet 等等數據結構。

如果程序崩潰或重啟,所有存儲在內存中的數據都會丟失。所以我們需要把這些數 據持久化到磁盤。

JDBCJobStore

JDBCJobStore 可以通過 JDBC 接口,將任務運行數據保存在數據庫中。

JDBC 的實現方式有兩種,JobStoreSupport 類的兩個子類:

  • JobStoreTX:在獨立的程序中使用,自己管理事務,不參與外部事務。

  • JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事 務時,使用它。

Quartz的核心運行機制

以上只是梳理了Quartz的核心流程,列舉了一些核心組件,通過一下幾個方法作為源碼入口:

		// Scheduler
		Scheduler scheduler = factory.getScheduler();

		// 綁定關系是1:N
		scheduler.scheduleJob(jobDetail, trigger);
		scheduler.start();

從上圖可以看到,Quartz的核心流程大致分為三個階段:

  • 獲取調度實例階段
    • 通過getScheduler 方法根據配置文件加載配置和初始化,創建線程池 ThreadPool(默認是SimpleThreadPool,用來執行Quartz調度任務),創建調度器 QuartzScheduler,創建調度線程 QuartzSchedulerThread,並將調度線程初始狀態設置為暫停狀態。
  • 綁定JobDetailTrigger階段
    • Scheduler將任務添加到JobStore中,如果是使用數據庫存儲信息,這時候會把任務持久化到Quartz核心表中,同時也會對實現JobListener的監聽者通知任務已添加
  • 啟動調度器階段
    • Scheduler會調用QuartzSchedulerStart()方法,這時候會把調度線程從暫停切為啟動狀態,通知QuartzSchedulerThread正式干活。QuartzSchedulerThread會從SimpleThreadPool查看下有多少可用工作線程,然后找JobStore去拿下一批符合條件的待觸發的Trigger任務列表,包裝成FiredTriggerBundle。通過JobRunShellFactory創建FiredTriggerBundle的執行線程實例JobRunShell,然后把JobRunShell實例交給SimpleThreadPool的工作線程去執行。SimpleThreadPool會從可用線程隊列拿出對應數量的線程,去調用JobRunShellrun()方法,此時會執行任務類的execute方法 : job.execute(JobExecutionContext context)

獲取調度實例階段

加載配置和初始化調度器

StdSchedulerFactory.getScheduler
    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
          //加載quartz.properties 配置文件
            initialize();
        }
				//調度倉庫里維護着一個HashMap<String, Scheduler>,這里使用單例是為了全局共享
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
				//實際上是從HashMap<String, Scheduler>里查找Scheduler,保證了調度器名稱必須是唯一
        Scheduler sched = schedRep.lookup(getSchedulerName());
				//如果調度器已經存在
        if (sched != null) {
            if (sched.isShutdown()) {
              //假如調度器是關閉狀態,則從調度倉庫的HashMap移除
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
				//調度器不存在則要進行初始化
        sched = instantiate();

        return sched;
    }
StdSchedulerFactory.instantiate

對調度器進行初始化工作

private Scheduler instantiate() throws SchedulerException {
	//...省略...
    		//存儲任務信息的 JobStore
        JobStore js = null;
  			//線程池,默認是SimpleThreadPool
        ThreadPool tp = null;
  			//核心調度器
        QuartzScheduler qs = null;
  			//數據庫連接器
        DBConnectionManager dbMgr = null;
  			//ID生成器,用來自動生成唯一的instance id
        String instanceIdGeneratorClass = null;
  			//線程執行器,默認為 DefaultThreadExecutor
        ThreadExecutor threadExecutor;
  	//...省略...

創建線程池(SimpleThreadPool)

StdSchedulerFactory.instantiate

這里創建了線程池,默認是配置文件指定的SimpleThreadPool

        
				//從配置中獲取線程池類名,如果沒,默認選用SimpleThreadPool作為線程池
				String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ");
            throw initException;
        }

        try {
          //反射創建線程池
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            throw initException;
        }
SimpleThreadPool

此時SimpleThreadPool在創建過程中,會初始化三個列表:

  • workers(總工作線程隊列):存放所有的工作線程
  • availWorkers(可用工作線程隊列) :存放可用於做任務的工作線程
  • busyWorkers(繁忙工作線程隊列):存放已經占用的工作線程
    private List<WorkerThread> workers;
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

初始化線程池

StdSchedulerFactory.instantiate

在該方法下面有一行對該線程池進行初始化

            if(tp instanceof SimpleThreadPool) {
                if(threadsInheritInitalizersClassLoader)
                    ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
            }
						//調用線程池初始化方法
            tp.initialize();
SimpleThreadPool.initialize

在該方法里,會開始創建工作線程(WorkerThread),用於后面的任務執行,真正執行任務的是WorkerThreadrun()方法

				//根據用戶配置文件設置的線程數,來創建對應數量的工作線程
        Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
        while(workerThreads.hasNext()) {
            WorkerThread wt = workerThreads.next();
          	//激活每個工作線程
            wt.start();
           //放在可用線程隊列等待被使用
            availWorkers.add(wt);
        }

創建核心調度器QuartzScheduler

StdSchedulerFactory.instantiate

這里創建核心調度器

//這里創建核心調度器,並且把QuartzSchedulerResources調度資源信息和idleWaitTime(調度器空閑等待的時間量)傳進去,默認30秒
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
QuartzScheduler.QuartzScheduler

創建調度器時,會對調度器的成員變量進行初始化,這里還會創建調度線程QuartzSchedulerThread,它會負責把任務分配給線程池里的工作線程執行

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
     //...省略...
				//創建調度線程,resouces 里面有線程名稱
        this.schedThread = new QuartzSchedulerThread(this, resources);
				//創建線程執行器 ,默認是DefaultThreadExecutor
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
				//這里線程執行器會調用QuartzSchedulerThread的run()方法
        schedThreadExecutor.execute(this.schedThread);
     //...省略...
    }
QuartzSchedulerThread.QuartzSchedulerThread

調度線程在實例化的時候,會把調度線程控制變量paused=ture,是把調度線程暫停處理任務,halted=false是要把調度線程開始監聽調度器控制變量paused,就是讓調度線程開始運行但是不處理任務,等待被喚醒,下一步會提到

   QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
				//...省略...
        // start the underlying thread, but put this object into the 'paused'
        // state
        // so processing doesn't start yet...
        paused = true;
        halted = new AtomicBoolean(false);
    }
QuartzSchedulerThread.run

上面提到,調度線程會被schedThreadExecutor執行,此時由於halted被設置為falsepaused設置為true,此時調度線程run()方法並不會向下處理任務,等待被激活,這里會等到后面Scheduler調用start()才會真正被激活

 public void run() {
        int acquiresFailed = 0;
				//這里!halted.get() = true,因此會向下執行
        while (!halted.get()) {
            try {
                //sigLock是調度線程內的一個成員變量,用於控制線程並發
                synchronized (sigLock) {
                   // 檢查是否為暫停狀態,此時paused && !halted.get() =false,會在這里循環等待,不會往下執行
                    while (paused && !halted.get()) {
                        try {
                            //暫停狀態時,嘗試去獲得信號鎖,使當前線程等待直到另一個線程調用,超時時間是1秒
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                        // 暫停時重置失敗計數器,這樣我們就不會取消暫停后再次等待
                        acquiresFailed = 0;
                    }
										//這里為false,因此會直接跳出循環,不會向后執行任務
                    if (halted.get()) {
                        break;
                    }
              //...省略...
            }

綁定JobDetail和Trigger階段

執行作業調度

StdScheduler.scheduleJob
   public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException {
     		//這里實際調用的是QuartzScheduler
        return sched.scheduleJob(jobDetail, trigger);
    }
QuartzScheduler.scheduleJob
    public Date scheduleJob(JobDetail jobDetail,
            Trigger trigger) throws SchedulerException {
			//...省略...
        //持久化JobDetail和trigger
        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
      	//通知scheduler監聽者
        notifySchedulerListenersJobAdded(jobDetail);
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        notifySchedulerListenersSchduled(trigger);

        return ft;
    }

啟動調度器階段

調用調度器啟動方法

StdScheduler.start

StdScheduler只是代理類,實際上還是調用QuartzScheduler

    public void start() throws SchedulerException {
      	//調用QuartzScheduler.start()方法
        sched.start();
    }

通知調度線程開始干活

QuartzScheduler.start
    public void start() throws SchedulerException {
				//...省略...
         //通知Scheduler監聽者任務開始啟動
        notifySchedulerListenersStarting();
				//第一次啟動,這里initialStart為空
        if (initialStart == null) {
            initialStart = new Date();
          	//這里將恢復任何失敗或誤觸發的作業並根據需要清理數據存儲,錯過的任務會在這里重跑
            this.resources.getJobStore().schedulerStarted();            
            startPlugins();
        } else {
          	//如果initialStart不為空,意味着之前已經做過初始化,則把調度器狀態恢復成運行中
            resources.getJobStore().schedulerResumed();
        }
				//這里實際上讓調度線程QuartzSchedulerThread開始執行任務,前面有提到調度線程雖然已經激活,但是由於Pause為true,因此它沒辦法處理任務,實際處於停止狀態
        schedThread.togglePause(false);

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        //通知Scheduler監聽者任務已經啟動
        notifySchedulerListenersStarted();
    }
QuartzSchedulerThread.togglePause
//切換暫停狀態    
void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
               //如果暫停,這里是要中斷任何可能發生的睡眠,等待着被喚醒
                signalSchedulingChange(0);
            } else {
                //喚醒在此對象監視器上等待的所有線程。
                sigLock.notifyAll();
            }
        }
    }

調度線程正式開始執行任務

QuartzSchedulerThread.run

這里由於上面一步已經把pause切換成false,因此調度線程的run()方法可以開始處理任務

//...省略...     
                    //由於pause已經被切換成flase,這里會跳出循環,線程會往下繼續執行
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        acquiresFailed = 0;
                    }
//...省略...
    // 獲取線程池可用線程數量
    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                //可用線程數量>0才往下執行
                if(availThreadCount > 0) { 

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                      // 獲取需要下次執行的 triggers 
                      // idleWaitTime: 默認 30s 
                      // availThreadCount:獲取可用(空閑)的工作線程數量,總會大於 1,因為該方法會一直阻塞, 直到有工作線程空閑下來。 
                      // maxBatchSize:一次拉取 trigger 的最大數量,默認是 1 
                      // batchTimeWindow:時間窗口調節參數,默認是 0 
                      // misfireThreshold: 超過這個時間還未觸發的 trigger,被認為發生了 misfire,默認 60s 
                      // 調度線程一次會拉取 NEXT_FIRETIME 小於(now + idleWaitTime +batchTimeWindow),大 於(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個 triggers,默認情況下,會拉取未來 30s、 過去 60s 之間還未 fire 的 1 個 trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                      
                      //...省略...
                         // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                              
                              // 觸發 Trigger,把 ACQUIRED 狀態改成 EXECUTING 
                              // 如果這個 trigger 的 NEXTFIRETIME 為空,也就是未來不再觸發,就將其狀態改為 COMPLETE 																// 如果 trigger 不允許並發執行(即 Job 的實現類標注了@DisallowConcurrentExecution), 則將狀態變為 BLOCKED,否則就將狀態改為 WAITING
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                      //...省略...
                                continue;
                            }

                        }
            //循環處理trigger          
           for (int i = 0; i < bndles.size(); i++) {
             								//從trigger任務集合取出一個
                            TriggerFiredResult result =  bndles.get(i);
             								//把trigger任務包裝成TriggerFiredBundle
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
										//...省略...
	
                            JobRunShell shell = null;
                            try {
                              // 根據 trigger 信息實例化 JobRunShell(implements Runnable),同時依據 JOB_CLASS_NAME 實例化 Job,隨后我們將 JobRunShell 實例丟入工作線。
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
             							//調用線程池的runInThread方法,實際上是調用JobRunShell的run()方法
													if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            //...省略...
SimpleThreadPool.runInThread

這里線程池開始從可用線程隊列分配工作線程去處理JobRunShell的run()方法

public boolean runInThread(Runnable runnable) {
//...省略...
  					//假如線程沒有關閉
            if (!isShutdown) {
               //從可用工作線程隊列移除一條工作線程
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                //把工作線程加入到繁忙工作線程隊列
                busyWorkers.add(wt);
                //執行JobRunShell的run方法
                wt.run(runnable);
            } else {
                //加入線程池准備要關閉,開啟一個線程池里沒有的新工作線程
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
               //加入到繁忙工作線程隊列
                busyWorkers.add(wt);
               //工作線程隊列加入該新工作線程
                workers.add(wt);
               //執行JobRunShell的run方法
                wt.start();
            }
//...省略...
        return true;
    }

JobRunShell 用來為 Job 提供安全的運行環境的,執行 Job 中所有的作業,捕獲運行中的異常,在任務執行完畢的

時候更新 Trigger 狀態,等等。

JobRunShell 實例是用 JobRunShellFactoryQuartzSchedulerThread 創建的,在調度器決定一個 Job 被觸發的時候,它從線程池中取出一個線程來執行任務。

Quartz線程模型

  • SimpleThreadPool:包工頭,管理所有 WorkerThread
  • WorkerThread:工人,把 Job 包裝成 JobRunShell執行
  • QuartSchedulerThread:項目經理,獲取即將觸發的 Trigger,從問包工頭拿一個空閑的 worker,執行 Trigger 綁定的任務

Quartz集群進程間如何通信

Quartz集群之間是通過數據庫幾張核心的Quartz表進行通信

表名 作用
QRTZ_BLOB_TRIGGERS Trigger 作為 Blob 類型存儲
QRTZ_CALENDARS 存儲 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存儲 CronTrigger,包括 Cron 表達式和時區信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發的 Trigger 相關的狀態信息,以及相關 Job 的執行信息
QRTZ_JOB_DETAILS 存儲每一個已配置的 Job 的詳細信息
QRTZ_LOCKS 存儲程序的悲觀鎖的信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的 Trigger 組的信息
QRTZ_SCHEDULER_STATE 存儲少量的有關 Scheduler 的狀態信息,和別的 Scheduler 實例
QRTZ_SIMPLE_TRIGGERS 存儲 SimpleTrigger 的信息,包括重復次數、間隔、以及已觸的次數
QRTZ_SIMPROP_TRIGGERS 存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 兩種類型的觸發器
QRTZ_TRIGGERS 存儲已配置的 Trigger 的信息

Quartz集群如何保證高並發下不重復跑

Quartz有多個節點同時在運行,而任務是共享的,這時候肯定存在資源競爭問題,容易造成並發問題,Quartz節點之間是否存在分布式鎖去控制?

Quartz是通過數據庫去作為分布式鎖來控制多進程並發問題,Quartz加鎖的地方很多,Quartz是使用悲觀鎖的方式進行加鎖,讓在各個instance操作Trigger任務期間串行,這里挑選核心的代碼來看看它是符合利用數據庫防止並發的。

使用數據庫鎖需要在quartz.properties中加以下配置,讓集群生效Quartz才會對多個instance進行並發控制

org.quartz.jobStore.isClustered = true

QRTZ_LOCKS 表,它會為每個調度器創建兩行數據,獲取 Trigger 和觸發 Trigger 是兩把鎖,加鎖入口在JobStoreSupport類中,Quartz提供的鎖表,為多個節點調度提供分布式鎖,實現分布式調度,默認有2個鎖

SCHED_NAME LOCK_NAME
Myscheduler STATE_ACCESS
Myscheduler TRIGGER_ACCESS

STATE_ACCESS主要用在scheduler定期檢查是否失效的時候,保證只有一個節點去處理已經失效的scheduler;

TRIGGER_ACCESS主要用在TRIGGER被調度的時候,保證只有一個節點去執行調度

QuartzSchedulerThread.run

調度線程在獲取下一個Trigger任務的時候,會在Quartz表加行級鎖,入口在這

//...省略...
												//
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...省略...

JobStoreSupport.acquireNextTriggers

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
//...省略...
  //這里會進入加鎖控制,lockName是鎖的key
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                   //...省略...

JobStoreSupport.executeInNonManagedTXLock

這里會進入非托管事務,加入lockName不為空,需要先獲取鎖才能執行事務回調方法和事務校驗方法

protected <T> T executeInNonManagedTXLock(
            String lockName, 
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                //只要作為鎖的key不為空,在這里就會調用JobStoreTx獲取數據庫連接
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                //真正加鎖的入口,通過LockHandler去調用DBSemaphore操作數據庫獲取鎖
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            
//...省略...

DBSemaphore.obtainLock

這里會通過執行兩條SQL去向調用線程授予對已識別資源的鎖定(阻塞)直到可用

public boolean obtainLock(Connection conn, String lockName)
        throws LockException {

//...省略...
        //判斷當前調用線程是否對標識的資源持有鎖,加入已經持有該鎖,則直接跳過
        if (!isLockOwner(lockName)) {
            //通過調用StdRowLockSemaphore的executeSQL方法對expandedSQL, expandedInsertSQL對lockName進行加鎖控制
            executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
            
//...省略...
    }

StdRowLockSemaphore.executeSQL

如果已經有lockName代表的行,直接加鎖,如果沒有插入。但是在加鎖時或插入時有可能失敗,失敗則重試,重試如果超過一定次數就會直接拋出異常。這里是使用悲觀鎖的方式進行加鎖

 protected void executeSQL(Connection conn, final String lockName, final String expandedSQL, final String expandedInsertSQL) throws LockException {
//...省略...
                ps = conn.prepareStatement(expandedSQL);
                //...省略...
  							ps.setString(1, lockName);
                //先執行查詢,看看表里是否已經有該存在
                rs = ps.executeQuery();

//...省略...
             //  如果查詢結果不為空
              if (!rs.next()) {
              ps.setString(1, lockName);
    					//
               int res = ps.executeUpdate();
                  //...省略...
                
                return; // obtained lock, go
           
    }

這兩條SQL是在DBSemaphore初始化的時候塞進來的

    public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
        this.tablePrefix = tablePrefix;
        this.schedName = schedName;
        setSQL(defaultSQL);
        setInsertSQL(defaultInsertSQL);
    }

再看看調用鏈會發現,這兩條SQL是在StdRowLockSemaphore初始化的時候調用父類DBSemaphore構造方法傳進來,分別是selectWithLockSQLSELECT_FOR_LOCK

    public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) {
        super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK);
    }

兩條SQL分別是:

    public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    public static final String INSERT_LOCK = "INSERT INTO "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" 
        + SCHED_NAME_SUBST + ", ?)"; 

把參數替換進去就比較清晰可以看到,Quartz通過在qrtz_LOCKS表對當前schedule job 加兩個行級鎖

expandedSQL:select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
expandedInsertSQL:INSERT INTO qrtz_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('MySchedule', 'TRIGGER_ACCESS')

Quartz集群如何保證高並發下不漏跑

有時候Quartz可能會錯過我們的調度任務:

  • 服務重啟,沒能及時執行任務,就會misfire
  • 工作線程去運行優先級更高的任務,就會misfire
  • 任務的上一次運行還沒結束,下一次觸發時間到達,就會misfire

Quartz可提供了一些補償機制應對misfire情況,用戶可以根據需要選擇對應的策略,這里挑選常用的cronTrigger作為示例

  • withMisfireHandlingInstructionDoNothing

    • 不觸發立即執行
    • 等待下次Cron觸發頻率到達時刻開始按照Cron頻率依次執行
  • withMisfireHandlingInstructionIgnoreMisfires

    • 以錯過的第一個頻率時間立刻開始執行
    • 重做錯過的所有頻率周期后當下一次觸發頻率發生時間大於當前時間后,再按照正常的Cron頻率依次執行
  • withMisfireHandlingInstructionFireAndProceed(默認)

    • 以當前時間為觸發頻率立刻觸發一次執行,然后按照Cron頻率依次執行

假如用戶沒有設置Misfire指令,Quartz默認指定MISFIRE_INSTRUCTION_SMART_POLICY作為默認策略,在Trigger接口的getMisfireInstruction源碼可以看到:

/**
     * Get the instruction the <code>Scheduler</code> should be given for
     * handling misfire situations for this <code>Trigger</code>- the
     * concrete <code>Trigger</code> type that you are using will have
     * defined a set of additional <code>MISFIRE_INSTRUCTION_XXX</code>
     * constants that may be set as this property's value.
     * 
     * <p>
     * If not explicitly set, the default value is <code>MISFIRE_INSTRUCTION_SMART_POLICY</code>.
     * </p>
     * 
     * @see #MISFIRE_INSTRUCTION_SMART_POLICY
     * @see SimpleTrigger
     * @see CronTrigger
     */
    public int getMisfireInstruction();

這里繼續以CronTrigger舉例,其他類型Trigger也類似 。如果是默認策略MISFIRE_INSTRUCTION_SMART_POLICY,在CronTrigger會選用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,該策略的特點是立刻執行一次,然后后面的任務就按照正常的計划執行。

   @Override
    public void updateAfterMisfire(org.quartz.Calendar cal) {
        int instr = getMisfireInstruction();

        if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
            return;

        if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
            instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
        }

        if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
            Date newFireTime = getFireTimeAfter(new Date());
            while (newFireTime != null && cal != null
                    && !cal.isTimeIncluded(newFireTime.getTime())) {
                newFireTime = getFireTimeAfter(newFireTime);
            }
            setNextFireTime(newFireTime);
        } else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
            setNextFireTime(new Date());
        }
    }

Quartz對於misfire任務大致處理流程

  • QuartzScheduler.start()啟動調度

  • JobStoreSupport.schedulerStarted()執行啟動調度方法

  • 創建和初始化misfireHandler

  • 異步執行misfireHandler.run方法處理misfire任務

  • MisfileHandler通過JobStoreSupport去查詢有沒有misfire的任務,查詢條件是當前狀態是waiting下一次trigger時間< 當前時間-misfire預設閾值(默認1分鍾)

    int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                   getDelegate().countMisfiredTriggersInState(
                       conn, STATE_WAITING, getMisfireTime()) : 
                   Integer.MAX_VALUE;
    String COUNT_MISFIRED_TRIGGERS_IN_STATE = "SELECT COUNT("
           + COL_TRIGGER_NAME + ") FROM "
           + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
           + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND NOT ("
           + COL_MISFIRE_INSTRUCTION + " = " + Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY + ") AND " 
           + COL_NEXT_FIRE_TIME + " < ? " 
           + "AND " + COL_TRIGGER_STATE + " = ?";
           
  
    protected long getMisfireTime() {
    
           long misfireTime = System.currentTimeMillis();
           if (getMisfireThreshold() > 0) {
             //當前時間減去misfire預設閾值,閾值默認一分鍾
             
               misfireTime -= getMisfireThreshold();
           }
      
           return (misfireTime > 0) ? misfireTime : 0;
       }
      
  • JobStoreSupport通過StdRowLockSemaphore 去獲取TRIGGER_ACCESS

  • 查詢所有misfire任務,查詢條件:status=waiting,current_time-next_fire_time>misfireThreshold(可配置,默認1分鍾)【即實際觸發時間-預計觸發時間大於容忍度時間】,獲取misfired的trigger,maxToRecoverAtATime默認一個事務中只能最大有20個misfired trigger(可配置)

  • 通過updateAfterMisfired方法獲取misfired的策略(默認是MISFIRE_INSTRUCTION_SMART_POLICY該策略在CronTrigger中為MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根據策略設置nexFireTime

  • nextFireTime等更新或者插入到trigger表;

  • 提交事務,釋放鎖

Quartz默認任務鎖機制

Quartz是否一定會加鎖?什么情況下不會加鎖?應該怎么避免並發問題?

什么情況下不會加鎖?

回到JobStoreSupportacquireNextTriggers()方法,可以看到當isAcquireTriggersWithinLock()true或者maxCount>1才會加鎖,否則lockName為空

   public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    //..省略..
                    }
                });
    }
    protected <T> T executeInNonManagedTXLock(
            String lockName, 
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection 
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            //..省略...
    }

Quartz 加鎖的條件有以下兩個:

  • 如 果 acquireTriggersWithinLock=true 或 者 batchTriggerAcquisitionMaxCount>1 時 , lockName 賦 值 為

LOCK_TRIGGER_ACCESS,此時獲取 Trigger 會加鎖。

  • 否則,如果 isAcquireTriggersWithinLock()值是 false 並且 maxCount=1 的話,lockName 賦值為 null,這種情況獲取 Trigger 下不加鎖。

那這兩個參數的默認值是什么?

acquireTriggersWithinLock 變量默認是 false

private boolean acquireTriggersWithinLock = false;

maxCount 來自 QuartzSchedulerThread

triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

getMaxBatchSize()來自 QuartzSchedulerResources,代表 Scheduler 一次拉取

trigger 的最大數量,默認是 1

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1

什么情況下需要加鎖?

QuartzSchedulerThreadtriggersFired()方法

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

調用了 JobStoreSupporttriggersFired()方法,接着又調用了triggerFired(Connection conn, OperableTrigger trigger)方法:

  public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
        return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
                new TransactionCallback<List<TriggerFiredResult>>() {
                    public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                        List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                        TriggerFiredResult result;
                        for (OperableTrigger trigger : triggers) {
                            try {
                              //觸發
                              TriggerFiredBundle bundle = triggerFired(conn, trigger);
                              result = new TriggerFiredResult(bundle);
            //...省略...
 protected TriggerFiredBundle triggerFired(Connection conn,
            OperableTrigger trigger)
        throws JobPersistenceException {
        JobDetail job;
        Calendar cal = null;

        // Make sure trigger wasn't deleted, paused, or completed...
        try { // if trigger was deleted, state will be STATE_DELETED
            String state = getDelegate().selectTriggerState(conn,
                    trigger.getKey());
            if (!state.equals(STATE_ACQUIRED)) {
                return null;
            }
//...省略...

如果 Trigger 的狀態不是 ACQUIRED,也就是說被其他的線程 fire 了,返回空。但是這種樂觀鎖的檢查在高並發下難免會出現 ABA 的問題,比如線程 A 拿到的時候還是 ACQUIRED 狀態,但是剛准備執行的時候已經變成了 EXECUTING 狀態,這個時候就會 出現重復執行的問題。

把執行步驟拆解下,比較容易看到該問題:

推薦

如果設置的數量為 1(默認值),並且使用 JDBC JobStore(RAMJobStore 不支持 分 布 式 , 只 有 一 個 調 度 器 實 例 , 所 以 不 加 鎖 ) , 則 屬 性 org.quartz.jobStore.acquireTriggersWithinLock 應設置為 true。否則不加鎖可能會導致任務重復執行。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1 org.quartz.jobStore.acquireTriggersWithinLock=true

Quartz常見問題

服務器始終不一致問題

常見異常:

This scheduler instance (SchedulerName) is still active but was recovered by another instance in the cluster

解決:

同步所有集群節點的時間然后重啟服務

Quartz集群負載不均衡

Quartz集群是采用搶占式加鎖方式去處理任務,因此你會看到每個節點的任務處理日志並不是均衡分配的,很可能一個節點會搶占大量任務導致負載過重,但是這一點官方並沒有解決。

錯過預定觸發時間

常見異常:

Handling 1 trigger(s) that missed their scheduled fire-time

解決:

很可能是你線程數設置太少,而任務執行時間太長,超過的misfire閾值,導致線程池沒有可用線程而錯過了觸發事件。嘗試把配置文件線程數調大org.quartz.threadPool.threadCount 或者把misfire閾值調大org.quartz.jobStore.misfireThreshold


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM