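A while ago I wrote an article, "Demystifying the quartz scheduled-task framework in one article" (一文揭秘定時任務調度框架quartz), and a reader suggested that I also cover the elastic-job scheduling framework. The end of the year is not that busy, so let's study elastic-job.
To begin with, elastic-job is built on quartz, so understanding how quartz works makes elastic-job much quicker to understand.
Let's start with the architecture of elastic-job-lite.
We know that quartz has three key concepts: Job, Trigger, and Scheduler. How do these three concepts show up in elastic-job?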
1.Job
LiteJob implements quartz's Job interface
import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Lite scheduled job.
 *
 * @author zhangliang
 */
public final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        // Delegate to the executor that matches the concrete job type.
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
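Here:
1.1 ElasticJob has an implementation for each job type
1.2 JobFacade is the facade over the job's internal services
Note: elastic-job's features are visible in this facade, for example:
Task sharding:
  Split the overall task into multiple subtasks
  Scale processing capacity elastically by adding or removing servers
  Distributed coordination, with fully automatic discovery and handling of job servers going online and offline
Fault tolerance:
  Scheduled self-diagnosis and automatic repair
  Guarantee that each shard of a distributed job is assigned exactly once
  Failover and re-triggering of misfired jobs
Job tracing
Job scheduling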
public interface JobFacade {

    /**
     * Load the job configuration.
     *
     * @param fromCache whether to read from the cache
     * @return the job configuration
     */
    JobRootConfiguration loadJobRootConfiguration(boolean fromCache);

    /**
     * Check the job execution environment.
     *
     * @throws JobExecutionEnvironmentException if the job execution environment is invalid
     */
    void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException;

    /**
     * Fail over the job if failover is needed.
     */
    void failoverIfNecessary();

    /**
     * Register the job start information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobBegin(ShardingContexts shardingContexts);

    /**
     * Register the job completion information.
     *
     * @param shardingContexts sharding contexts
     */
    void registerJobCompleted(ShardingContexts shardingContexts);

    /**
     * Get the sharding contexts of the current job server.
     *
     * @return sharding contexts
     */
    ShardingContexts getShardingContexts();

    /**
     * Mark the given items as misfired.
     *
     * @param shardingItems sharding items to mark as misfired
     * @return whether the misfire condition is met
     */
    boolean misfireIfRunning(Collection<Integer> shardingItems);

    /**
     * Clear the misfire mark.
     *
     * @param shardingItems sharding items whose misfire mark should be cleared
     */
    void clearMisfire(Collection<Integer> shardingItems);

    /**
     * Determine whether the job needs to execute misfired items.
     *
     * @param shardingItems collection of sharding items
     * @return whether the job needs to execute misfired items
     */
    boolean isExecuteMisfired(Collection<Integer> shardingItems);

    /**
     * Determine whether the job is eligible to keep running.
     *
     * <p>The job will not keep running if it has been stopped, needs resharding, or is not a streaming job.</p>
     *
     * @return whether the job is eligible to keep running
     */
    boolean isEligibleForJobRunning();

    /**
     * Determine whether resharding is needed.
     *
     * @return whether resharding is needed
     */
    boolean isNeedSharding();

    /**
     * Hook invoked before the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void beforeJobExecuted(ShardingContexts shardingContexts);

    /**
     * Hook invoked after the job is executed.
     *
     * @param shardingContexts sharding contexts
     */
    void afterJobExecuted(ShardingContexts shardingContexts);

    /**
     * Post a job execution event.
     *
     * @param jobExecutionEvent job execution event
     */
    void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

    /**
     * Post a job status trace event.
     *
     * @param taskId task id
     * @param state job execution state
     * @param message job execution message
     */
    void postJobStatusTraceEvent(String taskId, State state, String message);
}
2.JobDetail
The common job attributes are defined in job.xsd
<xsd:complexType name="base">
    <xsd:complexContent>
        <xsd:extension base="beans:identifiedType">
            <xsd:all>
                <xsd:element ref="listener" minOccurs="0" maxOccurs="1" />
                <xsd:element ref="distributed-listener" minOccurs="0" maxOccurs="1" />
            </xsd:all>
            <xsd:attribute name="class" type="xsd:string" />
            <xsd:attribute name="job-ref" type="xsd:string" />
            <xsd:attribute name="registry-center-ref" type="xsd:string" use="required" />
            <xsd:attribute name="cron" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-total-count" type="xsd:string" use="required" />
            <xsd:attribute name="sharding-item-parameters" type="xsd:string" />
            <xsd:attribute name="job-parameter" type="xsd:string" />
            <xsd:attribute name="monitor-execution" type="xsd:string" default="true"/>
            <xsd:attribute name="monitor-port" type="xsd:string" default="-1"/>
            <xsd:attribute name="max-time-diff-seconds" type="xsd:string" default="-1"/>
            <xsd:attribute name="failover" type="xsd:string" default="false"/>
            <xsd:attribute name="reconcile-interval-minutes" type="xsd:int" default="10"/>
            <xsd:attribute name="misfire" type="xsd:string" default="true"/>
            <xsd:attribute name="job-sharding-strategy-class" type="xsd:string" />
            <xsd:attribute name="description" type="xsd:string" />
            <xsd:attribute name="disabled" type="xsd:string" default="false"/>
            <xsd:attribute name="overwrite" type="xsd:string" default="false"/>
            <xsd:attribute name="executor-service-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultExecutorServiceHandler"/>
            <xsd:attribute name="job-exception-handler" type="xsd:string" default="io.elasticjob.lite.executor.handler.impl.DefaultJobExceptionHandler"/>
            <xsd:attribute name="event-trace-rdb-data-source" type="xsd:string" />
        </xsd:extension>
    </xsd:complexContent>
</xsd:complexType>
A simple job uses the common attributes as they are; a dataflow job adds the streaming-process attribute, and a script job adds the script-command-line attribute.
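To make the two sharding attributes more concrete: sharding-total-count says how many items a job is split into, and sharding-item-parameters attaches a parameter to each item number. The sketch below parses such a string, assuming the usual comma-separated item=parameter form (for example 0=Beijing,1=Shanghai). The class and method names are hypothetical and this is not elastic-job's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of elastic-job: parses "0=a,1=b" style strings.
final class ShardingItemParametersSketch {

    // Returns item number -> parameter, in the order the pairs were written.
    static Map<Integer, String> parse(final String raw) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (raw == null || raw.trim().isEmpty()) {
            return result;
        }
        for (String pair : raw.split(",")) {
            // Split on the first '=' only, so parameters may themselves contain '='.
            String[] keyValue = pair.trim().split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(parse("0=Beijing,1=Shanghai,2=Guangzhou"));
    }
}
```

Inside a job, each running shard would then look up its own parameter by item number and use it to pick its slice of the data.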
The namespace handlers that parse these elements are declared in spring.handlers
http\://www.dangdang.com/schema/ddframe/reg=io.elasticjob.lite.spring.reg.handler.RegNamespaceHandler
http\://www.dangdang.com/schema/ddframe/job=io.elasticjob.lite.spring.job.handler.JobNamespaceHandler
JobNamespaceHandler
/**
 * Namespace handler for distributed jobs.
 *
 * @author caohao
 */
public final class JobNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        // One bean definition parser per job type.
        registerBeanDefinitionParser("simple", new SimpleJobBeanDefinitionParser());
        registerBeanDefinitionParser("dataflow", new DataflowJobBeanDefinitionParser());
        registerBeanDefinitionParser("script", new ScriptJobBeanDefinitionParser());
    }
}
When the elastic distributed job executor, AbstractElasticJobExecutor.java, is initialized, it reads the configuration properties and resolves the corresponding handlers.
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
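3.Executing the job
The elastic distributed job executor, AbstractElasticJobExecutor.java. The numbered comments //1 to //7 in the code correspond to sections 3.1 to 3.7 below.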
/**
 * Execute the job.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment(); //1
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts(); //2
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); //3
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts); //4
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); //5
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary(); //6
    try {
        jobFacade.afterJobExecuted(shardingContexts); //7
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
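The misfire handling in execute() is easy to miss: a trigger that fires while the previous run is still in progress only records a flag and returns, and the run in progress replays the missed fire once it finishes. Below is a minimal single-process sketch of that control flow. It assumes two in-memory flags, whereas elastic-job keeps this state per sharding item in the registry center, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory model of the "record misfire, replay later" flow.
final class MisfireFlowSketch {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean misfired = new AtomicBoolean(false);
    final List<String> log = Collections.synchronizedList(new ArrayList<String>());

    // Called on every trigger fire.
    void onTrigger(final Runnable body) {
        if (!running.compareAndSet(false, true)) {
            // Previous run still in progress: remember the missed fire and return.
            misfired.set(true);
            log.add("misfire recorded");
            return;
        }
        try {
            body.run();
            log.add("normal run");
            // Replay as long as a misfire was recorded during the last run.
            while (misfired.compareAndSet(true, false)) {
                body.run();
                log.add("misfire run");
            }
        } finally {
            running.set(false);
        }
    }

    public static void main(final String[] args) {
        MisfireFlowSketch sketch = new MisfireFlowSketch();
        int[] calls = {0};
        // The first execution of the body simulates a second trigger arriving mid-run.
        sketch.onTrigger(() -> {
            if (calls[0]++ == 0) {
                sketch.onTrigger(() -> { });
            }
        });
        System.out.println(sketch.log);
    }
}
```

Note that several missed fires collapse into a single replay here, since the flag is a boolean. The sketch also leaves a small window between the end of the loop and resetting the running flag, which a real implementation would have to close.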
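3.1 Environment check
Check whether the time difference, in seconds, between the local machine and the registry center is within the allowed range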
/**
 * Check whether the time difference in seconds between the local machine and the registry center is within the allowed range.
 *
 * @throws JobExecutionEnvironmentException thrown when the time difference between the local machine and the registry center is out of the allowed range
 */
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    // -1 (the default) disables the check.
    if (-1 == maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
    }
}
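As a small worked example of the comparison above, the following sketch isolates the arithmetic: the configured limit is in seconds, the clocks are in milliseconds, and -1 switches the check off. The class and method names are hypothetical, and the two timestamps are passed in directly rather than read from a time service or a registry center.

```java
// Hypothetical stand-alone version of the clock drift comparison.
final class TimeDiffCheckSketch {

    // Returns true when the drift is acceptable; -1 disables the check.
    static boolean isTolerable(final long localMillis, final long registryMillis, final int maxTimeDiffSeconds) {
        if (-1 == maxTimeDiffSeconds) {
            return true;
        }
        long timeDiff = Math.abs(localMillis - registryMillis);
        // Multiply by 1000L so the limit is widened to long before the comparison.
        return timeDiff <= maxTimeDiffSeconds * 1000L;
    }

    public static void main(final String[] args) {
        // 2.5 s of drift against a 3 s limit, then 4 s of drift against the same limit.
        System.out.println(isTolerable(10_000L, 12_500L, 3));
        System.out.println(isTolerable(10_000L, 14_000L, 3));
    }
}
```

With max-time-diff-seconds set to 3, a drift of 2500 ms passes and a drift of 4000 ms fails, which is the case where the real method throws JobExecutionEnvironmentException.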
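3.2 Sharding according to the sharding strategy
If sharding is needed and the current node is the leader, the job is sharded.
If there are no available nodes, no sharding takes place.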
/**
 * Shard the job if sharding is needed and the current node is the leader.
 *
 * <p>
 * No sharding takes place if there are no available nodes.
 * </p>
 */
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    // Non-leader nodes wait until the leader has finished sharding.
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
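The actual assignment is delegated to whatever JobShardingStrategy is configured through job-sharding-strategy-class. To give a feel for what such a strategy computes, here is a simplified sketch of an even split: every instance gets the same number of consecutive items, and leftover items are handed out one at a time starting from the first instance. It is written for illustration under those assumptions and uses plain strings for instances; it is not the library's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical even-split strategy over instance names (assumed to be unique).
final class AverageShardingSketch {

    static Map<String, List<Integer>> shard(final List<String> instances, final int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int itemsPerInstance = shardingTotalCount / instances.size();
        int next = 0;
        // First pass: an equal block of consecutive items for every instance.
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < itemsPerInstance; i++) {
                items.add(next++);
            }
            result.put(instance, items);
        }
        // Second pass: hand out the remainder one by one, starting from the first instance.
        for (String instance : instances) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(instance).add(next++);
        }
        return result;
    }

    public static void main(final String[] args) {
        // 8 items over 3 instances: {a=[0, 1, 6], b=[2, 3, 7], c=[4, 5]}
        System.out.println(shard(Arrays.asList("a", "b", "c"), 8));
    }
}
```

When an instance goes offline, rerunning the same function over the remaining instances gives the new assignment, which is the elastic scaling mentioned in section 1.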
3.3 Notification through EventBus
com.google.common.eventbus.EventBus
/**
 * Posts an event to all registered subscribers. This method will return
 * successfully after the event has been posted to all subscribers, and
 * regardless of any exceptions thrown by subscribers.
 *
 * <p>If no subscribers have been subscribed for {@code event}'s class, and
 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
 * DeadEvent and reposted.
 *
 * @param event event to post.
 */
public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
        subscribersByTypeLock.readLock().lock();
        try {
            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);
            if (!wrappers.isEmpty()) {
                dispatched = true;
                for (EventSubscriber wrapper : wrappers) {
                    enqueueEvent(event, wrapper);
                }
            }
        } finally {
            subscribersByTypeLock.readLock().unlock();
        }
    }
    if (!dispatched && !(event instanceof DeadEvent)) {
        post(new DeadEvent(this, event));
    }
    dispatchQueuedEvents();
}
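The idea behind post() is dispatch by event type, with a fallback wrapper for events nobody listens to. The sketch below reproduces just that idea with the JDK alone so it can be run without Guava. It is a stand-in written for illustration: the class names are hypothetical, it only walks the superclass chain (interfaces are ignored), and it calls handlers synchronously instead of queueing them.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical JDK-only model of type-based event dispatch.
final class MiniEventBusSketch {

    // Wrapper for events without subscribers, playing the role of Guava's DeadEvent.
    static final class DeadEvent {
        final Object event;

        DeadEvent(final Object event) {
            this.event = event;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> subscribersByType = new ConcurrentHashMap<>();

    <T> void register(final Class<T> type, final Consumer<? super T> handler) {
        subscribersByType.computeIfAbsent(type, key -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    void post(final Object event) {
        boolean dispatched = false;
        // Offer the event to subscribers of its class and of every superclass.
        for (Class<?> type = event.getClass(); type != null; type = type.getSuperclass()) {
            List<Consumer<Object>> handlers = subscribersByType.getOrDefault(type, Collections.emptyList());
            for (Consumer<Object> handler : handlers) {
                handler.accept(event);
                dispatched = true;
            }
        }
        // Nobody was interested: wrap once and repost, unless it is already a DeadEvent.
        if (!dispatched && !(event instanceof DeadEvent)) {
            post(new DeadEvent(event));
        }
    }

    public static void main(final String[] args) {
        MiniEventBusSketch bus = new MiniEventBusSketch();
        bus.register(String.class, s -> System.out.println("string: " + s));
        bus.register(DeadEvent.class, d -> System.out.println("dead: " + d.event));
        bus.post("job started");
        bus.post(42);
    }
}
```

In this model a job status trace event would be just another event class, and a listener that persists it would be one registered handler.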
3.4 Before the job runs: notifying each ElasticJobListener
@Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
3.5 Job execution
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    jobFacade.registerJobBegin(shardingContexts); //1
    String taskId = shardingContexts.getTaskId();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    }
    try {
        process(shardingContexts, executionSource); //2
    } finally {
        // TODO consider adding a job-failed state, and how to handle the overall flow when a job fails
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
            }
        } else {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            }
        }
    }
}
>>1.Register the job's start information with the registry center
>>2.Run each sharding item in the thread pool
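Step 2 can be pictured as one task per sharding item submitted to an executor, with the caller waiting until all of them are done and failures collected per item, much like the itemErrorMessages map seen in the constructor earlier. The following is a minimal sketch of that pattern, assuming a fixed thread pool created on the spot (elastic-job obtains its executor from the configured executor-service-handler). All names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

// Hypothetical model of "run every sharding item on a thread pool and wait".
final class ShardExecutionSketch {

    // Runs one task per item and returns the error message of every item that failed.
    static Map<Integer, String> runAll(final int[] items, final IntConsumer task) {
        Map<Integer, String> itemErrorMessages = new ConcurrentHashMap<>();
        ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, items.length));
        CountDownLatch latch = new CountDownLatch(items.length);
        try {
            for (final int item : items) {
                executorService.submit(() -> {
                    try {
                        task.accept(item);
                    } catch (final RuntimeException ex) {
                        // A failing item is recorded and does not stop the other items.
                        itemErrorMessages.put(item, String.valueOf(ex.getMessage()));
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
        }
        return new TreeMap<>(itemErrorMessages);
    }

    public static void main(final String[] args) {
        Map<Integer, String> errors = runAll(new int[] {0, 1, 2}, item -> {
            if (item == 1) {
                throw new IllegalStateException("item 1 failed");
            }
        });
        System.out.println(errors);
    }
}
```

An empty result corresponds to the TASK_FINISHED branch in execute(), and a non-empty one to TASK_ERROR.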
3.6 Failover
If failover is needed, the job is failed over. The operation runs on the leader node:
/**
 * Execute an operation on the leader node.
 *
 * @param latchNode name of the job node used for the distributed lock
 * @param callback callback that performs the operation
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        // Block until this node holds leadership.
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
3.7 After the job runs
The hook invoked once the job has been executed
@Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}
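Sections 3.4 and 3.7 are two halves of the same pattern: every registered listener is called before the job body and again after it. Here is a small sketch of that wrapping with a hypothetical Listener interface that stands in for ElasticJobListener and takes only a job name, since ShardingContexts is not available outside the library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the before/after listener hooks.
final class ListenerHooksSketch {

    // Stand-in for ElasticJobListener, reduced to a job name argument.
    interface Listener {
        void beforeJobExecuted(String jobName);

        void afterJobExecuted(String jobName);
    }

    static void runWithListeners(final String jobName, final List<Listener> listeners, final Runnable job) {
        for (Listener each : listeners) {
            each.beforeJobExecuted(jobName);
        }
        job.run();
        for (Listener each : listeners) {
            each.afterJobExecuted(jobName);
        }
    }

    // Runs a demo job with one recording listener and returns the recorded order of events.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        Listener recording = new Listener() {
            @Override
            public void beforeJobExecuted(final String jobName) {
                events.add("before " + jobName);
            }

            @Override
            public void afterJobExecuted(final String jobName) {
                events.add("after " + jobName);
            }
        };
        runWithListeners("demoJob", Collections.singletonList(recording), () -> events.add("run demoJob"));
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Unlike this sketch, execute() in section 3 wraps both hook calls in try/catch and passes any Throwable to the job exception handler, so a failing listener does not abort the job.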
4.Trigger
elastic-job uses a cron trigger by default; the expression is defined in the job attributes
<xsd:attribute name="cron" type="xsd:string" use="required" />
5.The job scheduler, JobScheduler
/**
 * Initialize the job.
 */
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); //1
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //2
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); //3
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); //4
}

private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}

protected Optional<ElasticJob> createElasticJobInstance() {
    return Optional.absent();
}

private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}

private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
    // One quartz thread per job: each job gets its own scheduler instance.
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}
5.1 Updating the job configuration
/**
 * Update the job configuration.
 *
 * @param liteJobConfig job configuration
 * @return the updated job configuration
 */
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
    configService.persist(liteJobConfig);
    return configService.load(false);
}
5.2 The initialization steps
5.2.1 Creating the quartz scheduler
private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}
5.2.2 Creating the JobDetail
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}
5.2.3 Registering the job schedule controller
/**
 * Add a job schedule controller.
 *
 * @param jobName job name
 * @param jobScheduleController job schedule controller
 * @param regCenter registry center
 */
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
    schedulerMap.put(jobName, jobScheduleController);
    regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}
5.2.4 Scheduling the job
/**
 * Schedule the job.
 *
 * @param cron cron expression
 */
public void scheduleJob(final String cron) {
    try {
        // Only schedule the job if it is not already known to the scheduler.
        if (!scheduler.checkExists(jobDetail.getKey())) {
            scheduler.scheduleJob(jobDetail, createTrigger(cron));
        }
        scheduler.start();
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}
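6.Summary
>>elastic-job reuses quartz's scheduling mechanism and works on the same principles internally, while improving performance and availability.
>>elastic-job replaces quartz's JDBC job store with a registry center (zookeeper), which improves performance considerably.
>>elastic-job adds job tracing (through listeners), which makes monitoring easier.
>>elastic-job uses sharding to split a job into multiple items that can run on different nodes.
>>elastic-job supports only the cron trigger, whereas quartz supports more trigger implementations.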