什么是Elastic-Job
Elastic-Job是當當網大牛基於Zookepper,Quartz開發並且開源的Java分布式定時任務,解決Quartz不支持分布式的弊端。它由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
基本概念
- 分片概念:任務分布式的執行,需要將一個任務拆分成多個獨立的任務項,然后由分布式的服務器分別執行某一個或幾個分片項
- 個性化參數:shardingItemParameter,可以和分片項匹配對應關系。比如:將商品的狀態分成上架,下架。那么配置0=上架,1=下架,代碼中直接使用上架下架的枚舉值即可完成分片項與業務邏輯的對應關系
- 作用高可用:將分片總數設置成1,多台服務器執行作業將采用1主n從的方式執行
- 彈性擴容:將任務拆分為n個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服器加入集群或有服務器宕機。Elastic-Job將保留本次任務不變,下次任務開始前重新分片。
- 並行調度:采用任務分片方式實現。將一個任務拆分為n個獨立的任務項,由分布式的服務器並行執行各自分配到的分片項。
- 集中管理:采用基於zookepper的注冊中心,集中管理和協調分布式作業的狀態,分配和監聽。外部系統可直接根據Zookeeper的數據管理和監控elastic-job。
- 定制化流程任務:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用於保證同一分片的順序性,這點類似於kafka的分區順序性。
整體架構圖
Elastic-Job的具體模塊的底層及如何實現
Elastic-Job采用去中心化設計,主要分為注冊中心、數據分片、分布式協調、定時任務處理和定制化流程型任務等模塊。
- 去中心化:指Elastic-Job沒有調度中心這一概念。每個運行在集群中的作業服務器都是對等的,節點之間通過注冊中心進行分布式協調。但elastic-job有主節點的概念,主節點用於處理一些集中式任務,如分片,清理運行時信息等,並無調度功能,定時調度都是由作業服務器自行觸發。
中心化 | 去中心化 | |
實現難度 | 難 | 易 |
部署難度 | 難 | 易 |
觸發時間統一控制 | 可以 | 不可以 |
觸發延遲 | 有 | 無 |
異構語言支持 | 容易 | 困難 |
- 注冊中心:注冊中心模塊目前直接使用zookeeper,用於記錄作業的配置,服務器信息以及作業運行狀態。Zookeeper雖然很成熟,但原理復雜,使用較難,在海量數據支持的情況下也會有性能和網絡問題。
- 數據分片:數據分片是elastic-job中實現分布式的重要概念,將真實數據和邏輯分片對應,用於解耦作業框架和數據的關系。作業框架只負責將分片合理的分配給相關的作業服務器,而作業服務器需要根據所分配的分片匹配數據進行處理。服務器分片目前都存儲在注冊中心中,各個服務器根據自己的IP地址拉取分片。
- 分布式協調:分布式協調模塊用於處理作業服務器的動態擴容縮容。一旦集群中有服務器發生變化,分布式協調將自動監測並將變化結果通知仍存活的作業服務器。協調時將會涉及主節點選舉,重分片等操作。目前使用的Zookeeper的臨時節點和監聽器實現主動檢查和通知功能。
- 定時任務處理:定時任務處理根據cron表達式定時觸發任務,目前有防止任務同時觸發,錯過任務重出發等功能。主要還是使用Quartz本身的定時調度功能,為了便於控制,每個任務都使用獨立的線程池。
- 定制化流程型任務:定制化流程型任務將定時任務分為多種流程,有不經任何修飾的簡單任務;有用於處理數據的fetchData/processData的數據流任務;以后還將增加消息流任務,文件任務,工作流任務等。用戶能以插件的形式擴展並貢獻代碼。
作業開發
Elastic-Job提供Simple、Dataflow和Script 3種作業類型。方法參數shardingContext包含作業配置、片和運行時信息。可通過getShardingTotalCount(), getShardingItem()等方法分別獲取分片總數,運行在本作業服務器的分片序列號等。
- Simple類型的作業:該類型意為簡單實現,只需實現SimpleJob接口,重寫它的execute方法即可
- Dataflow類型作業:用於處理數據流,實現DataflowJob接口,並重寫兩個方法——用於抓取(fetchData方法)和處理(processData方法)數據。比如在fetchData方法里面查詢沒有上架的商品,在processData方法修改該商品的狀態。
注意:可通過DataflowJobConfiguration配置是否流式處理。當配置成流式處理,fetchData方法返回值(返回值是集合)是null或長度是0,作業才停止抓取,否則將一直運行。非流式的則每次作業只執行一次這兩個方法就結束該作業。 - Script類型作業:意為腳本類型作業,支持shell、python、perl等類型腳本。只需通過控制台或代碼配置scriptCommandLine即可,無需編碼。
引入Maven依賴
<!-- 引入elastic-job-lite核心模塊 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>
<!-- 使用springframework自定義命名空間時引入 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${latest.release.version}</version>
</dependency>
作業配置
Elasti-Job配置分成3個層級,Core, Type和Root。
- Core對應JobCoreConfiguration,用於提供作業核心配置信息,如:作業名稱、分片總數、CRON表達式等。
- Type對應JobTypeConfiguration,有三個子類分別對應SIMPLE, DATAFLOW和SCRIPT類型作業,供3種作業需要的不同配置,如:DATAFLOW類型是否流式處理或SCRIPT類型的命令行等。
- Root對應JobRootConfiguration,有兩個子類分別對應Lite和Cloud部署類型,提供不同部署類型所需的配置,如:Lite類型的是否需要覆蓋本地配置或Cloud占用CPU或Memory數量等。
與Spring結合
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作業注冊中心-->
<reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置簡單作業-->
<job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
<property name="fooService" ref="xxx.FooService"/>
</bean>
<!-- 配置關聯Bean作業-->
<job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<!-- 配置數據流作業-->
<job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<!-- 配置腳本作業-->
<job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
<!-- 配置帶監聽的簡單作業-->
<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
<job:listener class="xx.MySimpleJobListener"/>
<job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
</job:simple>
<!-- 配置帶作業數據庫事件追蹤的簡單作業-->
<job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource">
</job:simple>
</beans>
補充:啟動zookepper,通過spring啟動配置,作業就能加載。
Spring部分配置參數說明
注冊中心配置(只支持zookepper)
- id:注冊中心在Spring容器中的主鍵
- server-lists:IP地址加端口號,可配置多個,用逗號隔開
- namespace:zookepper的命名空間
- max-retries:最大重置次數
作業配置
JobCoreConfiguration屬性
- id:作業名稱
- class:作業實現類,需實現ElasticJob接口
- cron:cron表達式,控制作用觸發時間
- sharding-total-count:作業分片總數
- registry-center-ref:注冊中心bean的引用
- sharding-item-parameters:分片序列號和參數用等號分隔,多個鍵值對用逗號分隔片,序列號從0開始,不可大於或等於作業分片總數如:0=a,1=b,2=c
- failover:是否開啟失效轉移
補充:開啟失效轉移的情況下,如果任務執行過程中一台服務器失去連接,那么已經分配到該服務器的任務,將會在下次任務執行之前被當前集群中正常的服務器獲取分片並執行,執行結束后再進行下一次任務;未開啟失效轉移,那么服務器丟失后,程序將不作任務處理,任由其丟失,但下次任務會重新分片。 - disabled:作業是否禁止啟動
- overwrite:本地配置是否可覆蓋注冊中心配置,如果可覆蓋,每次啟動作業都以本地配置為准
- event-trace-rdb-data-source:作業事件追蹤的數據源Bean引用
- streaming-process:dataflow特有的——是否流式處理數據
- job-sharding-strategy-class:作業分片策略實現類全路徑
分片策略
- AverageAllocationJobShardingStrategy:平均分配,默認分配策略,不能整除的多余分片將依次追加到序號小的服務器
- OdevitySortByNameJobShardingStrategy:根據作業名的哈希值奇偶數決定IP升降序算法的分片策略。作業名的哈希值為奇數則IP升序,偶數則IP降序
- RotateServerByNameJobShardingStrategy:根據作業名的哈希值對服務器列表進行輪轉的分片策略
- 自定義策略:實現JobShardingStrategy接口並實現sharding方法,接口方法參數為作業服務器IP列表和分片策略選項,分片策略選項包括作業名稱,分片總數以及分片序列號和個性化參數對照表
對比
與Spring Batch比較
- Spring Batch 是一款批處理應用框架,不是調度框架。如果我們希望批處理任務定期執行,可結合 Quartz 等成熟的調度框架實現。Elastic-Job集成了調度框架,不需要額外添加
- Spring Batch提供了豐富的讀寫組件,適用於復雜的流程化作業
- Elastic-Job采用分片的方式,是分布式調度解決方案。適用場景是:相對於流程比較簡單,但是任務可以拆分到多個線程去執行。