xxl-job源碼閱讀一(客戶端)


1、源碼入口

使用xxl-job的時候,需要引入一個jar,然后還需要往Spring容器注入XxlJobSpringExecutor

<dependency>
	<groupId>com.xuxueli</groupId>
	<artifactId>xxl-job-core</artifactId>
	<version>2.3.0</version>
</dependency>

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

我們就可以順着這個XxlJobSpringExecutor,分析下這個xxl-job-core做了些什么。

2、執行器啟動

XxlJobSpringExecutor代碼比較簡潔,大致框架如下:

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    
	@Override
    public void afterSingletonsInstantiated() {
        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
  	@Override
    public void destroy() {
        super.destroy();
    }  
    
    // ......

}

1、實現了SmartInitializingSingleton接口,在項目啟動的時候,會調到afterSingletonsInstantiated方法。這個方法又可以作為進一步閱讀的下個入口了;

2、實現了DisposableBean接口,在系統停止的時候,調用destroy方法。

3、實現ApplicationContextAware,只是為了獲取applicationContext對象,這個沒啥好說的。

先來看看初始化

2.1、解析Xxl-job注解

在使用xxl-job的時候,我們會寫@XxlJob注解,來告訴xxl-job這是個定時任務的方法。

那么xxl-job就得解析這個注解並做一系列處理。對吧?

這個事情,就是在afterSingletonsInstantiated方法里面調initJobHandlerMethodRepository去做的。代碼略長,就不貼出來了。具體實現邏輯如下:

  1. 通過applicationContext.getBeanNamesForType獲取全部bean

  2. 遍歷所有bean,找到有@XxlJob標記的方法,並判斷@XxlJob標記的name值是否有重復,如果重復則報錯

  3. 如果有配置init和destroy方法,則通過反射找到他們

  4. 將信息組裝成MethodJobHandler對象,保存到ConcurrentHashMap中

    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

2.2、處理glue模式

一般基於Spring使用的時候,都是bean模式,即

任務以JobHandler方式維護在執行器端;需要結合 "JobHandler" 屬性匹配執行器中任務;

而glue模式是指

任務以源碼方式維護在調度中心;

這里初始化了一個SpringGlueFactory

2.3、啟動

這里調用父類XxlJobExecutor的start方法。方法主要執行下面幾個大的步驟:

public class XxlJobExecutor  {
	...
    public void start() throws Exception {

        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }
    ...
}
  1. 初始化執行器日志路徑,默認 /data/applogs/xxl-job/jobhandler 。

    這個XxlJobFileAppender是個單獨寫日志文件的工具類。在xxl-job-admin界面上,可以通過界面查看定時任務調度執行的日志。我們在業務代碼中,也可以通過XxlJobHelper.log方法,寫自己的日志(老版本是XxlJobLogger.log)

  2. 根據配置的adminAddresses地址,構造AdminBiz列表(后面注冊、調服務端接口等,會需要調到這個地址)

  3. 啟動一個daemon線程,每天定期清理調度日志文件(上述1步驟目錄下的文件)

  4. 定義一個LinkedBlockingQueue,這個queue里面放job執行結果。然后啟動triggerCallbackThread和triggerRetryCallbackThread 兩個線程,向job-admin反饋job執行結果。

    這里為啥是2個線程去給admin端反饋執行結果呢?

    原來,正常情況下,只有triggerCallbackThread從queue里面拿數據,提交到admin。

    但是當它提交失敗的時候,triggerCallbackThread就會寫一個callbacklog文件。再由triggerRetryCallbackThread讀取callbacklog文件,並向admin提交執行結果。

  5. 構造EmbedServer並啟動

    構造EmbedServer需要url,這里涉及到三個配置。

    ### 執行器注冊 [選填]:優先使用該配置作為注冊地址,為空時使用內嵌服務 ”IP:PORT“ 作為注冊地址。從而更靈活的支持容器類型執行器動態IP和動態映射端口問題。
    xxl.job.executor.address=
    ### 執行器IP [選填]:默認為空表示自動獲取IP,多網卡時可手動設置指定IP,該IP不會綁定Host僅作為通訊實用;地址信息用於 "執行器注冊" 和 "調度中心請求並觸發任務";
    xxl.job.executor.ip=
    ### 執行器端口號 [選填]:小於等於0則自動獲取;默認端口為9999,單機部署多個執行器時,注意要配置不同執行器端口;
    xxl.job.executor.port=9999
    

    EmbedServer是基於netty實現的一個服務,監聽9999端口,並主要響應xxl-job-admin的調度請求。從EmbedHttpServerHandler中可以看出,admin調度中心往執行器發的請求,主要有以下5個:

    beat:調度中心檢測執行器是否在線時使用
    idleBeat:調度中心檢測 指定執行器 上 指定任務 是否忙碌(運行中)時使用 (注意這里2個“指定”)
    run:觸發任務執行
    kill:終止任務
    	這里是根據jobId找到對應的jobThread,再改變執行標記后,調interrupt方法。
    	(所以如果你想優雅地停止一個線程,也可以通過線程標記+interrupt方式)
    log:查看執行日志
    

    執行器什么時候往調用中心注冊的呢?

    答案是:在EmbedServer的start方法中,啟動了一個thread,在其內部調了【startRegistry(appname, address);】

    而這行代碼里面,又啟動了一個registryThread,每30秒注冊當前執行器。

至此,xxl-job客戶端的邏輯大致分析清楚了,下一節再看看admin的代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM