1、源碼入口
使用xxl-job的時候,需要引入一個jar,然后還需要往Spring容器注入XxlJobSpringExecutor
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
我們就可以順着這個XxlJobSpringExecutor,分析下這個xxl-job-core做了些什么。
2、執行器啟動
XxlJobSpringExecutor代碼比較簡潔,大致框架如下:
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void destroy() {
super.destroy();
}
// ......
}
1、實現了SmartInitializingSingleton接口,在項目啟動的時候,會調到afterSingletonsInstantiated方法。這個方法又可以作為進一步閱讀的下個入口了;
2、實現了DisposableBean接口,在系統停止的時候,調用destroy方法。
3、實現ApplicationContextAware,只是為了獲取applicationContext對象,這個沒啥好說的。
先來看看初始化
2.1、解析Xxl-job注解
在使用xxl-job的時候,我們會寫@XxlJob注解,來告訴xxl-job這是個定時任務的方法。
那么xxl-job就得解析這個注解並做一系列處理。對吧?
這個事情,就是在afterSingletonsInstantiated方法里面調initJobHandlerMethodRepository去做的。代碼略長,就不貼出來了。具體實現邏輯如下:
-
通過applicationContext.getBeanNamesForType獲取全部bean
-
遍歷所有bean,找到有@XxlJob標記的方法,並判斷@XxlJob標記的name值是否有重復,如果重復則報錯
-
如果有配置init和destroy方法,則通過反射找到他們
-
將信息組裝成MethodJobHandler對象,保存到ConcurrentHashMap中
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
2.2、處理glue模式
一般基於Spring使用的時候,都是bean模式,即
任務以JobHandler方式維護在執行器端;需要結合 "JobHandler" 屬性匹配執行器中任務;
而glue模式是指
任務以源碼方式維護在調度中心;
這里初始化了一個SpringGlueFactory
2.3、啟動
這里調用父類XxlJobExecutor的start方法。方法主要執行下面幾個大的步驟:
public class XxlJobExecutor {
...
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
...
}
-
初始化執行器日志路徑,默認 /data/applogs/xxl-job/jobhandler 。
這個XxlJobFileAppender是個單獨寫日志文件的工具類。在xxl-job-admin界面上,可以通過界面查看定時任務調度執行的日志。我們在業務代碼中,也可以通過XxlJobHelper.log方法,寫自己的日志(老版本是XxlJobLogger.log)
-
根據配置的adminAddresses地址,構造AdminBiz列表(后面注冊、調服務端接口等,會需要調到這個地址)
-
啟動一個daemon線程,每天定期清理調度日志文件(上述1步驟目錄下的文件)
-
定義一個LinkedBlockingQueue,這個queue里面放job執行結果。然后啟動triggerCallbackThread和triggerRetryCallbackThread 兩個線程,向job-admin反饋job執行結果。
這里為啥是2個線程去給admin端反饋執行結果呢?
原來,正常情況下,只有triggerCallbackThread從queue里面拿數據,提交到admin。
但是當它提交失敗的時候,triggerCallbackThread就會寫一個callbacklog文件。再由triggerRetryCallbackThread讀取callbacklog文件,並向admin提交執行結果。
-
構造EmbedServer並啟動
構造EmbedServer需要url,這里涉及到三個配置。
### 執行器注冊 [選填]:優先使用該配置作為注冊地址,為空時使用內嵌服務 ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執行器動態IP和動態映射端口問題。 xxl.job.executor.address= ### 執行器IP [選填]:默認為空表示自動獲取IP,多網卡時可手動設置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用於 "執行器注冊" 和 "調度中心請求並觸發任務"; xxl.job.executor.ip= ### 執行器端口號 [選填]:小於等於0則自動獲取;默認端口為9999,單機部署多個執行器時,注意要配置不同執行器端口; xxl.job.executor.port=9999EmbedServer是基於netty實現的一個服務,監聽9999端口,並主要響應xxl-job-admin的調度請求。從EmbedHttpServerHandler中可以看出,admin調度中心往執行器發的請求,主要有以下5個:
beat:調度中心檢測執行器是否在線時使用 idleBeat:調度中心檢測 指定執行器 上 指定任務 是否忙碌(運行中)時使用 (注意這里2個“指定”) run:觸發任務執行 kill:終止任務 這里是根據jobId找到對應的jobThread,再改變執行標記后,調interrupt方法。 (所以如果你想優雅地停止一個線程,也可以通過線程標記+interrupt方式) log:查看執行日志執行器什么時候往調用中心注冊的呢?
答案是:在EmbedServer的start方法中,啟動了一個thread,在其內部調了【startRegistry(appname, address);】
而這行代碼里面,又啟動了一個registryThread,每30秒注冊當前執行器。
至此,xxl-job客戶端的邏輯大致分析清楚了,下一節再看看admin的代碼
