(繼續貼一篇之前寫的經驗案例)
elastic-job lite 編程實戰經驗
其實這是一次失敗的項目,雖然最后還是做出來了,但是付出了很大代價。並且需要較深入的踩坑改造elastic-job,導致代碼的可讀性,可維護性也很差。
事實證明 elastic-job lite 並不適合用於做 需要長時間運行(可能幾小時或幾天不停止)的作業調度。
一、 elastic-job 簡介
Elastic-Job是當當推出的一個開源分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。Elastic-Job-Lite定位為輕量級無中心化解決方案。
詳見官網介紹,傳送門http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/
本文從編程踩坑和大量測試中提煉,講解目的在於幫助有基礎的開發者理解elastic-job真實的運行邏輯,解答編程中官方文檔未提及的疑惑,避免重復踩坑。基礎的概念和高深的主從節點選舉失效轉移不是本文要講的內容,請移步官方文檔,本文更專注於怎么用對。
二、 elastic-job 與 zookeeper
elastic-job 基於zk實現分布式協調,重要的作業信息都被存儲在了zk上,如圖:
algosmElasticJobs 節點是自定義的命名空間的名稱。
nameLib-295是作業名
servers 通過子節點記錄作業在哪幾台機器上正在運行,一個機器對應一個子節點,運行結束后,servers子節點會被刪除。
instances 記錄了job實例與機器的關系
config 存儲作業的配置信息
sharding 展開 是作業的分片信息,及每個分片運行在哪個機器上,與運行狀態。
每個作業對應命名空間algosmElasticJobs下的一個子節點(節點名稱為作業名)
三、 elastic-job 作業、分片與線程
作業運行起來后,服務器、實例、線程、分片的關系圖:
(2台服務器,每台運行2個作業,每個作業4個分片)
1)作業 job : 實現作業邏輯的class類,需重寫execute(ShardingContext shardingContext) 方法。
elastic-job創建一個作業時,會在當前服務器上拉起一個作業class的實例,並需要為該實例指定唯一的作業名稱。(如下圖所示作業名稱各不相同)
同一個class可以創建多個實例,從而生成多個作業(各個作業的名稱不同,作業自定義配置也可設置的不一樣)。
如下圖 nameLib 開頭的是由ClassA 生成, makelabel是由ClassB生成。
2)分片 sharding:
創建作業時,需要設置這個作業有多少個分片來執行,如果作業A在機器1上獲得了兩個分片,那么這兩個分片實際上是兩個線程(注意是一台機器上獲得兩個分片,每台機器上裝一個elastic-job 服務的情況下),這兩個線程共用的是同一個class實例,也就是說這兩個分片 會共享 此class實例的成員變量。分片1修改了實例的成員變量,就會被分片2讀到,從而影響分片2的作業邏輯。
如果想要為每個分片設置獨享的變量,從而不受到其他分片影響,那么就需要用到線程變量。
方法是,在該class中定義線程變量,用法如下:
/**
* 與線程相關的變量, key 線程號
*/
private Map<Long,JobBaseParam> threadParam = new ConcurrentHashMap<>();
//初始化線程變量
private void initParam(ShardingContext shardingContext){
//elasticJob 單實例多線程,每次拉起需要清理線程上次殘留的狀態
JobBaseParam jobBaseParam = new JobBaseParam();
jobBaseParam.setShardingItem(shardingContext.getShardingItem());
jobBaseParam.setCompletedActive(false);
jobBaseParam.setOver(true);
jobBaseParam.setReceiveOver(false);
threadParam.put(Thread.currentThread().getId(),jobBaseParam);
jobName = shardingContext.getJobName();
}
//使用線程變量
threadParam.get(Thread.currentThread().getId()).setCompletedActive(true);
threadParam.get(masterThreadId).getReceiveOver()
線程變量使用時,需小心:在哪個線程里threadParam.put的變量,就需要在哪個線程里threadParam.get,例如在主線程里put變量,然后在子線程里get變量,就會get不到,產生邏輯錯誤。
關於作業Class中的靜態變量,該靜態變量將會被由改class new出來的所有作業分片讀到,作用域范圍最大。
jobClass不同變量作用域:
變量類型 |
靜態變量 |
成員變量 |
線程變量 |
作用域 |
所有作業實例、所有分片 |
當前作業實例的所有分片 |
當前分片 |
如果想要長久保留分片要用的變量,每次分片拉起時自動從上一次狀態繼續,可以將與分片相關的變量存儲到zk上,作業對應的分片節點下面,類似:
algosm是自定義的前綴標識,以與elastic-job管理的節點區分。注意分片下加自定義節點,是不會影響elastic-job運行的,也不會被elastic-job 清除,是可行的方案。
四、 elastic-job 分片與失效轉移
想要作業A失效轉移生效,前提是每台服務器上都要在運行着作業A。
分片序號從0開始,當服務器1同時獲得兩個分片,分片1執行完畢,分片2未結束的情況下,分片1不會被再次觸發,一直要等到分片2結束。
經典模式,正常運行時,會隨機分片,導致作業分片在不同機器上切換。
作業的兩個分片在同一台服務器上時,分片1與分片2用的是同一實例,不同線程,若有狀態殘留在實例的成員變量中,需要小心,建議分片每次運行都要初始化一次狀態。
五、 elastic-job 作業重啟恢復
elastic-job 如果發生重啟,是不會自動將作業拉起的,雖然其作業配置存儲到了zk上,需要自行實現重啟,拉起作業功能。
實現要點,是要判斷作業是否為異常結束,非正常銷毀的作業,會在servers節點下殘留子節點,如果servers的子節點不為空,說明是異常停止,需要被拉起
config節點中存儲了作業的配置信息
實戰代碼如下:
(代碼中jobPool是自行實現的一個作業池用來管理作業的實例,jobPoolLock是自行實現的細粒度鎖)
@Override
public Boolean loadPreJob(){
try {
String rootPath = "/"+algosmJobConfig.getRegCenterNamespace();
if(!zkClient.exists(rootPath)){
logger.info("初次運行,未發現{}",rootPath);
return true;
}
List<String> zkJobList = zkClient.getChildren(rootPath);
if(!CollectionUtils.isEmpty(zkJobList)){
for(String jobName : zkJobList){
try{
String jobPath = rootPath+"/"+jobName;
//只要servers 不是空的,就說明作業非正常終止,需要將作業拉起來
if(!CollectionUtils.isEmpty(zkClient.getChildren(jobPath+"/servers"))){
//說明是非正常結束的job,需要拉起
String jobConfig = zkClient.readData(jobPath+"/config",true);
if(StringUtils.isEmpty(jobConfig)){
logger.warn("job=[{}] config為空",jobName);
}else{
//取出參數
JsonNode jobNode = jsonTool.readTree(jobConfig);
String jobParam = jobNode.get("jobParameter").textValue();
//創建任務
String[] jobInfo = jobName.split("-");
if(jobInfo.length != 2 || AlgosmJobType.trans(jobInfo[0])==null){
logger.warn("jobName={} 命名非法",jobName);
}else{
jobPoolLock.lock(jobName);
try{
logger.info("初始化-加載job {}",jobName);
if(jobPool.containsKey(jobName)){
logger.warn("job={}已存在,跳過!",jobName);
}else{
JobEntityConfig jobEntityConfig = jobControl.createJob(jobName,AlgosmJobType.trans(jobInfo[0]),jobParam);
JobScheduler jobScheduler = new SpringJobScheduler(jobEntityConfig.getJobEntity(),
zookeeperRegistryCenter, jobEntityConfig.getJobConfiguration(), new AlgosmElasticJobListener());
jobScheduler.init();
jobPool.put(jobName,jobEntityConfig.getJobEntity());
logger.info("加載job={}成功! param={}",jobName,jobParam);
}
}finally {
jobPoolLock.unlock(jobName);
}
}
}
}
}catch (Exception e){
logger.error("加載作業失敗!jobName={}",jobName,e);
continue;
}
}
}
}catch (Exception e){
logger.error("初始化加載作業失敗!",e);
return false;
}
return true;
}
六、 elastic-job 分布式作業控制與任務狀態統計
如何控制所有機器上作業的啟停,如何獲取當前作業的運行狀態,考慮到作業是運行在多台機器上的,所以掛了一台,作業並不算停止。作業運行也並非所有機器都在跑就算運行,作業跑在不同機器上,每個機器上又可能不止一個分片所有分片的任務統計數據疊加,才算是作業准確的統計數據。
這塊是需要自行實現的,elastic-job是不支持的。筆者已實現該部分功能,限於篇幅與時間限制,等下篇再講述。(賣個關子,從下圖分片中的自定義節點命名可看出一二)。