Elastic-Job源碼分析之JobScheduler類分析


JobScheduler這個類是EJ中比較核心的一個類,我們現在開始解析這個類。

一、構造器

首先我們看一下JobScheduler的幾個構造器。

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }

這個構造器比較清晰,第一個參數regCenter表示的是注冊中心,這里我們用的是zk作為我們的注冊中心,所以這塊的配置我們一般從xml配置文件中讀取即可。我們按照官方文檔配置即可。

    <!--配置作業注冊中心 -->
    <reg:zookeeper id="regCenter" server-lists="ip1:2181,ip2:2181"
                   namespace="your-job-name" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000"
                   max-retries="3"/>

然后從xml文件中取出該配置即可。

ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
		ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);

第二個參數liteJobConfig表示的是作業的一些配置,這塊我們后續再細細了解。

第三個參數jobEventBus表示作業運行痕跡總線。如果想把作業運行的內容寫到DB中,我們需要用到另一個構造器,同時定義自己的JobEventConfiguration,目前來說實現這個接口的只有一個類JobEventRdbConfiguration,通過這個可以將作業運行的痕跡進行持久化到DB的操作。

第四個參數elasticJobListeners表示一些監聽器,這里我們可以定義一些監聽器,具體的監聽器我們可以參考官方文檔。

二、初始化

定義好我們的作業后,怎么將我們的作業運行起來呢?這塊就需要用到我們的初始化作業的方法init()。

public void init() {
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
    JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
    schedulerFacade.registerStartUpInfo(liteJobConfig);
    jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
    }

初始化主要做了以下幾個動作:

  • 設置當前分片總數setCurrentShardingTotalCount
  • 創建調度器createScheduler
  • 創建任務詳情createJobDetail
  • 將作業在zk上面進行注冊registerJob
  • 注冊作業啟動信息registerStartUpInfo
  • 開始進行作業調度scheduleJob

這幾個動作是一個作業能夠最終被調度的關鍵,我們來一一分析。

2.1 createScheduler

    private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());//TODO 每次新建任務都會創建一個線程,會導致線程數過多
            result = factory.getScheduler();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

這個方法調用了quartz底層的方法,創建了一個調度器。我們看看quartz中的一些配置:

private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}

我們可以看到一個比較重要的參數:org.quartz.threadPool.threadCount,這里設置為了1,也就是說每個任務,EJ默認都會創建一個線程來進行調度,所以如果想動態的創建任務,你就要考慮好你的內存夠不夠這些線程使用了。

2.2 createJobDetail

private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}

這個方法創建了任務的一些屬性。

  • 首先調用JobBuilder的構造器方法,生成了一個詳細的任務。
  • 然后將作業的門面放入到作業內部的一個作業數據map中,相當於是一個緩存。
  • 調用createElasticJobInstance創建實例,生成一個Optional的類,這個Optional是Guava中的一個類,后續咱們再研究。我們只需要知道創建了一個實例就行了,實例放在了Optional這個容器中,而Optional可以做一些額外的判斷。我們可以聯想一下,利用Optional來處理null?
  • 判斷實例是否創建成功,通過isPresent()方法來判斷。Optional的這個方法判斷實例是否為空。
    • 如果不為空,就把實例放入到緩存map中。
    • 否則,調用反射,把實例放入緩存map中。如果異常的話,直接拋出異常,說明實例化失敗。

2.3 registerJob

看方法名我們就能知道,這個方法的作用是注冊我們的任務。

/**
 * 添加作業調度控制器.
 * 
 * @param jobName 作業名稱
 * @param jobScheduleController 作業調度控制器
 * @param regCenter 注冊中心
 */
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
    schedulerMap.put(jobName, jobScheduleController);
    regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}

這步的操作,把作業控制調度器放入到調度緩存中,把任務信息放入到regCenter的緩存中。另外還把任務名稱加入到本地緩存中。

2.4 registerStartUpInfo

這個方法是注冊一些任務的啟動信息,這塊的步驟比較多,需要細細理解一下。

    public void registerStartUpInfo(final boolean enabled) {
        listenerManager.startAllListeners();
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        shardingService.setReshardingFlag();
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }
  • 第一步,啟動所有監聽器。
    public void startAllListeners() {
        electionListenerManager.start();//主節點選舉
        shardingListenerManager.start();//分片
        failoverListenerManager.start();//失效轉移
        shutdownListenerManager.start();//關閉
        triggerListenerManager.start();//觸發器
        rescheduleListenerManager.start();//重新調度
        guaranteeListenerManager.start();//保證分布式任務全部開始和結束狀態
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);//注冊中心與任務節點的連接狀態
    }
  • 第二步,選舉主節點。這塊底層調用的是curator的方法,一個zk的客戶端。
  • 第三步,持久化作業信息到zk中。先把服務器信息持久化,再把作業實例信息持久化,
  • 第四步,設置重新分片的信息,同樣也是在zk中。
  • 第五步,初始化作業監聽服務。
  • 最后,調解分布式作業不一致狀態服務異步啟動。

可以看到,這塊做的東西比較多,而且最近版本這塊修改的內容也比較多。總的來說,就是啟動監聽器,同時將作業的信息持久化到zk中。

2.5 scheduleJob

調度作業,這塊是一個比較基礎的服務。底層調用的是quartz的方法。

初始化這塊就分析完了,里面的東西還是挺多的。今天就先分析到這里,咱們以后再細細分析吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM