當當的Elastic-Job開源出了兩種分布式Job的解決方案:
1. elastic-job-lite,這是一個無中心節點的調度;
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。
2. elastic-job-cloud是一個有中心節點的分布式調度開源工具
Elastic-Job-Cloud使用Mesos + Docker(TBD)的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。
Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API開發作業,它們的核心都是elastic-job-core,開發者僅需一次開發,即可根據需要以Lite或Cloud的方式部署
Elastic-Job-Lite實現分布式job的原理:
要在分布式集群環境下去安全的執行一個調度任務,我們常規的想法就是保證在集群環境下,只有集群中的一台機器能夠獲取執行任務的權限。
但是Elastic-Job在實現分布式Job時沒有按照這個思路去設計,而是將集群中所有的機器都利用起來,去多進程多線程的執行我們的作業任務。
那么它是如何做到分布式作業任務的不重不漏的呢?
Elastic-Job提出了一個數據分片(shardingItem)的概念。
舉個例子:
假如我現在部署了3台機器的一個集群,並且按照Elastic-Job的要求設置了分片數量shardingCount=10,則按照Elastic-Job基於平均分配算法的分片策略得到的分片結果為:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8] (參見:com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy.java)
那么在每一台機器上去執行時,我只要取到本機的數據分片對應的數據來處理,就能夠實現數據的分片處理。並且集群中所有機器處理的數據總和即為全量數據。
這樣就能把集群中所有的機器都調動起來去同時並行的處理任務,效率比較高。
更有甚者,如果本機的數據分片分到了多個分片(即一個JVM進程分到了多個分片),則Elastic-Job會為每一個分片去啟動一個線程來執行分片任務。
這樣,一台機器就會開啟多個線程就並行的處理作業任務,效率進一步的提高。
(參見:com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#process(ShardingContexts shardingContexts,
JobExecutionEvent.ExecutionSource executionSource))
特殊場景:分片數為1時

public class MyElasticJob implements SimpleJob { public void execute(ShardingContext shardingContext) { // 1.當分片數為1時,在同一個zookepper和jobname情況下,多台機器部署了Elastic // job時,只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行 // 2.當分片數大於1時,假如有3台服務器,分成10片,則分片項分配結果為服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。此時每台服務器可根據拿到的shardingItem值進行相應的處理, // 舉例場景: // 假如job處理數據庫中的數據業務,方法為:A服務器處理數據庫中Id以0,1,2結尾的數據,B處理數據庫中Id以3,4,5結尾的數據,C處理器處理6,7,8,9結尾的數據,合計處理0-9為全部數據 // 如果服務器C崩潰,Elastic // Job自動進行進行失效轉移,將C服務器的分片轉移到A和B服務器上,則分片項分配結果為服務器A=0,1,2,3,4;服務器B=5,6,7,8,9 // 此時,A服務器處理數據庫中Id以0,1,2,3,4結尾的數據,B處理數據庫中Id以5,6,7,8,9結尾的數據,合計處理0-9為全部數據. processByEndId(shardingContext.getShardingItem()); } private void processByEndId(int shardingContext) { // TODO: 處理數據Id結尾為 shardingContext的數據 } }
主要功能
a) 分布式:重寫Quartz基於數據庫的分布式功能,改用Zookeeper實現注冊中心。
b) 並行調度:采用任務分片方式實現。將一個任務拆分為n個獨立的任務項,由分布式的服務器並行執行各自分配到的分片項。
c) 彈性擴容縮容:將任務拆分為n個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。
d) 集中管理:采用基於Zookeeper的注冊中心,集中管理和協調分布式作業的狀態,分配和監聽。外部系統可直接根據Zookeeper的數據管理和監控elastic-job。
e) 定制化流程型任務:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用於保證同一分片的順序性,這點類似於kafka的分區順序性。
2. 其他功能
a) 失效轉移:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
b) Spring命名空間支持:elastic-job可以不依賴於spring直接運行,但是也提供了自定義的命名空間方便與spring集成。
c) 運維平台:提供web控制台用於管理作業。
3. 非功能需求
a) 穩定性:在服務器無波動的情況下,並不會重新分片;即使服務器有波動,下次分片的結果也會根據服務器IP和作業名稱哈希值算出穩定的分片順序,盡量不做大的變動。
b) 高性能:同一服務器的批量數據處理采用自動切割並多線程並行處理。
c) 靈活性:所有在功能和性能之間的權衡,都可通過配置開啟/關閉。如:elastic-job會將作業運行狀態的必要信息更新到注冊中心。如果作業執行頻度很高,會造成大量Zookeeper寫操作,而分布式Zookeeper同步數據可能引起網絡風暴。因此為了考慮性能問題,可以犧牲一些功能,而換取性能的提升。
d) 冪等性:elastic-job可犧牲部分性能用以保證同一分片項不會同時在兩個服務器上運行。
e) 容錯性:作業服務器和Zookeeper斷開連接則立即停止作業運行,用於防止分片已經重新分配,而腦裂的服務器仍在繼續執行,導致重復執行。
以一個官方的例子(com.dangdang.ddframe.job.example.JavaMain.java)來分析Elastic-Job的執行過程:
准備:
由於Elastic-Job是基於Quratz來封裝的,我們先看一下Quartz是如何來提交一個作業任務的:

public class MyJob implements Job { @Override public void execute(JobExecutionContext arg0) throws JobExecutionException { System.out.println(DateTime.now() + "-------執行MyJob"); } public static void main(String[] args) { // 通過schedulerFactory獲取一個調度器 SchedulerFactory schedulerfactory = new StdSchedulerFactory(); Scheduler scheduler = null; try { // 通過schedulerFactory獲取一個調度器 scheduler = schedulerfactory.getScheduler(); // 創建jobDetail實例,綁定Job實現類, 指明job的名稱,所在組的名稱,以及綁定job類 JobDetail job = JobBuilder.newJob(MyJob.class).withIdentity("job1", "jgroup1").build(); // 定義調度觸發規則, 使用simpleTrigger規則(cornTrigger) :從第2s開始,每5s執行一次。(http://cron.qqe2.com/ 生成corn) Trigger trigger = TriggerBuilder.newTrigger().withIdentity("simpleTrigger", "triggerGroup") .withSchedule(CronScheduleBuilder.cronSchedule("2/5 * * * * ? *")).startNow().build(); // 把作業和觸發器注冊到任務調度中 scheduler.scheduleJob(job, trigger); // 啟動調度 System.out.println(DateTime.now() + "------job start----"); scheduler.start(); } catch (Exception e) { e.printStackTrace(); } } }
以此為基礎,我們來看Elastic-Job-Lite是如何對它進行封裝的:
0. 入口 (我們要執行的任務是JavaSimpleJob.java)
// 將要執行的作業任務放到 SimpleJobConfiguration 中
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
// 啟動
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
作業任務 job 是通過 LiteJobConfiguration 來定義的,里面會定義是哪一種類型的job(JobTypeConfiguration)。
現在支持3種類型的job:SimpleJobConfiguration、DataflowJobConfiguration、ScriptJobConfiguration
1. JobScheduler#init()
1.1 com.dangdang.ddframe.job.lite.api.JobScheduler 實例化時,會構造JobFacade。
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
JobFacade 是用來操作分片上下文的(后面會用到)
1.2 創建org.quartz.JobDetail
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
.....
scheduler.start(); // 啟動Quartz
2. Quartz執行作業任務
到了corn表達式設定的執行時間,Quartz會去執行LiteJob#execute()
2.1 LiteJob#execute(final JobExecutionContext context)
// 根據作業任務的類型,拿到作業任務對應的JobExecutor,然后去執行
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
2.2 AbstractElasticJobExecutor#execute()
/** * 執行作業. */ public final void execute() { try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobExceptionHandler.handleException(jobName, cause); }
// 重要:獲取當前作業服務器的分片上下文。如果發現沒有leader,就選出一個leader,並進行分片 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName)); } if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { if (shardingContexts.isAllowSendJobEvent()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet())); } return; } try { jobFacade.beforeJobExecuted(shardingContexts); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE); } jobFacade.failoverIfNecessary(); try { jobFacade.afterJobExecuted(shardingContexts); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON jobExceptionHandler.handleException(jobName, cause); } }
附:
分片服務:
com.dangdang.ddframe.job.lite.internal.sharding.ShardingService
ShardingService#getShardingItems(String jobInstanceId) -- 獲取本機的shardingItems
默認分片策略:
com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
主節點選舉:
com.dangdang.ddframe.job.lite.internal.election.LeaderService
作業實例的唯一標記:
com.dangdang.ddframe.job.lite.api.strategy.JobInstance
jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
節點信息寫入:
PersistShardingInfoTransactionExecutionCallback#execute()
作業節點數據:
com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage
注意 ShardingContexts 與 ShardingContext 的區別:
ShardingContexts是本機的分片上下文(里面保存了本機分配到的shardingItemParameters的集合)
/** * 分配於本作業實例的分片項和參數的Map. */ private final Map<Integer, String> shardingItemParameters;
ShardingContext是當前線程的分片上下文(里面只保存有一個shardingItem)
/** * 分配於本作業實例的分片項. */ private final int shardingItem;
Elastic-Job特性:
1. Elastic-Job-Lite並無作業調度中心節點,而是基於部署作業框架的程序在到達相應時間點時各自觸發調度。
注冊中心僅用於作業注冊和監控信息存儲。而主作業節點僅用於處理分片和清理等功能。
2. 為了保證作業的在分布式場景下的一致性,一旦作業與注冊中心無法通信,運行中的作業會立刻停止執行,但作業的進程不會退出,這樣做的目的是為了防止作業重分片時,將與注冊中心失去聯系的節點執行的分片分配給另外節點,導致同一分片在兩個節點中同時執行。
3. Elastic-Job-Lite采用無中心化設計,若每個客戶端的配置不一致,不做控制的話,最后一個啟動的客戶端配置將會成為注冊中心的最終配置。
4. 彈性擴容縮容:將任務拆分為n個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片
5. 定制化流程型任務:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用於保證同一分片的順序性,這點類似於kafka的分區順序性。
6. 失敗轉移:http://www.cnblogs.com/nevermorewang/p/5744324.html
LTS、xxl-job、Elastic-Job對比
Elastic-Job官方文檔及相關文章:
http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/00-overview/
https://my.oschina.net/u/719192/blog/506062?nocache=1497421466555
http://www.infoq.com/cn/articles/dangdang-distributed-work-framework-elastic-job/
開源方案對比: