認識 Elastic-Job:
任務調度高級需求,Quartz 的不足:
- 作業只能通過 DB 搶占隨機負載,無法協調
- 任務不能分片——單個任務數據太多了跑不完,消耗線程,負載不均
- 作業日志可視化監控、統計
發展歷史:
官網:http:://elasticjob.io/docs/elastic-job-lite/00-overview/
Elastic-Job 是怎么來的?在當當的 ddframe 框架中,需要一個任務調度系統(作業系統)。實現的話有兩種思路,一個是修改開源產品,一種是基於開源產品搭建(封裝),當當選擇了后者,最開始這個調度系統叫做 dd-job。它是一個無中心化的分布式調度框架。因為數據庫缺少分布式協調功能(比如選主),替換為 Zookeeper 后,增加了彈性擴容和數據分片的功能。Elastic-Job 是 ddframe 中的 dd-job 作業模塊分離出來的作業框架,基於 Quartz和 Curator 開發,在 2015 年開源。輕量級,無中心化解決方案。為什么說是去中心化呢?因為沒有統一的調度中心。集群的每個節點都是對等的,節點之間通過注冊中心進行分布式協調。E-Job 存在主節點的概念,但是主節點沒有調度的功能,而是用於處理一些集中式任務,如分片,清理運行時信息等。
Elastic-Job 最開始只有一個 elastic-job-core 的項目,在 2.X 版本以后主要分為Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子項目。其中,Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用 jar 包 的 形 式 提 供 分 布 式 任 務 的 協 調 服 務 。 而Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應用分發以及進程隔離等服務(跟 Lite 的區別只是部署方式不同,他們使用相同的 API,只要開發一次)。
功能特性:
- 分布式調度協調:用 ZK 實現注冊中心
- 錯過執行作業重觸發(Misfire)
- 支持並行調度(任務分片)
- 作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
- 彈性擴容縮容:將任務拆分為 n 個任務項后,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job 將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。
- 失效轉移 failover:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
- 支持作業生命周期操作(Listener)
- 豐富的作業類型(Simple、DataFlow、Script)
- Spring 整合以及命名空間提供
- 運維平台
項目架構:
應用在各自的節點執行任務,通過 ZK 注冊中心協調。節點注冊、節點選舉、任務分片、監聽都在 E-Job 的代碼中完成。下圖是官網提供得架構圖。

任務類型:
接下去就針對 Elastic-Job 的三種任務類型進行切入。來體會一下 Elastic-Job的開發流程。任務類型分為:SimpleJob、DataflowJob、ScriptJob
1.導入 pom 依賴:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.9</version>
</dependency>
2.SimpleJob :簡單實現,未經任何封裝的類型。需實現 SimpleJob 接口.
public class MySimpleJob implements SimpleJob { public void execute(ShardingContext context) { System.out.println( String.format("分片項 ShardingItem: %s | 運行時間: %s | 線程ID: %s | 分片參數: %s ", context.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), context.getShardingParameter()) ); } }
測試類:
public class SimpleJobTest { // TODO 如果修改了代碼,跑之前清空ZK public static void main(String[] args) { // ZK注冊中心 CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter( new ZookeeperConfiguration("192.168.1.101:2181", "ejob-standalone") ); regCenter.init(); // 數據源 , 事件執行持久化策略 DruidDataSource dataSource =new DruidDataSource(); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://192.168.1.101:3306/study?useUnicode=true&characterEncoding=utf-8"); dataSource.setUsername("root"); dataSource.setPassword("123456"); JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); // 定義作業核心配置 JobCoreConfiguration coreConfig = JobCoreConfiguration .newBuilder("MySimpleJob", "0/20 * * * * ?", 4) .shardingItemParameters("0=RDP, 1=CORE, 2=SIMS, 3=ECIF").failover(true).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration( coreConfig, MySimpleJob.class.getCanonicalName()); // 作業分片策略 // 基於平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); // 定義Lite作業根配置 // LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build(); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); // 構建Job // new JobScheduler(regCenter, simpleJobRootConfig).init(); new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); } }
3.DataflowJob:Dataflow 類型用於處理數據流,必須實現 fetchData()和processData()的方法,一個用來獲取數據,一個用來處理獲取到的數據。
public class MyDataFlowJob implements DataflowJob<String> { private boolean flag = false; public List<String> fetchData(ShardingContext shardingContext) { System.out.println("開始獲取數據"); if (flag) { return null; } return Arrays.asList("qingshan", "jack", "seven"); } public void processData(ShardingContext shardingContext, List<String> data) { for (String val : data) { // 處理完數據要移除掉,不然就會一直跑,處理可以在上面的方法里執行。這里采用 flag
System.out.println("開始處理數據:" + val); } flag = true; } }
測試的話同 SimpleJob,只需要把任務類型修改為 DataflowJobConfiguration
4.ScriptJob:Script 類型作業意為腳本類型作業,支持 shell,python,perl 等所有類型腳本。我這邊直接在 D盤下 新增一個 test.bat 腳本,內容如下:
@echo ------[Script Task]Sharding Context: %*
測試類:
public class ScriptJobTest { // 如果修改了代碼,跑之前清空ZK public static void main(String[] args) { // ZK注冊中心 CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter( new ZookeeperConfiguration("192.168.1.101:2181", "ejob-standalone")); regCenter.init(); // 定義作業核心配置 JobCoreConfiguration scriptJobCoreConfig = JobCoreConfiguration .newBuilder("MyScriptJob", "0/4 * * * * ?", 2) .build(); // 定義SCRIPT類型配置 ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration( scriptJobCoreConfig,"D:/test.bat"); // 作業分片策略 // 基於平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); // 定義Lite作業根配置 // LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build(); LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build(); // 構建Job new JobScheduler(regCenter, scriptJobRootConfig).init(); // new JobScheduler(regCenter, scriptJobRootConfig, jobEventConfig).init(); } }
通過以上的三個 demo 我們對於 elastic-job 有了新的認識,可以看到要創建一個新的job需要經歷一系列的配置,那么接下去就來看看這些配置都是怎么定義的。
Elastic-Job 配置:
配置手冊: http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/
配置的過程大概分為以下幾個步驟:
- 由於 Elastic-Job依賴於 ZK,所以首先是注冊中心的配置。
- 如果想對事件的執行鏈路持久化,還需要配置相關JobEventConfiguration。
- 作業核心配置:JobCoreConfiguration,配置作業名稱、CRON 表達式、分片總數等。
- 作業類型配置,如SimpleJobConfiguration,配置自己的實現等。
- 定義Lite作業根配置 LiteJobConfiguration,配置 作業分片策略
- 最后構建 Job,JobScheduler
作業核心配置分為 3 級,分別是 JobCoreConfiguration,JobTypeConfiguration 和LiteJobConfiguration 。 LiteJobConfiguration 使 用 JobTypeConfiguration ,JobTypeConfiguration 使用 JobCoreConfiguration,層層嵌套。JobTypeConfiguration 根 據 不 同 實 現 類 型 分 為 SimpleJobConfiguration ,DataflowJobConfiguration 和 ScriptJobConfiguration。E-Job 使用 ZK 來做分布式協調,所有的配置都會寫入到 ZK 節點。
Zookeeper 注冊中心數據結構:
一個任務一個二級節點。這里面有些節點是臨時節點,只有任務運行的時候才能看到。
注意:修改了任務重新運行任務不生效,是因為 ZK 的信息不會更新, 除非把overwrite 修改成 true。
接下去我們先將三類的任務都跑一遍,然后保留 SimpleJob 把其他兩個關了。這個時候可以查看 ZK節點如下:
從這里我們可以得出結論 Elastic-Job是以 jobName 來區分他們的。點開正在運行的 SimpleJob 節點可以看到如下信息:
config 節點:
JSON 格式存儲。存儲任務的配置信息,包含執行類,cron 表達式,分片算法類,分片數量,分片參數等等。
{ "jobName": "MySimpleJob", "jobClass": "com.wuzz.demo.job.demo.simple.MySimpleJob", "jobType": "SIMPLE", "cron": "0/20 * * * * ?", "shardingTotalCount": 4, "shardingItemParameters": "0\u003dRDP, 1\u003dCORE, 2\u003dSIMS, 3\u003dECIF", "jobParameter": "", "failover": true, "misfire": true, "description": "", "jobProperties": { "job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler", "executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler" }, "monitorExecution": true, "maxTimeDiffSeconds": -1, "monitorPort": -1, "jobShardingStrategyClass": "", "reconcileIntervalMinutes": 10, "disabled": false, "overwrite": true }
config節點的數據是通過ConfigService持久化到zookeeper中去的。默認狀態下,如果你修改了 Job 的配置比如 cron 表達式、分片數量等是不會更新到 zookeeper 上去的,除非你在 Lite 級別的配置把參數 overwrite 修改成 true。
instances 節點:
同一個 Job 下的 elastic-job 的部署實例。一台機器上可以啟動多個 Job 實例,也就是 Jar 包。instances 的命名是 IP+@-@+PID。只有在運行的時候能看到。
leader 節點:
任務實例的主節點信息,通過 zookeeper 的主節點選舉,選出來的主節點信息。在elastic job 中,任務的執行可以分布在不同的實例(節點)中,但任務分片等核心控制,需要由主節點完成。因此,任務執行前,需要選舉出主節點。下面有三個子節點:
- election:主節點選舉
- sharding:分片
- failover:失效轉移,這里沒有顯示是未發生失效轉移
election 下面的 instance 節點顯示了當前主節點的實例 ID:jobInstanceId。election 下面的 latch 節點也是一個永久節點用於選舉時候的實現分布式鎖。sharding 節點下面有一個臨時節點,necessary,是否需要重新分片的標記。如果分片總數變化,或任務實例節點上下線或啟用/禁用,以及主節點選舉,都會觸發設置重分片標記,主節點會進行分片計算。
servers 節點:
任務實例的信息,主要是 IP 地址,任務實例的 IP 地址。跟 instances 不同,如果多個任務實例在同一台機器上運行則只會出現一個 IP 子節點。可在 IP 地址節點寫入DISABLED 表示該任務實例禁用。
sharding 節點:
任務的分片信息,子節點是分片項序號,從 0 開始。分片個數是在任務配置中設置的。分片項序號的子節點存儲詳細信息。每個分片項下的子節點用於控制和記錄分片運行狀態。最主要的子節點就是 instance。
運維平台:
github下載源碼 https://github.com/elasticjob/elastic-job-lite.
對 elastic-job-lite-console 打包得到安裝包。解壓縮 elastic-job-lite-console-${version}.tar.gz 並執行 bin\start.sh(Windows運行.bat)。打開瀏覽器訪問 http://localhost:8899/即可訪問控制台。8899 為默認端口號,可通過啟動腳本輸入-p 自定義端口號。默認管理員用戶名和密碼是 root/root。右上角可以切換語言。
第一步,添加注冊中心,輸入 ZK 地址和命名空間,並連接。
運維平台和 elastic-job-lite 並無直接關系,是通過讀取作業注冊中心數據展現作業狀態,或更新注冊中心數據修改全局配置。控制台只能控制作業本身是否運行,但不能控制作業進程的啟動,因為控制台和作業本身服務器是完全分離的,控制台並不能控制作業服務器。
修改作業:
事件追蹤:
http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/
Elastic-Job 提供了事件追蹤功能,可通過事件訂閱的方式處理調度過程的重要事件,用於查詢、統計和監控。
Elastic-Job-Lite 在配置中提供了 JobEventConfiguration,目前支持數據庫方式配置。在上文的 SimpleJobTest 中已經貼出代碼。
事件追蹤的 event_trace_rdb_url 屬性對應庫自動創建 JOB_EXECUTION_LOG 和 JOB_STATUS_TRACE_LOG 兩張表以及若干索引
需要在運維平台中添加數據源信息,並且連接:
提交后點擊連接,就可以在作業歷史中查看到歷史執行事件:
springboot 集成 Elastic-Job:
1.導入依賴:
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-spring --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency>
2. 定義配置類和任務類中要用到的參數
server.port=${random.int[10000,19999]} regCenter.serverList = 192.168.1.101:2181 regCenter.namespace = ejob-springboot wuzzJob.cron = 0/5 * * * * ? wuzzJob.shardingTotalCount = 2 wuzzJob.shardingItemParameters = 0=0,1=1
3.創建任務
@Component public class SimpleJobDemo implements SimpleJob { public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, %s,任務總片數: %s, " + "當前分片項: %s.當前參數: %s," + "當前任務名稱: %s.當前任務參數 %s", Thread.currentThread().getId(), new SimpleDateFormat("HH:mm:ss").format(new Date()), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
4.注冊中心配置,Bean 的 initMethod 屬性用來指定 Bean 初始化完成之后要執行的方法,用來替代繼承 InitializingBean 接口,以便在容器啟動的時候創建注冊中心。
@Configuration public class ElasticRegCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter( @Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
5.作業三級配置:Core——Type——Lite
@Configuration public class ElasticJobConfig { //注冊中心 @Autowired private ZookeeperRegistryCenter regCenter; @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final SimpleJobDemo simpleJob, @Value("${wuzzJob.cron}") final String cron, @Value("${wuzzJob.shardingTotalCount}") final int shardingTotalCount, @Value("${wuzzJob.shardingItemParameters}") final String shardingItemParameters) { //參數依次是 自定義任務類型實現類、cron表達式、分片數、分片參數 LiteJobConfiguration liteJobConfiguration = getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters); return new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration); } /** * 功能描述: <br> * LiteJobConfiguration 配置 * @Param: [jobClass, cron, shardingTotalCount, shardingItemParameters] * @Return: com.dangdang.ddframe.job.lite.config.LiteJobConfiguration * @Author: wuzhenzhao * @Date: 2020/7/14 17:03 */ private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) { JobCoreConfiguration coreConfig = JobCoreConfiguration //任務名稱、cron表達式、分片數 .newBuilder(jobClass.getName(), cron, shardingTotalCount) .shardingItemParameters(shardingItemParameters).build(); // 作業分片策略 // 基於平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); return LiteJobConfiguration.newBuilder( new SimpleJobConfiguration(coreConfig, jobClass.getCanonicalName())) .jobShardingStrategyClass(jobShardingStrategyClass) //允許重寫配置 .overwrite(true).build(); } }
6.啟動主類即可。
分片策略:
分片項與分片參數:
任務分片,是為了實現把一個任務拆分成多個子任務,在不同的 ejob 示例上執行。例如 100W 條數據,在配置文件中指定分成 10 個子任務(分片項),這 10 個子任務再按照一定的規則分配到 5 個實際運行的服務器上執行。除了直接用分片項 ShardingItem獲取分片任務之外,還可以用 item 對應的 parameter 獲取任務。
定義幾個分片項,一個任務就會有幾個線程去運行它。
注意:分片個數和分片參數要一一對應。通常把分片項設置得比 E-Job 服務器個數大一些,比如 3 台服務器,分成 9 片,這樣如果有服務器宕機,分片還可以相對均勻。
分片驗證:
運行上述 Springboot集成項目。SimpleJob 的分片項改成 2。
打成 jar 包:mvn package -Dmaven.test.skip=true
多實例運行(單機):
- 多運行一個點,任務不會重跑(兩個節點各獲得一個分片項)
- 關閉一個節點,任務不會漏跑
分片策略 http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/ 分片項如何分配到服務器?這個跟分片策略有關。
- AverageAllocationJobShardingStrategy:基於平均分配算法的分片策略,也是默認的分片策略。如果分片不能整除,則不能整除的多余分片將依次追加到序號小的服務器。如果有 3 台服務器,分成 9 片,則每台服務器 分 到 的 分 片 是 : 1=[0,1,2], 2=[3,4,5],3=[6,7,8]。如果有 3 台服務器,分成 8 片,則每台服務器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]。如果有 3 台服務器,分成 10 片,則每台服務器 分 到 的 分 片 是 : 1=[0,1,2,9], 2=[3,4,5],3=[6,7,8]。AverageAllocationJobShardingStrategy 的缺點是,一旦分片數小於作業服務器數,作業將永遠分配至 IP 地址靠前的服務器,導致 IP 地址靠后的服務器空閑。而 OdevitySortByNameJobShardingStrategy 則可以根據作業名稱重新分配服務器負載。如:如果有 3 台服務器,分成 2 片,作業名稱的哈希值為奇數,則每台服務器分到的分片是:1=[0], 2=[1], 3=[]。如果有 3 台服務器,分成 2 片,作業名稱的哈希值為偶數,則每台服務器分到的分片是:3=[0], 2=[1], 1=[]
- OdevitySortByNameJobShardingStrategy:根據作業名的哈希值奇偶數決定 IP 升降序算法的分片策略。根據作業名的哈希值奇偶數決定 IP 升降序算法的分片策略。1.作業名的哈希值為奇數則 IP 升序。2.作業名的哈希值為偶數則 IP 降序。用於不同的作業平均分配負載至不同的服務器。
- RotateServerByNameJobShardingStrategy:根據作業名的哈希值對服務器列表進行輪轉的分片策略。
- 自定義分片策略:實現 JobShardingStrategy 接口並實現 sharding 方法,接口方法參數為作業服務器 IP 列表和分片策略選項,分片策略選項包括作業名稱,分片總數以及分片序列號和個性化參數對照表,可以根據需求定制化自己的分片策略。
設置分片策略:
// 作業分片策略 // 基於平均分配算法的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); // 定義Lite作業根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();
分片方案:
獲取到分片項 shardingItem 之后,怎么對數據進行分片嗯?
- 對業務主鍵進行取模,獲取余數等於分片項的數據,舉例:獲取到的 sharding item 是 0,1在 SQL 中加入過濾條件:where mod(id, 4) in (1, 2)。這種方式的缺點:會導致索引失效,查詢數據時會全表掃描。解決方案:在查詢條件中在增加一個索引條件進行過濾。
- 在表中增加一個字段,根據分片數生成一個 mod 值。取模的基數要大於機器數。否則在增加機器后,會導致機器空閑。例如取模基數是 2,而服務器有 5 台,那么有三台服務器永遠空閑。而取模基數是 10,生成 10 個 shardingItem,可以分配到 5 台服務器。當然,取模基數也可以調整。
- 如果從業務層面,可以用 ShardingParamter 進行分片。例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF,List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100。
在 Spring Boot 中要 Elastic-Job 要配置的內容太多了,有沒有更簡單的添加任務的方法呢?比如在類上添加一個注解?這個時候我們就要用到 starter 了。
Elastic Job 原理:
啟動:
由 new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init(); 進入啟動流程。
/** * 初始化作業. */
public void init() { LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); // 設置分片數
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount()); // 構建任務,創建調度器
JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); // 在 ZK 上注冊任務
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); // 添加任務信息並進行節點選舉
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled()); // 啟動調度器
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }
registerStartUpInfo 方法
/** * 注冊作業啟動信息. * * @param enabled 作業是否啟用 */
public void registerStartUpInfo(final boolean enabled) { // 啟動所有的監聽器、監聽器用於監聽 ZK 節點信息的變化。
listenerManager.startAllListeners(); // 節點選舉
leaderService.electLeader(); // 服務信息持久化(寫到 ZK)
serverService.persistOnline(enabled); // 實例信息持久化(寫到 ZK)
instanceService.persistOnline(); // 重新分片 shardingService.setReshardingFlag(); // 監控信息監聽器
monitorService.listen(); // 自診斷修復,使本地節點與 ZK 數據一致
if (!reconcileService.isRunning()) { reconcileService.startAsync(); } }
啟動的時候進行主節點選舉:
/** * 選舉主節點. */
public void electLeader() { log.debug("Elect a new leader now."); jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()); log.debug("Leader election completed."); }
Latch 是一個分布式鎖,選舉成功后在 instance 寫入服務器信息。
啟動調度任務則是:
/** * 調度作業. * * @param cron CRON表達式 */
public void scheduleJob(final String cron) { try { if (!scheduler.checkExists(jobDetail.getKey())) { scheduler.scheduleJob(jobDetail, createTrigger(cron)); }
//調用 Quartz 一樣的類進行啟動 scheduler.start(); } catch (final SchedulerException ex) { throw new JobSystemException(ex); } }
任務執行與分片:
關注兩個問題:
- LiteJob 是怎么被執行的?
- 分片項是怎么分配給不同的服務實例的?
在創建 Job 的時候(createJobDetail),創建的是實現了 Quartz 的 Job 接口的 LiteJob 類,LiteJob 類實現了 Quartz 的 Job 接口。在 LiteJob 的 execute 方法中獲取對應類型的執行器,調用 execute()方法。
/** * 獲取作業執行器. * * @param elasticJob 分布式彈性作業 * @param jobFacade 作業內部服務門面服務 * @return 作業執行器 */ @SuppressWarnings("unchecked") public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) { if (null == elasticJob) { return new ScriptJobExecutor(jobFacade); } if (elasticJob instanceof SimpleJob) { return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade); } if (elasticJob instanceof DataflowJob) { return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade); } throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName()); }
EJOB 提供管理任務執行器的抽象類 AbstractElasticJobExecutor,核心動作在execute()方法中執行。
/** * 執行作業. */
public final void execute() { // .....省略代碼
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER); // .....省略代碼
}
然后回走到一個分片執行的 process 方法內:
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet(); if (1 == items.size()) {// 只有一個分片項時,直接執行
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item); process(shardingContexts, item, jobExecutionEvent); return; } final CountDownLatch latch = new CountDownLatch(items.size()); // 本節點遍歷執行相應的分片信息
for (final int each : items) { final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each); if (executorService.isShutdown()) { return; } executorService.submit(new Runnable() { @Override public void run() { try { process(shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } } }); } try {// 等待所有的分片項任務執行完畢
latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }
這里會一直走到 AbstractElasticJobExecutor 的 process 方法
交給具體的實現類(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去處理。這里使 SimpleJobExecutor,最后調到我們實現類的 execute 方法。
public final class SimpleJobExecutor extends AbstractElasticJobExecutor { private final SimpleJob simpleJob; public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) { super(jobFacade); this.simpleJob = simpleJob; } @Override protected void process(final ShardingContext shardingContext) { simpleJob.execute(shardingContext); } }
失效轉移:
所謂失效轉移,就是在執行任務的過程中發生異常時,這個分片任務可以在其他節點再次執行。
FailoverListenerManager 監聽的是 zk 的 instance 節點刪除事件。如果任務配置了 failover 等於 true,其中某個 instance 與 zk 失去聯系或被刪除,並且失效的節點又不是本身,就會觸發失效轉移邏輯。Job 的失效轉移監聽來源於 FailoverListenerManager 中內部類JobCrashedJobListener 的 dataChanged 方法。當節點任務失效時會調用 JobCrashedJobListener 監聽器,此監聽器會根據實例 id獲取所有的分片,然后調用 FailoverService 的 setCrashedFailoverFlag 方法,將每個分片 id 寫到/jobName/leader/failover/items 下,例如原來的實例負責 1、2 分片項,那么 items 節點就會寫入 1、2,代表這兩個分片項需要失效轉移。
class JobCrashedJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) { String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1); if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) { return; }
// 獲取到失效的分片集合 List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId); if (!failoverItems.isEmpty()) { for (int each : failoverItems) {
// 設置失效的分片項標記 failoverService.setCrashedFailoverFlag(each); failoverService.failoverIfNecessary(); } } else { for (int each : shardingService.getShardingItems(jobInstanceId)) { failoverService.setCrashedFailoverFlag(each); failoverService.failoverIfNecessary(); } } } } }
然后接下來調用 FailoverService 的 failoverIfNessary 方法,首先判斷是否需要失敗轉移,如果可以需要則只需作業失敗轉移。
/** * 如果需要失效轉移, 則執行作業失效轉移. */ public void failoverIfNecessary() { if (needFailover()) { jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback()); } } private boolean needFailover() { return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT)
&& !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty() && !JobRegistry.getInstance().isJobRunning(jobName); }
條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。條件二:當前作業不在運行中。
/** * 在主節點執行操作. * * @param latchNode 分布式鎖使用的作業節點名稱 * @param callback 執行操作的回調 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { latch.start(); latch.await(); callback.execute(); //CHECKSTYLE:OFF
} catch (final Exception ex) { //CHECKSTYLE:ON
handleException(ex); } }
緊接着調用 FailoverLeaderExecutionCallback 的 execute方法:
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback { @Override public void execute() { //再次判斷是否需要失效轉移;
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) { return; } //從注冊中心獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項;
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0)); log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem); // 在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 注冊作業分片項為當前作業節點;
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); // 然后移除任務轉移分片項
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem)); // TODO 不應使用triggerJob, 而是使用executor統一調度
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName); if (null != jobScheduleController) { //最后調用執行,提交任務。
jobScheduleController.triggerJob(); } } }