Source-code analysis of elastic-job-lite, an open-source task scheduling platform


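A while ago I wrote an article, <Quartz, the scheduled-task framework, demystified in one article>. Some readers suggested that I also cover the elastic-job scheduling framework, and since things are less busy at the end of the year, I'm taking the chance to study elastic-job.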

First of all, elastic-job is built on Quartz, so understanding how Quartz works makes elastic-job much quicker to understand.

Let's start with the architecture of elastic-job-lite.

[Figure: elastic-job-lite architecture]

Quartz has three important concepts: Job, Trigger and Scheduler. How are these three concepts reflected in elastic-job?

1.Job

LiteJob implements Quartz's Job interface:

import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Lite scheduling job.
 *
 * @author zhangliang
 */
public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
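LiteJob has no constructor arguments; its two collaborators arrive through the Lombok-generated setters. This works because Quartz's default job factory (PropertySettingJobFactory) copies JobDataMap entries onto matching bean setters, and JobScheduler.createJobDetail (section 5) puts the facade and the job instance into the JobDataMap under keys that, as far as we can tell, match the field names. The sketch below imitates that setter injection with plain reflection and no Quartz dependency; SimpleTask, DemoLiteJob and SetterInjectingFactory are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for an ElasticJob implementation (the business logic).
interface SimpleTask {
    String run();
}

// Hypothetical job class: like LiteJob, it only exposes setters for its collaborators.
class DemoLiteJob {
    private SimpleTask elasticJob;
    private String jobFacade;

    public void setElasticJob(final SimpleTask elasticJob) {
        this.elasticJob = elasticJob;
    }

    public void setJobFacade(final String jobFacade) {
        this.jobFacade = jobFacade;
    }

    String execute() {
        return jobFacade + ":" + elasticJob.run();
    }
}

// Minimal imitation of setter-based injection: for each data-map entry "fooBar", call a public
// one-argument method setFooBar(...) whose parameter type accepts the entry's value.
final class SetterInjectingFactory {
    static <T> T newJob(final Class<T> type, final Map<String, Object> dataMap) {
        try {
            T job = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
                String key = entry.getKey();
                String setterName = "set" + Character.toUpperCase(key.charAt(0)) + key.substring(1);
                for (Method method : type.getMethods()) {
                    if (method.getName().equals(setterName) && method.getParameterCount() == 1
                            && method.getParameterTypes()[0].isInstance(entry.getValue())) {
                        method.invoke(job, entry.getValue());
                    }
                }
            }
            return job;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot create job " + type.getName(), ex);
        }
    }
}
```

A new job object is built for every trigger firing, which is why LiteJob keeps no state of its own and simply delegates to JobExecutorFactory.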

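Here:

1.1 ElasticJob is the interface implemented by the different job types (simple, dataflow, script)

1.2 JobFacade is the facade over the job's internal services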

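The JobFacade methods below reflect elastic-job's main features:

Task sharding:

  Splits an overall task into multiple sub-tasks (sharding items)

  Processing capacity scales elastically as servers are added or removed

  Distributed coordination: servers going online or offline are detected and handled automatically

Fault tolerance:

  Periodic self-diagnosis and automatic repair

  Uniqueness guarantee for distributed sharding items (each item is assigned to only one instance)

  Failover, and re-triggering of missed (misfired) executions

Task tracking

Task scheduling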

public interface JobFacade {
    
    /**
     * Load the job configuration.
     * 
     * @param fromCache whether to read from the cache
     * @return job configuration
     */
    JobRootConfiguration loadJobRootConfiguration(boolean fromCache);
    
    /**
     * Check the job execution environment.
     * 
     * @throws JobExecutionEnvironmentException if the job execution environment is invalid
     */
    void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;
    
    /**
     * Perform job failover if failover is required.
     */
    void failoverIfNecessary();
    
    /**
     * Register job start information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobBegin(ShardingContexts shardingContexts);
    
    /**
     * Register job completion information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobCompleted(ShardingContexts shardingContexts);
    
    /**
     * Get the sharding contexts of the current job server.
     *
     * @return sharding contexts
     */
    ShardingContexts getShardingContexts();
    
    /**
     * Set the misfire flag for the given sharding items.
     *
     * @param shardingItems sharding items to flag as misfired
     * @return whether the misfire condition was met
     */
    boolean misfireIfRunning(Collection<Integer> shardingItems);
    
    /**
     * Clear the misfire flag for the given sharding items.
     *
     * @param shardingItems sharding items whose misfire flag should be cleared
     */
    void clearMisfire(Collection<Integer> shardingItems);
    
    /**
     * Determine whether the job needs to execute misfired runs.
     * 
     * @param shardingItems collection of sharding items
     * @return whether the job needs to execute misfired runs
     */
    boolean isExecuteMisfired(Collection<Integer> shardingItems);
    
    /**
     * Determine whether the job is eligible to keep running.
     * 
     * <p>The job will not keep running if it has been stopped, needs resharding, or is not stream processing.</p>
     * 
     * @return whether the job is eligible to keep running
     */
    boolean isEligibleForJobRunning();
    
    /**
     * Determine whether resharding is needed.
     *
     * @return whether resharding is needed
     */
    boolean isNeedSharding();
    
    /**
     * Method invoked before the job executes.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(ShardingContexts shardingContexts);
    
    /**
     * Method invoked after the job executes.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(ShardingContexts shardingContexts);
    
    /**
     * Post a job execution event.
     *
     * @param jobExecutionEvent job execution event
     */
    void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);
    
    /**
     * Post a job status trace event.
     *
     * @param taskId task ID
     * @param state job execution state
     * @param message job execution message
     */
    void postJobStatusTraceEvent(String taskId, State state, String message);
}
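misfireIfRunning, isExecuteMisfired and clearMisfire work together: if the trigger fires while the previous run of the same sharding items is still going, the items are flagged as misfired instead of running twice, and once the current run ends, execute() keeps re-running while the flag is set. elastic-job-lite keeps these flags in the registry center and also consults the job's misfire setting; the sketch below holds them in memory and ignores that setting. InMemoryMisfireTracker, markRunning and markCompleted are hypothetical names.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the running/misfire flags kept in the registry center.
final class InMemoryMisfireTracker {
    private final Set<Integer> running = ConcurrentHashMap.newKeySet();
    private final Set<Integer> misfired = ConcurrentHashMap.newKeySet();

    void markRunning(final Collection<Integer> items) {
        running.addAll(items);
    }

    void markCompleted(final Collection<Integer> items) {
        running.removeAll(items);
    }

    // Like misfireIfRunning: if any item is still running, flag all items as misfired and return true.
    boolean misfireIfRunning(final Collection<Integer> items) {
        for (Integer each : items) {
            if (running.contains(each)) {
                misfired.addAll(items);
                return true;
            }
        }
        return false;
    }

    // Like isExecuteMisfired: true if any of the items still carries a misfire flag.
    boolean isExecuteMisfired(final Collection<Integer> items) {
        for (Integer each : items) {
            if (misfired.contains(each)) {
                return true;
            }
        }
        return false;
    }

    // Like clearMisfire: drop the flags before re-running the items.
    void clearMisfire(final Collection<Integer> items) {
        misfired.removeAll(items);
    }
}
```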

2.JobDetail

The common job attributes are defined in job.xsd:

    <xsd:complexType name="base">
        <xsd:complexContent>
            <xsd:extension base="beans:identifiedType">
                <xsd:all>
                    <xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
                    <xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
                </xsd:all>
                <xsd:attribute name="class" type="xsd:string" />
                <xsd:attribute name="job-ref" type="xsd:string" />
                <xsd:attribute name="registry-center-ref" type="xsd:string" use="required" />
                <xsd:attribute name="cron" type="xsd:string" use="required" />
                <xsd:attribute name="sharding-total-count" type="xsd:string" use="required" />
                <xsd:attribute name="sharding-item-parameters" type="xsd:string" />
                <xsd:attribute name="job-parameter" type="xsd:string" />
                <xsd:attribute name="monitor-execution" type="xsd:string" default="true"/>
                <xsd:attribute name="monitor-port" type="xsd:string" default="-1"/>
                <xsd:attribute name="max-time-diff-seconds" type="xsd:string" default="-1"/>
                <xsd:attribute name="failover" type="xsd:string" default="false"/>
                <xsd:attribute name="reconcile-interval-minutes" type="xsd:int" default="10"/>
                <xsd:attribute name="misfire" type="xsd:string" default="true"/>
                <xsd:attribute name="job-sharding-strategy-class" type="xsd:string" />
                <xsd:attribute name="description" type="xsd:string" />
                <xsd:attribute name="disabled" type="xsd:string" default="false"/>
                <xsd:attribute name="overwrite" type="xsd:string" default="false"/>
                <xsd:attribute name="executor-service-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultExecutorServiceHandler"/>
                <xsd:attribute name="job-exception-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultJobExceptionHandler"/>
                <xsd:attribute name="event-trace-rdb-data-source" type="xsd:string" />
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>
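All of these attributes are plain strings in the schema. sharding-item-parameters maps sharding item indexes to business parameters in the form `0=A,1=B,2=C`, so each server can tell which slice of data its items stand for. A minimal sketch of parsing that format follows; ShardingItemParametersParser is a hypothetical name, and its trimming and error handling are assumptions rather than elastic-job's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a sharding-item-parameters value such as "0=Beijing,1=Shanghai,2=Guangzhou".
final class ShardingItemParametersParser {
    static Map<Integer, String> parse(final String value) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result;
        }
        for (String pair : value.split(",")) {
            String[] parts = pair.trim().split("=", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Malformed sharding item parameter: '" + pair + "'");
            }
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric items.
            result.put(Integer.parseInt(parts[0].trim()), parts[1].trim());
        }
        return result;
    }
}
```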

Simple jobs use exactly the common attributes; dataflow jobs add the streaming-process attribute, and script jobs add the script-command-line attribute.

The parsers are registered in spring.handlers:

http\://www.dangdang.com/schema/ddframe/reg=io.elasticjob.lite.spring.reg.handler.RegNamespaceHandler
http\://www.dangdang.com/schema/ddframe/job=io.elasticjob.lite.spring.job.handler.JobNamespaceHandler

JobNamespaceHandler

/**
 * Namespace handler for distributed jobs.
 * 
 * @author caohao
 */
public final class JobNamespaceHandler extends NamespaceHandlerSupport {
    
    @Override
    public void init() {
        registerBeanDefinitionParser("simple", new SimpleJobBeanDefinitionParser());
        registerBeanDefinitionParser("dataflow", new DataflowJobBeanDefinitionParser());
        registerBeanDefinitionParser("script", new ScriptJobBeanDefinitionParser());
    }
}

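When the elastic distributed job executor AbstractElasticJobExecutor.java is constructed, it loads the job configuration and obtains the configured handlers (the executor-service handler and the job-exception handler):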

    protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        this.jobFacade = jobFacade;
        jobRootConfig = jobFacade.loadJobRootConfiguration(true);
        jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
        jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
        itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
    }
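getHandler is not shown above. As far as we can tell, it reads the handler class name from the job properties (the executor-service-handler and job-exception-handler attributes in the XSD), instantiates it reflectively, and falls back to the default handler when that fails. A sketch of this load-by-name-with-fallback pattern; HandlerLoader, ExceptionHandler and LoggingExceptionHandler are hypothetical names.

```java
// Hypothetical handler type, standing in for JobExceptionHandler.
interface ExceptionHandler {
    String handle(String jobName, Throwable cause);
}

// Hypothetical custom handler that a user might name in the job configuration.
class LoggingExceptionHandler implements ExceptionHandler {
    @Override
    public String handle(final String jobName, final Throwable cause) {
        return "job " + jobName + " failed: " + cause.getMessage();
    }
}

final class HandlerLoader {
    // Instantiate the configured class by name; fall back to the default handler when the name is
    // missing, the class cannot be loaded or instantiated, or it does not implement the expected type.
    static <T> T load(final String className, final Class<T> expectedType, final T defaultHandler) {
        if (className == null) {
            return defaultHandler;
        }
        try {
            Class<?> handlerClass = Class.forName(className);
            if (expectedType.isAssignableFrom(handlerClass)) {
                return expectedType.cast(handlerClass.getDeclaredConstructor().newInstance());
            }
        } catch (ReflectiveOperationException | LinkageError ex) {
            // fall through to the default handler
        }
        return defaultHandler;
    }
}
```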

3 Executing a job

The elastic distributed job executor AbstractElasticJobExecutor.java:

    /**
     * Execute the job.
     */
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment(); //1
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        ShardingContexts shardingContexts = jobFacade.getShardingContexts(); //2
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); //3
        }
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                        "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                        shardingContexts.getShardingItemParameters().keySet()));
            }
            return;
        }
        try {
            jobFacade.beforeJobExecuted(shardingContexts); //4
        //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
        //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); //5
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
        }
        jobFacade.failoverIfNecessary(); //6
        try {
            jobFacade.afterJobExecuted(shardingContexts); //7
        //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
        //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
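The numbered steps above form a fixed template. To make the control flow easier to follow, here is a heavily reduced sketch with a hypothetical MiniFacade and MiniExecutor: log strings stand in for the real calls, and sharding, event posting and failover collapse into single log lines. It keeps one detail of the original: a failed environment check is only handed to the exception handler, so unless the handler rethrows, execution continues.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily reduced facade: just enough to follow the control flow of execute().
interface MiniFacade {
    void checkEnvironment();

    boolean misfireIfRunning();

    boolean isExecuteMisfired();

    void clearMisfire();
}

final class MiniExecutor {
    private final MiniFacade facade;
    final List<String> log = new ArrayList<>();

    MiniExecutor(final MiniFacade facade) {
        this.facade = facade;
    }

    void execute() {
        try {
            facade.checkEnvironment();                 // 1: a failure is reported, not fatal
        } catch (RuntimeException ex) {
            log.add("handled: " + ex.getMessage());
        }
        if (facade.misfireIfRunning()) {               // previous run still active: flag and skip
            log.add("skipped, marked misfire");
            return;
        }
        log.add("before");                             // 4: listeners' beforeJobExecuted
        log.add("run NORMAL_TRIGGER");                 // 5: normal execution
        while (facade.isExecuteMisfired()) {           // re-run while a misfire flag is outstanding
            facade.clearMisfire();
            log.add("run MISFIRE");
        }
        log.add("failover check");                     // 6
        log.add("after");                              // 7: listeners' afterJobExecuted
    }
}
```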

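3.1 Environment check

Check whether the clock difference between this machine and the registry center is within the allowed range (max-time-diff-seconds; the default of -1 disables the check):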

    /**
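     * Check whether the clock difference between this machine and the registry center is within the allowed range.
     * 
     * @throws JobExecutionEnvironmentException if the clock difference between this machine and the registry center exceeds the allowed range
     */
    public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
        int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
        if (-1 == maxTimeDiffSeconds) {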
            return;
        }
        long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new JobExecutionEnvironmentException(
                    "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
        }
    }
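The check itself is simple arithmetic; the interesting part is where the registry time comes from, which we believe is obtained by writing a ZooKeeper node and reading its modification time. The sketch below isolates the comparison by passing both timestamps in; ClockSkewCheck is a hypothetical name and it throws IllegalStateException in place of JobExecutionEnvironmentException.

```java
// Hypothetical standalone version of the clock-skew check: both timestamps are passed in
// so the logic can be exercised without a registry center.
final class ClockSkewCheck {
    static void check(final long localMillis, final long registryMillis, final int maxTimeDiffSeconds) {
        if (-1 == maxTimeDiffSeconds) {
            return; // -1 (the XSD default) disables the check
        }
        long timeDiff = Math.abs(localMillis - registryMillis);
        if (timeDiff > maxTimeDiffSeconds * 1000L) {
            throw new IllegalStateException(String.format(
                    "Time difference %d ms exceeds the allowed %d s", timeDiff, maxTimeDiffSeconds));
        }
    }
}
```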

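3.2 Sharding according to the sharding strategy

If sharding is needed and the current node is the leader, the job is sharded; the other nodes block until the leader has finished sharding.

If no instance is currently available, no sharding takes place.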

    /**
     * Shard the job if sharding is needed and the current node is the leader.
     * 
     * <p>
     * No sharding takes place if no instance is currently available.
     * </p>
     */
    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }
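The strategy is pluggable through job-sharding-strategy-class; when none is configured, the default is, to our knowledge, AverageAllocationJobShardingStrategy. The sketch below reimplements that "average allocation" idea over plain instance-id strings instead of JobInstance objects: each instance first gets an equal contiguous block of items, then the remainder is handed out one item each to the first instances. AverageSharding is a hypothetical name, and distinct instance ids are assumed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AverageSharding {
    static Map<String, List<Integer>> sharding(final List<String> instances, final int shardingTotalCount) {
        if (instances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perInstance = shardingTotalCount / instances.size();
        // Equal contiguous blocks: instance i gets [i * perInstance, (i + 1) * perInstance).
        for (int i = 0; i < instances.size(); i++) {
            List<Integer> items = new ArrayList<>();
            for (int j = i * perInstance; j < (i + 1) * perInstance; j++) {
                items.add(j);
            }
            result.put(instances.get(i), items);
        }
        // Leftover items go one each to the first instances, in order.
        int remainder = shardingTotalCount % instances.size();
        for (int i = 0; i < remainder; i++) {
            result.get(instances.get(i)).add(perInstance * instances.size() + i);
        }
        return result;
    }
}
```

With three servers A, B, C and 10 items this yields A=[0, 1, 2, 9], B=[3, 4, 5], C=[6, 7, 8]; with 8 items, A=[0, 1, 6], B=[2, 3, 7], C=[4, 5].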

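3.3 Notification via EventBus

elastic-job's job event bus is built on Guava's com.google.common.eventbus.EventBus; trace events are delivered through its post method: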

  /**
   * Posts an event to all registered subscribers.  This method will return
   * successfully after the event has been posted to all subscribers, and
   * regardless of any exceptions thrown by subscribers.
   *
   * <p>If no subscribers have been subscribed for {@code event}'s class, and
   * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
   * DeadEvent and reposted.
   *
   * @param event  event to post.
   */
  public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
      subscribersByTypeLock.readLock().lock();
      try {
        Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

        if (!wrappers.isEmpty()) {
          dispatched = true;
          for (EventSubscriber wrapper : wrappers) {
            enqueueEvent(event, wrapper);
          }
        }
      } finally {
        subscribersByTypeLock.readLock().unlock();
      }
    }

    if (!dispatched && !(event instanceof DeadEvent)) {
      post(new DeadEvent(this, event));
    }

    dispatchQueuedEvents();
  }
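The essence of post() is type-based dispatch: an event reaches the subscribers registered for its class and for every supertype, and an event nobody receives is re-posted wrapped in a DeadEvent. The single-threaded sketch below keeps only that logic; unlike Guava it does not queue events or isolate subscriber exceptions. MiniEventBus and its nested DeadEvent are hypothetical, not Guava classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class MiniEventBus {
    // Wrapper for events that no subscriber received.
    static final class DeadEvent {
        final Object event;

        DeadEvent(final Object event) {
            this.event = event;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> subscribersByType = new LinkedHashMap<>();

    void register(final Class<?> eventType, final Consumer<Object> subscriber) {
        subscribersByType.computeIfAbsent(eventType, key -> new ArrayList<>()).add(subscriber);
    }

    void post(final Object event) {
        boolean dispatched = false;
        for (Class<?> eventType : flattenHierarchy(event.getClass())) {
            for (Consumer<Object> subscriber : subscribersByType.getOrDefault(eventType, Collections.emptyList())) {
                subscriber.accept(event);
                dispatched = true;
            }
        }
        if (!dispatched && !(event instanceof DeadEvent)) {
            post(new DeadEvent(event));
        }
    }

    // The class itself, its superclasses, and every interface they implement.
    private static Set<Class<?>> flattenHierarchy(final Class<?> type) {
        Set<Class<?>> result = new LinkedHashSet<>();
        List<Class<?>> pending = new ArrayList<>();
        pending.add(type);
        while (!pending.isEmpty()) {
            Class<?> current = pending.remove(pending.size() - 1);
            if (current != null && result.add(current)) {
                pending.add(current.getSuperclass()); // null for Object and for interfaces
                Collections.addAll(pending, current.getInterfaces());
            }
        }
        return result;
    }
}
```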

3.4 Before job execution: notifying ElasticJobListener

 

    @Override
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        for (ElasticJobListener each : elasticJobListeners) {
            each.beforeJobExecuted(shardingContexts);
        }
    }

3.5 Job execution

    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        jobFacade.registerJobBegin(shardingContexts);//1
        String taskId = shardingContexts.getTaskId();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
        }
        try {
            process(shardingContexts, executionSource);//2
        } finally {
            // TODO consider adding a job-failed state, and how to handle the overall flow when a job fails
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }

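  >>1. Record the start of execution in the registry center (with monitor-execution enabled, the sharding items are marked as running).

  >>2. Execute the sharding items: a single item runs on the current thread, while multiple items are submitted to the thread pool.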
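Step 2 can be sketched as follows, assuming the shape we describe above: one item runs inline, several items go to an ExecutorService and the caller waits on a CountDownLatch. A failing item does not abort the others; the sketch stores its error text per item, roughly as the executor collects itemErrorMessages. ItemProcessor and the IntFunction task are hypothetical stand-ins for the real per-item process method.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.IntFunction;

final class ItemProcessor {
    // The task must not return null (ConcurrentHashMap rejects null values).
    static Map<Integer, String> process(final Collection<Integer> items, final ExecutorService executor,
                                        final IntFunction<String> task) {
        Map<Integer, String> results = new ConcurrentHashMap<>();
        if (items.size() == 1) {
            int item = items.iterator().next();
            results.put(item, runSafely(task, item)); // single item: run on the calling thread
            return results;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            executor.submit(() -> {
                try {
                    results.put(each, runSafely(task, each));
                } finally {
                    latch.countDown(); // count down even if storing the result fails
                }
            });
        }
        try {
            latch.await(); // wait until every item has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for sharding items", ex);
        }
        return results;
    }

    private static String runSafely(final IntFunction<String> task, final int item) {
        try {
            return task.apply(item);
        } catch (RuntimeException ex) {
            return "error: " + ex.getMessage();
        }
    }
}
```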

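3.6 Failover

If failover is required, the job performs failover. The coordination goes through executeInLeader, which uses a Curator LeaderLatch on a registry-center node as a distributed lock, so only one instance at a time takes over a failed sharding item: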

    /**
     * Execute an operation on the leader node.
     * 
     * @param latchNode name of the job node used by the distributed lock
     * @param callback callback that performs the operation
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }
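The pattern is "acquire leadership, run the callback, give leadership up": start() and await() block until this instance holds the latch, and closing the latch (try-with-resources) releases it. Curator provides that mutual exclusion across JVMs through ZooKeeper; the sketch below only imitates the same contract inside one JVM, with a ReentrantLock standing in for the latch. LocalLeaderExecutor is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-process stand-in for executeInLeader: the lock plays the role of the
// LeaderLatch, so at most one caller runs its callback at any moment.
final class LocalLeaderExecutor {
    interface Callback {
        void execute();
    }

    private final ReentrantLock latch = new ReentrantLock();

    void executeInLeader(final Callback callback) {
        latch.lock();       // like latch.start() + latch.await(): block until "leader"
        try {
            callback.execute();
        } finally {
            latch.unlock(); // like closing the LeaderLatch: give up leadership
        }
    }
}
```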

3.7 Post-execution processing

Method invoked after the job has executed:

    @Override
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        for (ElasticJobListener each : elasticJobListeners) {
            each.afterJobExecuted(shardingContexts);
        }
    }
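beforeJobExecuted and afterJobExecuted simply fan out to every registered listener in order, and execute() passes anything they throw to the job exception handler. A typical use is timing or auditing each run. The sketch below uses a hypothetical JobListener interface that takes a task id instead of ShardingContexts, so it is not the real ElasticJobListener signature; DurationListener and ListenerChain are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical reduction of ElasticJobListener: a task id stands in for ShardingContexts.
interface JobListener {
    void beforeJobExecuted(String taskId);

    void afterJobExecuted(String taskId);
}

// Example listener that records how long each run took; the clock is injected for testability.
final class DurationListener implements JobListener {
    private final LongSupplier clock;
    private long startedAt;
    final List<String> records = new ArrayList<>();

    DurationListener(final LongSupplier clock) {
        this.clock = clock;
    }

    @Override
    public void beforeJobExecuted(final String taskId) {
        startedAt = clock.getAsLong();
    }

    @Override
    public void afterJobExecuted(final String taskId) {
        records.add(taskId + " took " + (clock.getAsLong() - startedAt) + " ms");
    }
}

// Fans out to all listeners in registration order, like the facade's before/after loops.
final class ListenerChain {
    private final List<JobListener> listeners = new ArrayList<>();

    void add(final JobListener listener) {
        listeners.add(listener);
    }

    void before(final String taskId) {
        for (JobListener each : listeners) {
            each.beforeJobExecuted(taskId);
        }
    }

    void after(final String taskId) {
        for (JobListener each : listeners) {
            each.afterJobExecuted(taskId);
        }
    }
}
```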

4.Trigger

elastic-job uses a Quartz CronTrigger; the cron expression is defined as a job attribute:

  <xsd:attribute name="cron" type="xsd:string" use="required" />

5.Job scheduler: JobScheduler

    /**
     * Initialize the job.
     */
    public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); //1
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //2
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); //3
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); //4
    }
    
    private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
    
    protected Optional<ElasticJob> createElasticJobInstance() {
        return Optional.absent();
    }
    
    private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }
    
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", "1");
        result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
        result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
        return result;
    }

5.1 Update the job configuration.

    /**
     * Update the job configuration.
     *
     * @param liteJobConfig job configuration
     * @return the updated job configuration
     */
    public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        configService.persist(liteJobConfig);
        return configService.load(false);
    }
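Note that the method returns what the registry center holds after persisting, not the local object. configService.persist is not shown here; as we understand it, it writes the local configuration only when the registry has no configuration for the job yet or when overwrite (XSD default false) is true, so a configuration changed in the registry center survives restarts. A sketch of that persist-then-reload pattern with a map that stores only the cron expression; ConfigRegistry is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory "registry center" illustrating persist-then-reload:
// the local value is written only if none exists yet or overwrite is requested,
// and the caller always continues with whatever the registry now holds.
final class ConfigRegistry {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    String updateJobConfiguration(final String jobName, final String localCron, final boolean overwrite) {
        if (overwrite) {
            store.put(jobName, localCron);
        } else {
            store.putIfAbsent(jobName, localCron);
        }
        return store.get(jobName);
    }
}
```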

5.2 Initialization steps

5.2.1 Create the Quartz scheduler

    private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

5.2.2 Create the JobDetail

    private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
    

5.2.3 Register the job schedule controller.

    /**
     * Register the job schedule controller.
     * 
     * @param jobName job name
     * @param jobScheduleController job schedule controller
     * @param regCenter registry center
     */
    public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

5.2.4 Schedule the job.

    /**
     * Schedule the job.
     * 
     * @param cron CRON expression
     */
    public void scheduleJob(final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }

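6.Summary

  >>elastic-job builds on Quartz's scheduling mechanism, so the internals work the same way; on top of it, elastic-job adds distributed coordination for better performance and availability.

  >>elastic-job replaces Quartz's JDBC job store with a registry center (ZooKeeper): each instance runs a local in-memory Quartz scheduler (getBaseQuartzProperties configures no job store, so Quartz's default RAMJobStore is used) and coordinates with the others through ZooKeeper, which gives a considerable performance improvement.

  >>elastic-job adds job tracing (through listeners and trace events), which makes monitoring easier.

  >>elastic-job uses sharding: a job can be split into multiple sharding items that run on different servers.

  >>elastic-job only supports CronTrigger, whereas Quartz provides more trigger implementations.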

 

