quartz插件——實現任務之間的串行調度


需求背景:

      目前的項目中大概有二十幾個定時任務,在當初實現是只考慮到了階段之間的結構,即把一個完整過程的幾個階段分成若干個定時任務來實現,不如一個“下載源數據--處理--傳給目標”的任務,使用了三個定時任務來實現。 給予三個定時任務相同的處理時間間隔,每個定時任務並沒有強制依賴其他任務,主要是在每次運行開始寫一些判斷代碼。   這種模式一直運行良好,直到遇到一個業務場景——"需求希望整個過程能夠盡快的處理完成,對時間要求很敏感"。  這里遇到問題就是我們只能不斷加快這些定時任務的頻率,但永遠解決不了任務之間的錯配運行。你無法控制多個任務按照最佳順序來運行。

 

最終的解決方案:

     我實現了一個quartz 的插件,利用quartz提供的simpleTrigger特性實現了”任務執行完立即執行其后繼任務“的功能, 當然后繼任務的概念也是自己引入的,指出一個任務可以有多個后繼任務。即支持 A任務運行完畢,立即執行B,也指出A運行完畢,立即執行B和C。  后繼任務也可以有自己的后繼任務,事實上任務與其后繼任務完全是支持自定義的。

 

quartz有些默認的插件:

    我用的1.7版本,已經包含4,5個插件, LoggingJobHistoryPlugin,LoggingTriggerHistoryPlugin分別可以打印scheduler容器管理的所有triggers和jobDetails的運行日志。 還有一個插件支持使用xml方式管理trigger&job任務(不同於spring對quartz的封裝,但很類似), 一個是支持scheduler的管理的插件。 不同版本的插件可能會有些差別,這些默認的插件通常是在quartz包的plugin package下。 

   quartz插件的實現原理很簡單,上面說默認插件的原因也是希望大家隨便打開一個默認插件實現,看看源碼, 然后就能很快照貓畫虎啦。

自定義的插件實現:

    事實上,只要你的類繼承SchedulerPlugin接口,原理上這個類已經是一個插件了。當然你還需要把這個插件“插”到quartz中,容后在述。

    因為沒有寫插件的經歷,在實現之前有個問題一直困擾着我:"插件要怎么開始運行,執行入口在哪里...", 回過頭看大概是被”插件“這個名頭嚇到蒙了。 簡單說,插件本身就是類,讓普通類執行其邏輯無法兩種方式:1.邏輯的入口寫在構造方法里,創建時邏輯即開始執行。 2.邏輯的入口被調用。 我想當時的困擾應該在第二點,要執行插件就要主動調用,那既然都主動調用了,插件還有毛意義... 目前我的觀點是這種情況插件的邏輯一般是綁定在一個事件的監聽器上的,連起來就是"當某時間發生時,這個插件的邏輯開始執行"。 可以參照quartz 的jobHistory插件

    回頭看SchedulerPlugin接口,簡單到發指,將scheduler容器注給你,然后就靠你自己發揮了。  當你把自定義的插件配給quartz時, quartz在容器初始化的后期階段會實例化你的插件並注冊,如果不報錯,隨即調用initialize(),在容器初始化完成,容器本身啟動之后,插件的start()被調用.  當quartz的容器銷毀時,則會調用shutdown 。

       

     我的插件實現: class MultiSerialJobSchPlugin implements SchedulerPlugin,JobListener{},  SchedulerPlugin是插件接口,JobListener則是quartz內置的一個任務執行的監聽接口。該接口定義如下圖,其三個方法分別是job在即將執行,執行被取消是,執行完畢是被調用。 當然這個監聽器監聽的對象和范圍是要顯式的指定和綁定的,這點也容后敘述。  目前我的MultiSerialJobSchPlugin既是一個插件,也是一個監聽器。

      

     那么現在我的類MultiSerialJobSchPlugin已經有如此的潛力”當某些job執行完成后我會立刻得知“。 因為job執行完成后MultiSerialJobSchPlugin.jobExecutionVetoed()方法會被調用。 下面來看具體的實現。

1. 插件的初始化,三個工作 ,a.保存容器引用,使插件能夠隨時使用容器的資源和操作容器(如果需要) b:把這個類實例本身當作一個監聽器,聲明成一個quartz全局的監聽,即我監聽的范圍是quartz管理的所有job。c:把實例保存在scheduler容器里,使外部能通過scheduler.getcontext().get(name)獲取實例。 這一步對我這里要實現的功能關系不大,只是推薦做法。

此處說明下scheduler.getContext(). 它本質上是個map, 這個context是提供給用戶使用的,可以將任何想保存的信息放進去scheduler.getcontext().put(key,value),然后等到使用的時候拿出來scheduler.getcontext().get(key)。因為在容器里,所以里面的信息是全局的,任何時候都能得到的。 后面會看到,我將job之間的執行順序的定義放在這里,然后給插件使用的。

2.插件啟動,實際看代碼並沒有什么啟動操作,還是一些環境/數據的准備,首先解釋一個問題,為什么這些准備不統一放到initialize(),而要分布到2個方法中?

原因是這兩個方法被調用的階段不同, initialise()是在容器初始階段就被調用的。 所以保險起見,我將數據准備工作放到的容器運行是才被調用的start()里面。

這個方法2個工作:a. 將jobs之間的關系保存到本地relations對象中,relations的Properties類型,其具體保存了”jobA = JobB,JobC,JobD“(jobA的后繼任務是b,c,d)這樣的信息. 這些關系信息是通過spring配置文件手工定義的,然后由spring注入到scheduler.context中的,spring注入這一步后面會提到。b.遍歷容器,將job和trigger的對應信息保存在插件本地變量jobTriggersMap

3.jobWasExecuted(JobExecutionContext context,JobExecutionException jobException)方法的實現前面已經講到我的插件本身也是一個監聽器,並且我已經在第1步把這個監聽器的監聽范圍設為全局的了。 只要在這個方法實現如下步驟:

       a.通過傳入的JobExecutionContext參數獲取剛剛執行完畢的那個job的名稱

       b.通過job名稱在本地變量relations中查詢該job的后繼任務,followJobs

       c.遍歷followJobs,對於單個followJob, 在本地變量jobTriggersMap中找到其對應的關聯trigger

                如果為關聯,則新建一個simpleTigger(策略是立即執行且只執行一次);

                如果有關聯的trigger,如果這些triggers里面由simpleTrigger類型的,記錄為st,修改或者替換st,使其觸發策略為立即執行且只執行一次

       d.該方法立即返回,使得quartz 對job的這次調度正常結束。

       e. 其followJob由於simpleTrigger新的策略會立即開始執行,執行完畢后,flowjob也會進入到a-d的步驟,由此一個任務串建立起來

特別注意:在對followjob綁定新trigger時,一定要是替換或者修改原有的,我實現時用替換。  如果只是簡單給他多綁定一個trigger,最后會造成任務執行一次多一個trigger(在1.7版本中 即使simpleTrigger_startNow_repeat0執行完畢后也不會被銷毀,至少不會被立即銷毀)。

4. 插件的啟動

    在quartz中,其scheduler容器初始化過程中其實是涉及了多個組件的。比如threadPool,jobstore等, 這些組件quartz都是提供了多個可供選擇的,當然也有一套默認的配置,這些信息是在quartz包下的quartz.properties文件中,插件的啟動配置也只需配置他就可以了。下圖是1.7的默認配置

想要在容器啟動時啟動插件,只需指定org.quartz.plugin.${pluginName}.class的值即可,比如

org.quartz.plugin.tiggerHistory.class = org.quartz.plugins.history.LoggingJobHistoryPlugin

則啟動是quartz會加載LoggingJobHistoryPlugin這個插件, 它的功能是打印job運行時的調度日志,非常詳細。

我的配置:其中org.quartz.plugin.MultiSerialJobSch.${propertyname}=${propertyvalue},這個類似spring注入,propertyname是我在MultiSerialJobSch的一個屬性,本且實現了其set方法,當然可以配置多個。

org.quartz.plugin.MultiSerialJobSch.class = com.${我的類包路徑}.MultiSerialJobSchPlugin

org.quartz.plugin.MultiSerialJobSch.relationKey = JobRelationship

5. 與spring的整合

   其中spring與quartz本身的整合,在我的上一篇文章里有詳細的說明。

   這里整合解決的是第2步遺留下來的問題 scheduler.getcontext.get(relationKey), 這里的值是怎么來的,首先relationKey是一個字符串,在上步可以看到我是

作為變量配置在quartz.properties的,其值是JobRelationship。 下面看我spring 的配置: 在容器factorybean中的schedulerContextAsMap實際上就是spring封裝的scheduler.context對象,我注入了一個entity<JobRelationship,Object>。Object本身則是Properties。 至此這些關系信息在spring 啟動quartz時,被

寫到scheduler.context中。 然后我的插件如第2步所述,取得這些關系信息。 提供給之后的決策使用。

 

總結:

     運行過程:定義后繼任務概念, 在job本身執行完成后,立即啟動后繼任務。 其中后繼任務概念是由一個property實現的,立即啟動后繼任務則用simpletrigger和任務動態綁定實現。

     一點感觸,其實和這個插件關系不大,在做完這個之后我又用quartz獨立實現了一個定時項目(沒有用任何的spring),后續也單個研究了rmi,jndi,struts2,終於體會到了spring的致命弱點--它對其他組件封裝太完美了, 使得對於新手而言,極易產生quartz,rmi,jndi,hibernate等一離開spring就無法正常使用,無從下手,進而反而影響到新手的學習。 然后網上也充斥着這種情況: 你想查quartz原理,90%的內容都是在spring xml中怎么配置quartz. 這樣真的好嗎。

     誠然,高手封裝了簡單易用的東西固然要推薦,可是吾等便是做這行的,如果成長之路太容易真的好么

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM