Quartz學習筆記:集群部署&高可用
集群部署
一個Quartz集群中的每個節點是一個獨立的Quartz應用,它又管理着其他的節點。這就意味着你必須對每個節點分別啟動或停止。Quartz集群中,獨立的Quartz節點並不與另一其的節點或是管理節點通信,而是通過同一個數據庫表來感知到另一Quartz應用的。
因為Quartz集群依賴於數據庫,所以必須首先創建Quartz數據庫表,Quartz發布包中包括了所有被支持的數據庫平台的SQL腳本。這些SQL腳本存放於<quartz_home>/docs/dbTables 目錄下。不同版本,表個數可能不同。
集群配置
創建完表結構之后,我們需要配置Quartz,以讓其感知自己是集群的一份子。定義quartz.properties
配置文件默認放在應用classpath路徑下,其他路徑只能自己手動加載properties。下面是集群配置參考:
#集群中應用采用相同的Scheduler實例 org.quartz.scheduler.instanceName: wenqyScheduler #集群節點的ID必須唯一,可由quartz自動生成 org.quartz.scheduler.instanceId: AUTO #通知Scheduler實例要它參與到一個集群當中 org.quartz.jobStore.isClustered: true #需持久化存儲 org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate #數據源 org.quartz.jobStore.dataSource=myDS #quartz表前綴 org.quartz.jobStore.tablePrefix=QRTZ_ #數據源配置 org.quartz.dataSource.myDS.driver: com.mysql.jdbc.Driver org.quartz.dataSource.myDS.URL: jdbc:mysql://localhost:3306/ncdb org.quartz.dataSource.myDS.user: root org.quartz.dataSource.myDS.password: 123456 org.quartz.dataSource.myDS.maxConnections: 5 org.quartz.dataSource.myDS.validationQuery: select 0
同一集群下,instanceName必須相同,instanceId可自動生成,isClustered為true,持久化存儲,指定數據庫類型對應的驅動類和數據源連接。
集群高可用
數據行鎖避免重復執行
Quartz究竟是如何保證集群情況下trgger處理的信息同步?
下面跟着源碼一步一步分析,QuartzSchedulerThread包含有決定何時下一個Job將被觸發的處理循環,主要邏輯在其run()方法中:
public void run() { boolean lastAcquireFailed = false; while (!halted.get()) { ...... int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { ...... //調度器在trigger隊列中尋找30秒內一定數目的trigger(需要保證集群節點的系統時間一致) triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); ...... //觸發trigger List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); ...... //釋放trigger for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } } }
由此可知,QuartzScheduler調度線程不斷獲取trigger,觸發trigger,釋放trigger。下面分析trigger的獲取過程,qsRsrcs.getJobStore()返回對象是JobStore,集群環境配置如下:
<!-- JobStore 配置 --> <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
JobStoreTX繼承自JobStoreSupport,而JobStoreSupport的acquireNextTriggers、triggersFired、releaseAcquiredTrigger方法負責具體trigger相關操作,都必須獲得TRIGGER_ACCESS鎖。核心邏輯在executeInNonManagedTXLock方法中:
protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } //獲取鎖 transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { releaseLock(lockName, transOwner); //釋放鎖 } finally { cleanupConnection(conn); } } }
一個調度器實例在執行涉及到分布式問題的數據庫操作前,首先要獲取QUARTZ_LOCKS表中對應的行級鎖,獲取鎖后即可執行其他表中的數據庫操作,隨着操作事務的提交,行級鎖被釋放,供其他調度實例獲取。集群中的每一個調度器實例都遵循這樣一種嚴格的操作規程。
getLockHandler()方法返回的對象類型是Semaphore,獲取鎖和釋放鎖的具體邏輯由該對象子類StdRowLockSemaphore 維護。從中很容易看出核心的鎖機制是數據庫鎖。
public class StdRowLockSemaphore extends DBSemaphore { public static final String SELECT_FOR_LOCK = "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO {0}LOCKS(SCHED_NAME, LOCK_NAME) VALUES ({1}, ?)"; ... }
關於更多數據庫鎖的資料還請查看我得另一篇文章:https://www.cnblogs.com/MrSaver/p/11917345.html
故障切換
當集群一個節點在執行一個或多個作業期間失敗時發生故障切換(Fail Over
)。當節點出現故障時,其他節點會檢測到該狀況並識別數據庫中在故障節點內正在進行的作業。任何標記為恢復的作業(在JobDetail上都具有“請求恢復(requests recovery
)”屬性)將被剩余的節點重新執行,已達到失效任務 轉移。沒有標記為恢復的作業將在下一次相關的Triggers觸發時簡單地被釋放以執行。
1、每個節點Scheduler實例由集群管理線程ClusterManager周期性(配置文件中檢測周期屬性clusterCheckinInterval
默認值是 15000 (即15 秒))定時檢測CHECKIN數據庫,遍歷集群各兄弟節點的實例狀態,檢測集群各個兄弟節點的健康情況。
2、當集群中一個節點的Scheduler實例執行CHECKIN時,它會查看是否有其他節點的Scheduler實例在到達它們所預期的時間還未CHECKIN。若檢測到有節點在預期時間未CHECKIN,則認為該節點故障。判斷節點是否故障與節點Scheduler實例最后CHECKIN的時間有關,而判斷條件:
LAST_CHECKIN_TIME + Max(檢測周期,檢測節點現在距上次最后CHECKIN的時間) + 7500ms < currentTime。
3、集群管理線程檢測到故障節點,就會更新觸發器狀態,狀態更新如下:
4、集群管理線程刪除故障節點的實例狀態(qrtz_scheduler_state
表),即重置了所有故障節點觸發任務一般。原先故障任務和正常任務一樣就交由調度處理線程處理了。
負載均衡
負載平衡自動發生,群集的每個節點都盡可能快地觸發Jobs。當Triggers的觸發時間發生時,獲取它的第一個節點(通過在其上放置一個鎖定)是將觸發它的節點。 哪個節點運行它或多或少是隨機的。
集群下任務的調度存在一定的隨機性,誰先擁有觸發器行鎖TRIGGER_ACCESS,誰就先可能觸發任務。當某一個機子的調度線程拿到該鎖(別的機子只能等待)時:
1、 acquireNextTriggers
獲取待觸發隊列,查詢Trigger表的判斷條件:
NEXT_FIRE_TIME < now + idleWaitTime + timeWindow and TRIGGER_STATE = 'WAITING
然后更新觸發器狀態為ACQUIRE。
2、觸發待觸發隊列,修改 Trigger 表中的 NEXT_FIRE_TIME 字段,也就是下次觸發時間,計算下次觸發時間的方法與具體的觸發器實現有關,如Cron表達式觸發器,計算觸發時間與Cron表達式有關。觸發待觸發隊列后及時釋放觸發器行鎖。
3、這樣,別的機子拿到該鎖,也查詢 Trigger 表,但是由於任務觸發器的下次觸發時間或者狀態已經修改,所以不會被查找出來。這時拿到的任務就可能是別的觸發任務。這樣就實現了多個節點的應用在某一時刻對任務只進行一次調度。對於重復任務每次都不一定是相同的節點,它或多或少會隨機節點運行它。
參考資料
- http://wenqy.com/2018/05/05/quartz%e7%ae%a1%e4%b8%ad%e7%aa%a5%e8%b1%b9%e4%b9%8b%e9%9b%86%e7%be%a4%e9%ab%98%e5%8f%af%e7%94%a8.html
- http://wenqy.com/2018/04/03/quartz%e7%ae%a1%e4%b8%ad%e7%aa%a5%e8%b1%b9%e4%b9%8b%e9%9b%86%e7%be%a4%e7%ae%a1%e7%90%86.html
- https://tech.meituan.com/2014/08/31/mt-crm-quartz.html