http://blog.itpub.NET/11627468/viewspace-1764753/
一、quartz數據庫鎖
其中,QRTZ_LOCKS就是Quartz集群實現同步機制的行鎖表,其表結構如下:
注:此表結構在2.2版本有新增字段,這里暫時不考慮。
可以看出QRTZ_LOCKS中有5條記錄,代表5把鎖,分別用於實現多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。
關於行鎖的機制:
1、mysql > set autocommit=0; //先把mysql設置為不自動提交。
2、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程一通過for update 可以把這行鎖住
3、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程二通過for update 無法獲得鎖,線程等待。
4、commit; //線程一通過commit 釋放鎖
5、 //線程二可以訪問到數據,線程不再等待。
所以,通過這個機制,一次只能有一個線程來操作 加鎖 - 操作 - 釋放鎖。 如果 操作 的時間過長的話,會帶來集群間的主線程等待。
數據庫行鎖是一種悲觀鎖,鎖表時其它線程無法查詢。
源碼中關於數據庫集群加鎖的方法有如下幾種:
1、executeInNonManagedTXLock方法的含義是自己管理事務,不讓容器管理事務的加鎖方法。
三個參數lockName的值是上面所說的TRIGGER_ACCESS,表示要加鎖的類型。
txCallback是加鎖后再回調的方法。
txValidator是驗證方法,一般為null
函數先執行加鎖,再回調要操作的方法,然后再解鎖。
看一下源碼:
2、如果不是通過這種回調方法的加鎖,一般是:
getLockHandler().obtainLock
執行
commitConnection(conn)
releaseLock
cleanupConnection
二、源碼分析鎖
先了解一篇文章,通過源碼來分析quartz是如何通過加鎖來實現集群環境,觸發器狀態的一致性。
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
可以看到觸發器的操作主要用主線程StdScheduleThread來完成,不管是獲取需要觸發的30S內的觸發器,還是觸發過程。select和update觸發器表時
都會先加鎖,后解鎖。如果數據庫資源競爭比較大的話,鎖會影響整個性能。可以考慮將任務信息放在分布式內存,如redis上進行處理。數據庫只是定時從redis上load數據下來做統計。
參考: quartz詳解2:quartz由淺入深 查看第四章第1,2節
實現都在JobStoreSupport類
---
2、STATE_TRIGGER
實現都在JobStoreSupport類
---
其中,QRTZ_LOCKS就是Quartz集群實現同步機制的行鎖表,其表結構如下:
點擊(此處)折疊或打開
- --QRTZ_LOCKS表結構
- CREATE TABLE `QRTZ_LOCKS` (
- `LOCK_NAME` varchar(40) NOT NULL,
- PRIMARY KEY (`LOCK_NAME`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- --QRTZ_LOCKS記錄
- +-----------------+
- | LOCK_NAME |
- +-----------------+
- | CALENDAR_ACCESS |
- | JOB_ACCESS |
- | MISFIRE_ACCESS |
- | STATE_ACCESS |
- | TRIGGER_ACCESS |
- +-----------------+
可以看出QRTZ_LOCKS中有5條記錄,代表5把鎖,分別用於實現多個Quartz Node對Job、Trigger、Calendar訪問的同步控制。
關於行鎖的機制:
1、mysql > set autocommit=0; //先把mysql設置為不自動提交。
2、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程一通過for update 可以把這行鎖住
3、 select * from es_locks where lock_name = 'TRIGGER_ACCESS' for update ; //線程二通過for update 無法獲得鎖,線程等待。
4、commit; //線程一通過commit 釋放鎖
5、 //線程二可以訪問到數據,線程不再等待。
所以,通過這個機制,一次只能有一個線程來操作 加鎖 - 操作 - 釋放鎖。 如果 操作 的時間過長的話,會帶來集群間的主線程等待。
數據庫行鎖是一種悲觀鎖,鎖表時其它線程無法查詢。
源碼中關於數據庫集群加鎖的方法有如下幾種:
1、executeInNonManagedTXLock方法的含義是自己管理事務,不讓容器管理事務的加鎖方法。
點擊(此處)折疊或打開
- executeInNonManagedTXLock(
- String lockName,
- TransactionCallback<T> txCallback , final TransactionValidator<T> txValidator )
txCallback是加鎖后再回調的方法。
txValidator是驗證方法,一般為null
函數先執行加鎖,再回調要操作的方法,然后再解鎖。
看一下源碼:
點擊(此處)折疊或打開
- if (lockName != null) {
- // If we aren't using db locks, then delay getting DB connection
- // until after acquiring the lock since it isn't needed.
- if (getLockHandler().requiresConnection()) {
- conn = getNonManagedTXConnection();
- }
- transOwner = getLockHandler().obtainLock(conn, lockName);
- }
- if (conn == null) {
- conn = getNonManagedTXConnection();
- }
- final T result = txCallback.execute(conn);
- try {
- commitConnection(conn);
- } catch (JobPersistenceException e) {
- rollbackConnection(conn);
- if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
- @Override
- public Boolean execute(Connection conn) throws JobPersistenceException {
- return txValidator.validate(conn, result);
- }
- })) {
- throw e;
- }
- }
- Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
- if(sigTime != null && sigTime >= 0) {
- signalSchedulingChangeImmediately(sigTime);
- }
- return result;
- } catch (JobPersistenceException e) {
- rollbackConnection(conn);
- throw e;
- } catch (RuntimeException e) {
rollbackConnection(conn);
throw new JobPersistenceException("Unexpected runtime exception: "
+ e.getMessage(), e);
} finally {
try {
releaseLock(lockName, transOwner);
} finally {
cleanupConnection(conn);
}
}
2、如果不是通過這種回調方法的加鎖,一般是:
getLockHandler().obtainLock
執行
commitConnection(conn)
releaseLock
cleanupConnection
二、源碼分析鎖
目前代碼中行鎖只用到了STATE_ACCESS 和TRIGGER_ACCESS 這兩種。
1、TRIGGER_ACCESS
先了解一篇文章,通過源碼來分析quartz是如何通過加鎖來實現集群環境,觸發器狀態的一致性。
http://www.360doc.com/content/14/0926/08/15077656_412418636.shtml
可以看到觸發器的操作主要用主線程StdScheduleThread來完成,不管是獲取需要觸發的30S內的觸發器,還是觸發過程。select和update觸發器表時
都會先加鎖,后解鎖。如果數據庫資源競爭比較大的話,鎖會影響整個性能。可以考慮將任務信息放在分布式內存,如redis上進行處理。數據庫只是定時從redis上load數據下來做統計。
參考: quartz詳解2:quartz由淺入深 查看第四章第1,2節
實現都在JobStoreSupport類
| 加鎖類型 | 加鎖方法 | 底層數據庫操作 | 備注 |
| executeInNonManagedTXLock | acquireNextTrigger | selectTriggerToAcquire selectTrigger selectJobDetail insertFiredTrigger |
查詢需要點火的trigger 選擇需要執行的trigger加入到fired_trigger表 |
| for執行 triggerFired | selectJobDetail selectCalendar updateFiredTrigger triggerExists updateTrigger |
點火trigger 修改trigger狀態為可執行狀態。 |
|
| recoverJobs | updateTriggerStatesFromOtherStates hasMisfiredTriggersInState doUpdateOfMisfiredTrigger selectTriggersForRecoveringJobs selectTriggersInState deleteFiredTriggers |
非集群環境下重新執行 failed與misfired的trigger |
|
| retryExecuteInNonManagedTXLock | releaseAcquiredTrigger | updateTriggerStateFromOtherState deleteFiredTrigger |
異常情況下重新釋放trigger到初使狀態。 |
| triggeredJobComplete | selectTriggerStatus removeTrigger updateTriggerState deleteFiredTrigger |
觸發JOB任務完成后的處理。 | |
| obtainLock | recoverMisfiredJobs | hasMisfiredTriggersInState doUpdateOfMisfiredTrigger | 重新執行misfired的trigger 可以在啟動時執行,也可以由misfired線程定期執行。 |
| clusterRecover | selectInstancesFiredTriggerRecords updateTriggerStatesForJobFromOtherState storeTrigger deleteFiredTriggers selectFiredTriggerRecords removeTrigger deleteSchedulerState |
集群有結點faied,讓JOB能重新執行。 | |
| executeInLock 數據庫集群里等同於 executeInNonManagedTXLock |
storeJobAndTrigger | updateJobDetail insertJobDetail triggerExists selectJobDetail updateTrigger insertTrigger |
保存JOB和TRIGGER配置 |
| storeJob | 保存JOB | ||
| removeJob | 刪除JOB | ||
| removeJobs | 批量刪除JOB | ||
| removeTriggers | 批量刪除triggers | ||
| storeJobsAndTriggers | 保存JOB和多個trigger配置 | ||
| removeTrigger | 刪除trigger | ||
| replaceTrigger | 替換trigger | ||
| storeCalendar | 保存定時日期 | ||
| removeCalendar | 刪除定時日期 | ||
| clearAllSchedulingData | 清除所有定時數據 | ||
| pauseTrigger | 停止觸發器 | ||
| pauseJob | 停止任務 | ||
| pauseJobs | 批量停止任務 | ||
| resumeTrigger | 恢復觸發器 | ||
| resumeJob | 恢復任務 | ||
| resumeJobs | 批量恢復任務 | ||
| pauseTriggers | 批量停止觸發器 | ||
| resumeTriggers | 批量恢復觸發器 | ||
| pauseAll | 停止所有 | ||
| resumeAll | 恢復所有 |
---
2、STATE_TRIGGER
實現都在JobStoreSupport類
| 加鎖類型 | 加鎖方法 | 底層數據庫操作 | 備注 |
| obtainLock | doCheckin | clusterCheckIn | 判斷集群狀態 先用LOCK_STATE_ACCESS鎖集群狀態 再用LOCK_TRIGGER_ACCESS恢復集群運行 |
---
