分布式調度平台XXL-JOB源碼分析-調度中心


架構圖

上圖是我們要進行源碼分析的2.1版本的整體架構圖。其分為兩大塊,調度中心和執行器,本文先分析調度中心,也就是xxl-job-admin這個包的代碼。

關鍵bean

在application.properties配置正確的數據庫連接信息后,直接啟動XxlJobAdminApplication即可。

配置類XxlJobAdminConfig,里面維護了一些調度中心端的配置數據。

XxlJobScheduler這個組件實現了InitializingBean接口,所以spring容器在初始化的時候會調用afterPropertiesSet方法,此方法如下:

第一步國際化相關。

第二步監控相關。

第三步失敗重試相關。

第四步啟動admin端服務,接收注冊請求等。

第五步JobScheduleHelper調度器,死循環,在xxl_job_info表里取將要執行的任務,更新下次執行時間的,調用JobTriggerPoolHelper類,來給執行器發送調度任務的

JobScheduleHelper

這個類就是死循環從xxl_job_info表中取出未來5秒內要執行的任務,進行調度分發。

 啟動了兩個守護線程,先來看scheduleThread。

死循環內的代碼如上圖,首先利用for update語句進行獲取任務的資格鎖定,再去獲取未來5秒內即將要執行的任務。

展開遍歷任務的邏輯代碼,有三個分支

 

 第一個分支當前任務的觸發時間已經超時5秒以上了,不在執行,直接計算下一次觸發時間。

 第二個分支為觸發時間已滿足,利用JobTriggerPoolHelper這個類進行任務調度,之后判斷下一次執行時間如果在5秒內,進行此任務數據的緩存,處理邏輯與第三個分支一樣。

對觸發時間秒數進行60取模,跟進pushTimeRing方法

ringData是以0到59的整數為key,以jobId集合為value的Map集合。這個集合數據的處理邏輯,就在我們第二個守護線程ringThread中。

 1 while (!ringThreadToStop) {
 2     try {
 3         // second data
 4         List<Integer> ringItemData = new ArrayList<>();
 5         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
 6         for (int i = 0; i < 2; i++) {
 7             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
 8             if (tmpData != null) {
 9                 ringItemData.addAll(tmpData);
10             }
11         }
12         // ring trigger
13         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
14         if (ringItemData!=null && ringItemData.size()>0) {
15             // do trigger
16             for (int jobId: ringItemData) {
17                 // do trigger
18                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
19             }
20             // clear
21             ringItemData.clear();
22         }
23     } catch (Exception e) {
24         if (!ringThreadToStop) {
25             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
26         }
27     }
28     // next second, align second
29     try {
30         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
31     } catch (InterruptedException e) {
32         if (!ringThreadToStop) {
33             logger.error(e.getMessage(), e);
34         }
35     }
36 }

根據當前秒數刻度和前一個刻度進行時間輪的任務獲取,之后和上文一樣,利用JobTriggerPoolHelper進行任務調度。

時序圖

 

 

 

JobTriggerPoolHelper

如前文所述,不管是scheduleThread還是ringThread,最后完成任務調度的都是JobTriggerPoolHelper.trigger方法,這個類有兩個線程池fastTriggerPool和slowTriggerPool,顧名思義,分別是執行較快任務和較慢任務的,后查官方文檔,如下:

  

minTim屬性,作用待明確

jobTimeoutCountMap屬性,計數,key為jobId,value使用AtomicInteger計數。

helper靜態變量指向自己本身,提供外部靜態方法調用。

重要方法,向兩種線程池其中之一提交調度任務,進行調度,引出XxlJobTrigger這個類,一路跟進去

繼續跟進

至此,完成執行器的任務調度。

時序圖

接收注冊和心跳請求

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM