DolphinScheduler源碼分析


DolphinScheduler源碼分析

本博客是基於1.2.0版本進行分析,與最新版本的實現有一些出入,還請讀者辯證的看待本源碼分析。具體細節可能描述的不是很准確,僅供參考

源碼版本

1.2.0

技術框架

所有模塊均采用比較流行的SprintBoot框架

架構圖

DolphinScheduler架構圖

重要概念

流程定義

在DolphinScheduler中,作業的DAG被命名為“流程定義”。

挺不可思議的。

流程實例

流程實例是流程定義的實例化,可以通過手動啟動或定時調度生成,流程定義每運行一次,產生一個流程實例。流程實例由Master解析流程定義生成。

任務實例

任務實例是流程定義中任務節點的實例化,標識着具體的任務執行狀態。

居然沒有作業的概念,直接上來任務實例的概念,真是匪夷所思。

定時

DAG的觸發頻率。與DAG概念隔離,單獨創建、單獨管理,一個DAG可以沒有與之對應的定時。

架構說明

Quartz

內部對Quartz進行了一個封裝,org.apache.dolphinscheduler.server.quartz.QuartzExecutors僅僅提供增加、刪除作業的基礎功能。其作業的狀態等信息保存在數據庫中以QRTZ_開頭的表。

為了將實際作業的定義與Quartz隔離,抽象了一個ProcessScheduleJob類,用它來創建JobDetail。

該類僅僅是根據流程定義的定時等信息創建了一個CommandType.SCHEDULER類型的Command對象,然后插入了數據庫,並沒有的執行任務的具體邏輯。

MasterSchedulerThread

架構圖中有一個CommandScanner,對應到源碼中就是org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread類。

這是一個掃描線程,定時掃描數據庫中的 t_ds_command 表,根據不同的命令類型進行不同的業務操作。掃描的SQL如下:

select command.* from t_ds_command command join t_ds_process_definition definition on command.process_definition_id = definition.id where definition.release_state = 1 AND definition.flag = 1 order by command.update_time asc limit 1 

定時的默認是1秒,由Constants.SLEEP_TIME_MILLIS設置。Command的創建與執行是異步的。

MasterSchedulerThread類查詢到一個Comamand后將其轉化為一個ProcessInstance,交由MasterExecThread進行執行。

MasterSchedulerThread功能比較簡單,就是負責銜接Quartz創建的Command,一個橋梁的作用。

MasterExecThread

org.apache.dolphinscheduler.server.master.runner.MasterExecThread負責執行ProcessInstance,功能主要是DAG任務切分、任務提交監控等其他邏輯處理。

其實DAG切割也比較簡單,首先找入度為0的任務(也就是沒有任務依賴),放到准備提交隊列;任務執行成功后,掃描后續的任務,如果該任務的所有依賴都成功,則執行該任務;循環處理。MasterExecThread隨着DAG中所有任務的執行結束而結束。

一個任務執行,會分別占用master和worker各一個線程,這一點不太好。

同樣,該線程在一個邏輯處理結束后,也會休眠1秒,由Constants.SLEEP_TIME_MILLIS設置。

當然在MasterExecThread中,也沒有執行具體的任務邏輯,只是創建了一個MasterTaskExecThread負責任務的“執行”。

MasterTaskExecThread

org.apache.dolphinscheduler.server.master.runner.MasterTaskExecThread由MasterExecThread負責創建。其功能主要就是負責任務的持久化,簡單來說就是把TaskInstacne信息保存到數據庫中,同時如果一個任務滿足執行條件,也會把任務ID提交到TaskQueue中的。

這個線程會每隔1秒(Constants.SLEEP_TIME_MILLIS設置)查詢作業的狀態,直到作業執行完畢(不管是成功還是失敗)。

這樣來看,一個任務執行,會占用master2個線程。

TaskQueue

架構圖中Master/Worker通信的重要渠道,它把待執行的隊列放到了TaskQueue,由Worker獲取到之后,執行具體的業務邏輯。根據技術架構介紹,這個TaskQueue是由Zookeeper實現。由此也可以看出,Master、Worker是沒有直接的物理交互的。

FetchTaskThread

org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread循環從TaskQueue中獲取任務,並根據不同任務類型調用TaskScheduleThread對應執行器。每次循環依舊休眠1秒。

FetchTaskThread會一次性查詢所有任務,檢查當前是否有任務。這個設計有點不合理。

如果當前有可執行的任務,則一次性取出當前節點剩余可執行任務數量的任務ID。

根據任務ID查詢創建TaskInstance,交由TaskScheduleThread具體執行。

由此可見FetchTaskThread每個Worker只有一個,TaskScheduleThread會有很多個。

TaskScheduleThread

org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread負責任務的具體執行。該線程的邏輯比較清晰,就是構造獲取任務相關的文件、參數等信息,創建Process類,執行對應的命令行,然后等待其執行完畢,獲取標准輸出、標准錯誤輸出、返回碼等信息。

LoggerServer

org.apache.dolphinscheduler.server.rpc.LoggerServer跟Worker、Master屬於同一級別,都是需要單獨啟動的進程。這就是一個RPC服務器,提供日志分片查看、刷新和下載等功能。

項目結構

模塊

  1. dolphinscheduler-ui 前端頁面模塊
  2. dolphinscheduler-server 核心模塊。包括master/worker等功能
  3. dolphinscheduler-common 公共模塊。公共方法或類
  4. dolphinscheduler-api Restful接口。前后端交互層,與master/worker交互等功能
  5. dolphinscheduler-dao 數據操作層。實體定義、數據存儲
  6. dolphinscheduler-alert 預警模塊。與預警相關的方法、功能
  7. dolphinscheduler-rpc 日志查看。提供日志實時查看rpc功能
  8. dolphinscheduler-dist 與編譯、分發相關的模塊。沒有具體邏輯功能

源碼分析方法

  1. UI功能不分析
  2. 從與UI交互的API模塊開始着手看
  3. 重點分析核心功能
  4. 非核心功能僅做了解

模塊-dolphinscheduler-api

API接口層,主要負責處理前端UI層的請求。該服務統一提供RESTful api向外部提供請求服務。 接口包括工作流的創建、定義、查詢、修改、發布、下線、手工啟動、停止、暫停、恢復、從該節點開始執行等等。

涉及的API太多,不宜深入研究,只研究其大致框架、功能。具體的API列表及其使用方法可查看官方文檔

啟動入口

org.apache.dolphinscheduler.api下面有兩個類:ApiApplicationServer、CombinedApplicationServer。

從ApiApplicationServer來看就是啟動一個SpringBoot應用。

CombinedApplicationServer除了啟動一個SprintBoot應用之外,還啟動了LoggerServer、AlertServer。

@SpringBootApplication @ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true") @ServletComponentScan @ComponentScan("org.apache.dolphinscheduler") @Import({MasterServer.class, WorkerServer.class}) @EnableSwagger2 public class CombinedApplicationServer extends SpringBootServletInitializer { public static void main(String[] args) throws Exception { ApiApplicationServer.main(args); LoggerServer server = new LoggerServer(); server.start(); AlertServer alertServer = AlertServer.getInstance(); alertServer.start(); } } 

CombinedApplicationServer與ApiApplicationServer的區別:是否內嵌LoggerServer、AlertServer。而且當server.is-combined-server為true時,會自動啟動CombinedApplicationServer。

也不知道是否內嵌的意義在哪里,直接內嵌不好么?

對於SpringBoot應用,接口一般都在controller中。org.apache.dolphinscheduler.api.controller包有以下幾個Controller: AccessTokenController ProcessInstanceController AlertGroupController ProjectController BaseController QueueController DataAnalysisController ResourcesController DataSourceController SchedulerController ExecutorController TaskInstanceController LoggerController TaskRecordController LoginController TenantController MonitorController UsersController ProcessDefinitionController WorkerGroupController

因為在DolphinScheduler調度中最重要的一個概念就是流程定義,所以我們從ProcessDefinitionController入手簡要分析這個模塊的基本功能。

ProcessDefinitionController

流程定義接口API列表

官方文檔中,可以看到org.apache.dolphinscheduler.api.controller.ProcessDefinitionController大概有14個接口。

ProcessDefinitionController中只有一個字段ProcessDefinitionService,從名稱以及自身經驗來看,可以知道ProcessDefinitionController會負責HTTP請求的參數解析、參數校驗、返回值等等,與業務無關的邏輯;具體的業務邏輯會交給ProcessDefinitionService類處理。

由此我們可以類比分析其他所有的controller,都會有一個對應的service處理業務相關的邏輯。

public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "name", required = true) String name, @RequestParam(value = "processDefinitionJson", required = true) String json, @RequestParam(value = "locations", required = true) String locations, @RequestParam(value = "connects", required = true) String connects, @RequestParam(value = "description", required = false) String description) { try { logger.info("login user {}, create process definition, project name: {}, process definition name: {}, " + "process_definition_json: {}, desc: {} locations:{}, connects:{}", loginUser.getUserName(), projectName, name, json, description, locations, connects); Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json, description, locations, connects); return returnDataList(result); } catch (Exception e) { logger.error(Status.CREATE_PROCESS_DEFINITION.getMsg(), e); return error(Status.CREATE_PROCESS_DEFINITION.getCode(), Status.CREATE_PROCESS_DEFINITION.getMsg()); } } 

上面是createProcessDefinition的源碼,邏輯比較清晰,就是接收、校驗HTTP的參數,然后調用processDefinitionService.createProcessDefinition函數,返回結果、處理異常。

但這段代碼有一個controller與service分隔不清的地方:HTTP返回的結果由誰處理。此處返回結果是由service負責的,service會創建一個Map<String, Object>類型的result字段,然后調用result.put("processDefinitionId",processDefine.getId());設置最終返回的數據。其實個人是不敢苟同這種做法的,嚴格來說,service只返回與業務相關的實體,HTTP具體返回什么信息應該交由controller處理。

ProcessDefinitionService

org.apache.dolphinscheduler.api.service.ProcessDefinitionService承擔流程定義具體的CURD邏輯,調用各種mapper、dao。

public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name, String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException { Map<String, Object> result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); // check project auth Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { return checkResult; } ProcessDefinition processDefine = new ProcessDefinition(); Date now = new Date(); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) { return checkProcessJson; } processDefine.setName(name); processDefine.setReleaseState(ReleaseState.OFFLINE); processDefine.setProjectId(project.getId()); processDefine.setUserId(loginUser.getId()); processDefine.setProcessDefinitionJson(processDefinitionJson); processDefine.setDescription(desc); processDefine.setLocations(locations); processDefine.setConnects(connects); processDefine.setTimeout(processData.getTimeout()); processDefine.setTenantId(processData.getTenantId()); //custom global params List<Property> globalParamsList = processData.getGlobalParams(); if (globalParamsList != null && globalParamsList.size() > 0) { Set<Property> globalParamsSet = new HashSet<>(globalParamsList); globalParamsList = new ArrayList<>(globalParamsSet); processDefine.setGlobalParamList(globalParamsList); } processDefine.setCreateTime(now); processDefine.setUpdateTime(now); processDefine.setFlag(Flag.YES); processDefineMapper.insert(processDefine); putMsg(result, Status.SUCCESS); result.put("processDefinitionId",processDefine.getId()); return result; } 

研讀上面代碼我們知道createProcessDefinition大概有以下功能:

  1. 校驗當前用戶是否擁有所屬項目的權限
  2. 校驗流程定義JSON是否合法。例如是否有環
  3. 構造ProcessDefinition對象插入數據庫
  4. 設置HTTP返回結果

因為這些都不是核心邏輯,都不再深入展開。

ProcessDefinitionService的功能非常不合理,居然還有鑒權的功能,按照我的理解,有一個校驗、插入數據庫的功就可以了,其他的功能都可以拋出去

dolphinscheduler-api其他的功能都不在分析,因為到此流程定義信息已經寫入到了數據庫,跟API模塊已經沒有關系了。但需要知道ProcessDefinition對象插入到了哪張表,這樣才知道如何查詢、更新這個表的。這個表就是前后台邏輯交互的關鍵。從ProcessDefinition定義可以看出,數據最終插入了t_ds_process_definition表。

@Data @TableName("t_ds_process_definition") public class ProcessDefinition 

其實也可以不用關注具體插入到了哪張表,好像只需要關系哪個地方用ProcessDefinitionMapper查詢了數據就行了。

但根據之前的概念定義,我們知道每個流程定義是需要靠“定時”周期性觸發的,這樣的話我們可以猜測,系統並不會直接用ProcessDefinitionMapper查詢流程定義,而是會根據定時關聯的ProcessDefinition來調起DAG。這一點在MasterSchedulerThread的分析中已經可以看出來了。

SchedulerController、SchedulerService

考慮到Controler邏輯非常簡單(不合理),此處將controller和service合並分析。

同樣SchedulerController幾乎沒有什么邏輯,全都交給了SchedulerService層。這里只分析SchedulerService.insertSchedule,簡單瀏覽代碼后,可以發現它跟createProcessDefinition邏輯差不多:

  1. 校驗當前用戶是否擁有所屬項目的權限
  2. 校驗流程定義JSON是否合法。例如是否有環
  3. 構造Schedule對象插入數據庫
  4. 設置HTTP返回結果

當然除了上面4點還查詢、更新了ProcessDefinition,主要是將Schedule和ProcessDefinition進行關聯。

MasterSchedulerThread

MasterSchedulerThread

以上是MasterSchedulerThread類的概覽圖。

MasterSchedulerThread實現Runnable接口,很明顯主要的邏輯應該在run方法內,而且根據經驗以及前面的分析可以知道,這個方法內是一個“死”循環,且為了避免CPU飆升,會休眠一小段時間。

下面我們逐步展開、分析MasterSchedulerThread類

 MasterSchedulerThread-run

從上圖簡單分析,總結一下run的邏輯:

  1. 調用OSUtils.checkResource,檢查當前資源(內存、CPU)。
  2. 資源超出閾值,則休眠1秒進入下一次循環。
  3. 檢查zookeeper是否連接成功
  4. 獲取一個InterProcessMutex鎖(分布式的公平可重入互斥鎖)。也就是只有一個master可以獲取到這個鎖
  5. 查詢一個Command,不為null時進行后續邏輯。
  6. 休眠1秒,進入下一次循環
  7. 進入下一次循環之前,釋放InterProcessMutex鎖

在深入分析run之前,先簡單分析一下 Stopper.isRunning() 的邏輯。

/** * if the process closes, a signal is placed as true, and all threads get this flag to stop working */ public class Stopper { private static volatile AtomicBoolean signal = new AtomicBoolean(false); public static final boolean isStoped(){ return signal.get(); } public static final boolean isRunning(){ return !signal.get(); } public static final void stop(){ signal.getAndSet(true); } } 

其邏輯非常簡單,就是用一個原子布爾值,標志當前進程是否要退出。如果收到了退出信號,則signal為true,該進程內所有的線程都退出當前循環。

下面我們來分析查詢到一個Command之后的邏輯:

if (command != null) { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance,processDao)); } }catch (Exception e){ logger.error("scan command error ", e); processDao.moveToErrorCommand(command, e.toString()); } } 

其實就是根據Command創建了一個ProcessInstance(流程實例),之前也分析過,流程定義是由Scheduler自動創建的,而Quartz已經根據Schedule信息創建了Command保存到了數據庫。至此,流程定義與定時的關聯邏輯就已經串起來了。

創建流程實例的時候傳入了當前可用(masterExecThreadNum - activeCount)的線程數量,如果滿足當前dag,則返回ProcessInstance,否則返回null。

ProcessInstance最終交由MasterExecThread去執行。

至此MasterSchedulerThread類的主要邏輯如下:

  1. 調用OSUtils.checkResource,檢查當前資源(內存、CPU)。
  2. 資源超出閾值,則休眠1秒進入下一次循環。
  3. 檢查zookeeper是否連接成功
  4. 獲取一個InterProcessMutex鎖(分布式的公平可重入互斥鎖)。也就是只有一個master可以獲取到這個鎖
  5. 查詢一個Command,如果當前線程數夠用,則創建一個流程實例(ProcessInstance),交給MasterExecThread線程處理。
  6. 休眠1秒,進入下一次循環
  7. 進入下一次循環之前,釋放InterProcessMutex鎖

在結束MasterExecThread的源碼分析之前,我們再簡要分析一下這個類比較重要的一個字段:processDao。

ProcessDao

這個類,可以看成是與流程定義相關的操作集合,與流程定義存儲相關的操作、邏輯的集合。

processDao.moveToErrorCommand需要稍微注意一下,在異常情況下,它把Command從原來的表中刪除,然后插入到了t_ds_error_command表。

但個人感覺其定義不是非常清晰。如果是mapper的一個全集,則其他任何地方都不應該再調用mapper,事實又不是這樣;如果只是流程定義相關的操作, 其功能又過於大。

MasterExecThread

與MasterSchedulerThread一樣,MasterExecThread也是實現了Runnable的線程類,不過我們先來看MasterExecThread的構造函數。

public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){ this.processDao = processDao; this.processInstance = processInstance; int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS, Constants.defaultMasterTaskExecNum); this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum); } 

taskExecService這個字段非常重要,它是一個固定大小(20)的后台線程池。這意味着,一個DAG最大的並發任務數就是20。

另外細心的讀者發現,conf字段是一個static字段,在static代碼塊初始化的。為啥不從MasterSchedulerThread傳過來呢?我還以為要自動reload呢,結果也沒有。

static {
    try {
        conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
    }catch (ConfigurationException e){
        logger.error("load configuration failed : " + e.getMessage(),e); System.exit(1); } } 

配置這種字段,完全可以全局唯一,到處傳參,沒必要在new一個。一般情況下這個類的內容也不會修改。

下面分析該類的run方法。

@Override public void run() { // process instance is null  if (processInstance == null){ logger.info("process instance is not exists"); return; } // check to see if it's done  if (processInstance.getState().typeIsFinished()){ logger.info("process instance is done : {}",processInstance.getId()); return; } try {  if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){ // sub process complement data executeComplementProcess(); }else{ // execute flow executeProcess(); } }catch (Exception e){ logger.error("master exec thread exception: " + e.getMessage(), e); logger.error("process execute failed, process id:{}", processInstance.getId()); processInstance.setState(ExecutionStatus.FAILURE); processInstance.setEndTime(new Date()); processDao.updateProcessInstance(processInstance); }finally { taskExecService.shutdown(); // post handle postHandle(); } } 

分析源碼后,簡要總結其邏輯如下:

  1. 判斷processInstance是否為null。為null則退出
  2. 判斷processInstance是否已經完成(成功、報錯、取消、暫停、等待)
  3. 判斷是否為補數。是則走補數的邏輯
  4. 執行當前流程定義實例(executeProcess)
  5. 調用taskExecService.shutdown(),等待所有線程正常退出

感覺第一步有點多此一舉。

executeProcess按順序調用了prepareProcess、runProcess、endProcess三個方法,簡單來說就是初始化、執行、釋放資源。 prepareProcess又按順序調用了initTaskQueue、buildFlowDag。

initTaskQueue就是一些資源的初始化操作,比如通過流程定義ID查詢到當前的任務實例。下面是其核心邏輯,可以發現,就是查詢了完成的任務列表,報錯且不能重試的任務列表。

List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){ if(task.isTaskComplete()){ completeTaskList.put(task.getName(), task); } if(task.getState().typeIsFailure() && !task.taskCanRetry()){ errorTaskList.put(task.getName(), task); } } 

buildFlowDag看名字應該是生成DAG實例的,代碼雖短,但調用了好幾個函數,我們只重點分析最后一個函數調用。

private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson()); // generate process to get DAG info List<String> recoveryNameList = getRecoveryNodeNameList(); List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(), startNodeNameList, recoveryNameList, processInstance.getTaskDependType()); if(processDag == null){ logger.error("processDag is null"); return; } // generate process dag dag = DagHelper.buildDagGraph(processDag); } 

DagHelper.buildDagGraph生成了一個DAG對象實例,根據名字和注釋猜測,這應該是對有向無環圖的一個抽象。

/** * the object of DAG */ private DAG<String,TaskNode,TaskNodeRelation> dag; 

來看下DAG類的定義

/** * analysis of DAG * Node: node * NodeInfo:node description information * EdgeInfo: edge description information */ public class DAG<Node, NodeInfo, EdgeInfo> 

DAG有三個類型參數,分別代表節點key、節點信息、邊信息。

下面是TaskNode的字段

 TaskNode

如果讀者用過DolphinScheduler的UI的話,發現TaskNode的字段跟UI一一對應。

TaskNodeRelation

TaskNodeRelation代表邊的信息,字段比較少,只有startNode、endNode兩個String類型的字段。這其實是DAG類的第一個類型參數,節點的key。

public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) { DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>(); /** * add vertex */ if (CollectionUtils.isNotEmpty(processDag.getNodes())){ for (TaskNode node : processDag.getNodes()){ dag.addNode(node.getName(),node); } } /** * add edge */ if (CollectionUtils.isNotEmpty(processDag.getEdges())){ for (TaskNodeRelation edge : processDag.getEdges()){ dag.addEdge(edge.getStartNode(),edge.getEndNode()); } } return dag; } 

上面是buildDagGraph的源碼。可以看出,增加節點時,第一個參數是TaskNode的getName。跟猜測的一樣,DAG的第一個參數就是node的key,而key就是名稱。

細心的讀者一定發現,DAG對象是根據ProcessDag來創建的,二者有啥區別呢?

ProcessDag

其實個人感覺區別不大,非要說區別的話就是DAG把節點、邊的一個List轉化成了一個Graph。

初始化完成之后,來看一下具體如何執行流程定義的。

runProcess

這個方法源碼很長,我們首先從整體簡要分析。

  1. submitPostNode(null)
  2. 起一個while循環,直至流程定義實例停止(成功、失敗、取消、暫停、等待)
  3. 首先判斷是否超時,超時則發送預警郵件
  4. 獲取當前活動的任務節點的Map。key是MasterBaseTaskExecThread對象,value是Future<Boolean>。value其實是MasterBaseTaskExecThread線程的當前狀態。
  5. 如果當前任務實例已經結束,則從Map中移除
  6. 如果當前任務實例成功,則put到completeTaskList且調用submitPostNode(task.getName())
  7. 如果當前任務實例失敗,則重試;否則直接結束(比如手動停止或暫停)
  8. 更新當前流程定義實例的狀態,進入下一個循環

其中activeTaskNode是一個非常重要的對象,從上面的分析中,可以猜測,activeTaskNode是由submitPostNode間接生成賦值的,並通過while循環驅動了整個流程實例的執行。

private void submitPostNode(String parentNodeName){ List<TaskInstance> submitTaskList = null; if(parentNodeName == null){ submitTaskList = getStartSubmitTaskList(); }else{ submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName); } // if previous node success , post node submit for(TaskInstance task : submitTaskList){ if(readyToSubmitTaskList.containsKey(task.getName())){ continue; } if(completeTaskList.containsKey(task.getName())){ logger.info("task {} has already run success", task.getName()); continue; } if(task.getState().typeIsPause() || task.getState().typeIsCancel()){ logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString()); }else{ addTaskToStandByList(task); } } } 

submitPostNode的源碼細節不再深入分析,大概就是從dag對象中找出入度為0的節點,放入到准備隊列中。其實在runProcess方法中,還調用了submitStandByTask方法,該方法最終調起了可以執行的節點。從這點來看,整個流程實例由submitPostNode、submitStandByTask和while驅動。

那么問題來了,流程實例的任務具體是怎么調起來的呢?下面是submitStandByTask方法中調用的最重要的函數,也是由它調起來的。

/**
 * submit task to execute * @param taskInstance task instance * @return TaskInstance */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { MasterBaseTaskExecThread abstractExecThread = null; if(taskInstance.isSubProcess()){  abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance); }else {  abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance); } Future<Boolean> future = taskExecService.submit(abstractExecThread); activeTaskNode.putIfAbsent(abstractExecThread, future);  return abstractExecThread.getTaskInstance(); } 

邏輯也比較簡單,就是把TaskInstance交給MasterTaskExecThread去執行;taskExecService提交之后,放到activeTaskNode列表,交由主邏輯判斷任務是否完成。

MasterTaskExecThread

根據其定義,我們知道MasterTaskExecThread繼承了MasterBaseTaskExecThread,且構造函數簡單的調用了父類的構造函數。

public class MasterTaskExecThread extends MasterBaseTaskExecThread 

MasterBaseTaskExecThread的構造函數也比較簡單,給幾個關鍵的字段賦初始值。

/**
 * constructor of MasterBaseTaskExecThread * @param taskInstance task instance * @param processInstance process instance */ public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ this.processDao = BeanContext.getBean(ProcessDao.class); this.alertDao = BeanContext.getBean(AlertDao.class); this.processInstance = processInstance; this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); this.cancel = false; this.taskInstance = taskInstance; } 

但processDao、alertDao居然是通過BeanContext.getBean獲取到的!!!個人感覺這是一個非常惡心的設計。一個優秀的設計,應該是類的創建者負責子類的參數及其功能的邊界。BeanContext.getBean擴展了所有類與SpringBoot的ApplicationContext間接打交道的能力,而且無法控制,因為只要調用BeanContext.getBean都可以獲取到對應的bean進行操作。

MasterBaseTaskExecThread實現了Callable<Boolean>接口,call方法又調用了submitWaitComplete,MasterTaskExecThread類中對改方法進行了覆蓋。

submitWaitComplete根據名稱及其注釋說明可以知道,它提交了一個任務實例,然后等待其完成。

/** * submit task instance and wait complete * @return true is task quit is true */ @Override public Boolean submitWaitComplete() { Boolean result = false; this.taskInstance = submit(); if(!this.taskInstance.getState().typeIsFinished()) { result = waitTaskQuit(); } taskInstance.setEndTime(new Date()); processDao.updateTaskInstance(taskInstance); logger.info("task :{} id:{}, process id:{}, exec thread completed ", this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() ); return result; } 

該函數的邏輯簡單來說就是,提交一個任務實例,等待任務完成,更新任務結束時間到數據。

我們可以看出,每個任務實例都可以更新數據庫,加上其他線程,對數據庫的壓力可能很大。如果任務非常多,並發非常大的情況下,jdbc連接線程池需要適當調大。否則,數據庫會成為系統瓶頸。如果worker節點個數過多,這種壓力又會幾何倍數的增長。

首先來看看作業是如何提交的,好像也比較簡單,就是調用了processDao.submitTask。

protected TaskInstance submit(){ Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES, Constants.defaultMasterCommitRetryTimes); Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL, Constants.defaultMasterCommitRetryInterval); int retryTimes = 1; while (retryTimes <= commitRetryTimes){ try { TaskInstance task = processDao.submitTask(taskInstance, processInstance); if(task != null){ return task; } logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes); Thread.sleep(commitRetryInterval); } catch (Exception e) { logger.error("task commit to mysql and queue failed : " + e.getMessage(),e); } retryTimes += 1; } return null; } 

根據前面的分析我們知道processDao就是跟數據庫打交道的,難道這里就是把任務實例插入到了數據?

@Transactional(rollbackFor = Exception.class) public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){ logger.info("start submit task : {}, instance id:{}, state: {}, ", taskInstance.getName(), processInstance.getId(), processInstance.getState() ); processInstance = this.findProcessInstanceDetailById(processInstance.getId()); //submit to mysql TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance); if(task.isSubProcess() && !task.getState().typeIsFinished()){ ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task); TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams()); Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task); }else if(!task.getState().typeIsFinished()){ //submit to task queue task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); submitTaskToQueue(task); } logger.info("submit task :{} state:{} complete, instance id:{} state: {} ", taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } 

這段代碼優點復雜,但只需要看其主干邏輯就好了。也就是調用submitTaskInstanceToMysql把任務實例插入到數據庫,然后調用submitTaskToQueue(目前還看不出插入到了哪里)。

submitTaskInstanceToMysql不再貼源碼分析,與函數名差不多,就是把instance插入到數據庫。

submitTaskToQueue主干邏輯就是把taskInstance添加到了TaskQueue。

public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ // task cannot submit when running if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName())); return true; } if(checkTaskExistsInTaskQueue(taskInstance)){ logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName())); return true; } logger.info("task ready to queue: {}" , taskInstance); taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) ); return true; }catch (Exception e){ logger.error("submit task to queue Exception: ", e); logger.error("task queue error : %s", JSONUtils.toJson(taskInstance)); return false; } } 

taskQueue是TaskQueueFactory.getTaskQueueInstance()創建的。

protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance(); 

getTaskQueueInstance其實就是調用了TaskQueueZkImpl.getInstance(),這應該是一個遺留接口。估計設計初期是想根據配置創建不同的任務隊列,比如redis或者其他,目前只支持zookeeper。

public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { logger.info("task queue impl use zookeeper "); return TaskQueueZkImpl.getInstance(); }else{ logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit "); System.exit(-1); } return null; } 

既然只支持zookeeper,這段冗余代碼應該刪除的

這樣來看submitTaskToQueue就是調用TaskQueueZkImpl.add方法,把任務實例插入到了zookeeper實現的隊列中。

/** * add task to tasks queue * * @param key task queue name * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override public void add(String key, String value) { try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); } } 

從上下文我們知道,這里的key就是tasks_queue;根據注釋,value就是 ${processInstancePriority}${processInstanceId}${taskInstancePriority}_${taskId}_host1,host2...

這樣來看,add就是在zk的tasks_queue父節點下創建子節點,子節點的data就是value的值。

submit的邏輯分析完畢,來繼續分析submitWaitComplete的剩余主要邏輯:waitTaskQuit。

waitTaskQuit

waitTaskQuit代碼比較多,先從整體來分析其邏輯:

  1. 通過taskInstance.id查詢taskInstance。其實就是查詢taskInstance的最新狀態。
  2. 通過參數判斷是否啟用超時檢查
  3. 一個while“死循環”。
  4. while中判斷任務是否執行結束,是則退出
  5. 獲取任務實例、流程實例最新狀態
  6. 休眠1秒,進入下一次while循環

簡單來說waitTaskQuit就是循環查看任務實例的狀態,直至其成功。

MasterTaskExecThread的功能整體來看就是把任務實例信息插入到數據庫,並放到zookeeper隊列,然后循環等待任務實例的狀態變成完成,並沒有任何具體的執行邏輯。

Stopper.isRunning()作為一個全局變量,控制了N多的線程,每個線程都處於一個while“死循環”中。雖然都sleep一段時間,但感覺還是有點浪費。

至此,master涉及的5個主要線程,已經分析了四個(SubProcessTaskExecThread沒有分析),主要功能分析結束。下面就分析一下master的啟動過程。

MasterServer

MasterServer

先看MasterServer源碼概覽,它是一個SpringBoot普通應用,可以有Autowired字段。有三個主要的方法:run/stop/heartBeatThread。根據經驗和注釋大膽猜測一下,run是master的主要啟動邏輯;stop負責優雅退出(銷毀資源、容災等);heartBeatThread負責與zk的心跳。

這次分析,我們先從非主干邏輯分析,那就是heartBeatThread。

private Runnable heartBeatThread(){ Runnable heartBeatThread = new Runnable() { @Override public void run() { if(Stopper.isRunning()) { // send heartbeat to zk if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server"); return; } zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX); } } }; return heartBeatThread; } 

heartBeatThread創建了一個線程,該線程就是調用了zkMasterClient.heartBeatForZk。

public void heartBeatForZk(String znode, String serverType){ try { //check dead or not in zookeeper if(zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)){ stoppable.stop("i was judged to death, release resources and stop myself"); return; } byte[] bytes = zkClient.getData().forPath(znode); String resInfoStr = new String(bytes); String[] splits = resInfoStr.split(Constants.COMMA); if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ return; } String str = splits[0] + Constants.COMMA + splits[1] + Constants.COMMA + OSUtils.cpuUsage() + Constants.COMMA + OSUtils.memoryUsage() + Constants.COMMA + OSUtils.loadAverage() + Constants.COMMA + splits[5] + Constants.COMMA + DateUtils.dateToString(new Date()); zkClient.setData().forPath(znode,str.getBytes()); } catch (Exception e) { logger.error("heartbeat for zk failed : " + e.getMessage(), e); stoppable.stop("heartbeat for zk exception, release resources and stop myself"); } } 

zkMasterClient.heartBeatForZk就是在master對應的zookeeper目錄下,更新data值,data主要包含當前系統的資源信息:CPU、內存、平均負載。還有最后一次更新的時間。

我們注意到zkMasterClient的類型是ZKMasterClient,那是不是還會有一個功能類似的ZKWorkerClient?也是用來匯報worker節點的系統資源信息的?

public synchronized void stop(String cause) { try { //execute only once if(Stopper.isStoped()){ return; } logger.info("master server is stopping ..., cause : {}", cause); // set stop signal is true Stopper.stop(); try { //thread sleep 3 seconds for thread quitely stop Thread.sleep(3000L); }catch (Exception e){ logger.warn("thread sleep exception:" + e.getMessage(), e); } try { heartbeatMasterService.shutdownNow(); }catch (Exception e){ logger.warn("heartbeat service stopped exception"); } logger.info("heartbeat service stopped"); //close quartz try{ QuartzExecutors.getInstance().shutdown(); }catch (Exception e){ logger.warn("Quartz service stopped exception:{}",e.getMessage()); } logger.info("Quartz service stopped"); try { ThreadPoolExecutors.getInstance().shutdown(); }catch (Exception e){ logger.warn("threadpool service stopped exception:{}",e.getMessage()); } logger.info("threadpool service stopped"); try { masterSchedulerService.shutdownNow(); }catch (Exception e){ logger.warn("master scheduler service stopped exception:{}",e.getMessage()); } logger.info("master scheduler service stopped"); try { zkMasterClient.close(); }catch (Exception e){ logger.warn("zookeeper service stopped exception:{}",e.getMessage()); } logger.info("zookeeper service stopped"); } catch (Exception e) { logger.error("master server stop exception : " + e.getMessage(), e); System.exit(-1); } } 

來看stop,它是一個同步方法(synchronized)。為了線程安全,這一點還是比較謹慎的。還會調用Stopper.isStoped(),以便只能執行一次。

后面的邏輯就比較簡單了,總結如下:

  1. Stopper.stop()。關閉全部線程的循環標志
  2. 休眠3秒
  3. heartbeatMasterService.shutdownNow
  4. QuartzExecutors.getInstance().shutdown
  5. ThreadPoolExecutors.getInstance().shutdown
  6. masterSchedulerService.shutdownNow
  7. zkMasterClient.close

讀者要細心的分析shutdownNow和shutdown的區別。一些重要的線程還是要等待其全部執行完才能退出的,比如ThreadPoolExecutors。

但上面退出的順序就值得商榷了。假如ThreadPoolExecutors等了很久才退出,就會造成zkMasterClient退出時間也非常久。現在還不知道其他master節點是怎么進行容災的。假如通過HeartBeat,此時heartBeat已經停止了,應該容災,但任務線程池還在執行,其他節點又重復啟動了流程定義實例是否會有影響呢?如果通過zookeeper心跳,此時任務也沒有結束,zookeeper還在連接,貌似也沒法容災吧。

從上面的分析來看,在各個while循環處理Stopper.isRunning()時,並沒有做相應的退出動作,所以此處的stop並不優雅。不是說master優雅退出了,其他節點就是優雅的退出。

考慮到run源碼比較長,且都是一些線程初始化、提交的邏輯,下面只分析最后一段代碼。

// start QuartzExecutors // what system should do if exception try { ProcessScheduleJob.init(processDao); QuartzExecutors.getInstance().start(); } catch (Exception e) { try { QuartzExecutors.getInstance().shutdown(); } catch (SchedulerException e1) { logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1); } logger.error("start Quartz failed : " + e.getMessage(), e); } /** * register hooks, which are called before the process exits */ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { if (zkMasterClient.getActiveMasterNum() <= 1) { for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) { zkMasterClient.getAlertDao().sendServerStopedAlert( 1, OSUtils.getHost(), "Master-Server"); } } stop("shutdownhook"); } })); 

ProcessScheduleJob.init就是給ProcessScheduleJob一個static字段賦值,也就是給所有的ProcessScheduleJob一個全局的processDao

public static void init(ProcessDao processDao) { ProcessScheduleJob.processDao = processDao; } 

感覺源碼中關於processDao的處理有點模糊不清,比較隨意。有些是傳參,有些是getBean,有些又是全局變量。好亂,好亂。

addShutdownHook的邏輯就比較清晰了,就是添加了進程退出的hook。先發送預警信息,然后調用stop“優雅”退出。

FetchTaskThread

worker涉及的線程主要有兩個FetchTaskThread、TaskScheduleThread。

FetchTaskThread從名稱上來看,應該是從zk隊列拉取任務信息的。它也是Runnable的一個實現類,還是從run方法入手分析。

@Override
public void run() { while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; //check memory and cpu usage and threads boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor); Thread.sleep(Constants.SLEEP_TIME_MILLIS); if(!runCheckFlag) { continue; } //whether have tasks, if no tasks , no need lock //get all tasks List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); if (CollectionUtils.isEmpty(tasksQueueList)){ continue; } // creating distributed locks, lock path /dolphinscheduler/lock/worker mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(), zkWorkerClient.getWorkerLockPath()); // task instance id str List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum); for(String taskQueueStr : taskQueueStrArr){ if (StringUtils.isEmpty(taskQueueStr)) { continue; } if (!checkThreadCount(poolExecutor)) { break; } // get task instance id taskInstId = getTaskInstanceId(taskQueueStr); // mainly to wait for the master insert task to succeed waitForTaskInstance(); taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId); // verify task instance is null if (verifyTaskInstanceIsNull(taskInstance)) { logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr); removeNodeFromTaskQueue(taskQueueStr); continue; } Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), taskInstance.getProcessDefine().getUserId()); // verify tenant is null if (verifyTenantIsNull(tenant)) { logger.warn("remove task queue : {} due to tenant is null", taskQueueStr); removeNodeFromTaskQueue(taskQueueStr); continue; } // set queue for process instance, user-specified queue takes precedence over tenant queue String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); logger.info("worker fetch taskId : {} from queue ", taskInstId); if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ continue; } // local execute path String execLocalPath = getExecLocalPath(); logger.info("task instance local execute path : {} ", execLocalPath); // init task taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath); // check and create Linux users FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode(), logger); logger.info("task : {} ready to submit to task scheduler thread",taskInstId); // submit task workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); // remove node from zk removeNodeFromTaskQueue(taskQueueStr); } }catch (Exception e){ logger.error("fetch task thread failure" ,e); }finally { AbstractZKClient.releaseMutex(mutex); } } } 

run還是一個while“死循環”,首先檢查了當前資源是否超閾值、線程數是否夠用,然后休眠1秒,判斷前面的結果,為false則進入下一個循環。

真惡心的寫法,居然是先獲取runCheckFlag標志,休眠后再判斷這個值。

調用taskQueue.getAllTasks獲取當前所有的任務列表,為空則進入下一次循環。

難道不應該用hasTask這樣的接口判斷嗎?此處只是判斷是否有作業,獲取全部的任務列表就不合適了,優點浪費內存。

申請InterProcessMutex鎖,這樣同一時刻只有一個worker節點可以從隊列中poll任務。這意味着,任務會隨機的在worker節點執行。分配任務的算法多少有點簡單,難道不應該哪個節點資源多,搶占鎖的可能性大一點嗎?

其實可以基於zookeeper實現一個具有優先級的分布式鎖,申請鎖時會設置當前客戶端的權重,權重大的搶到鎖的可能性隨之增大。

搶占到鎖之后,每次poll固定數量的任務。還記得之前TaskQueue中插入的是什么樣的數據嗎?

${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2... 

雖然作者說,百萬級別的時候處理這樣的字符串沒有啥性能損耗,但簡化點不好嗎?特別是前面還有一個getAll。

獲取到任務列表之后,就是一個for循環,依次處理任務。下面簡單總結一下其邏輯。

  1. 判斷taskQueueStr是否為空。感覺有點多此一舉。
  2. 判斷當前線程數是否夠用
  3. 從taskQueueStr中取到任務ID。就是按照_分隔之后的第四個字段。
  4. 等待任務實例信息插入到數據庫。循環30次,每次等待1秒。注釋說數據庫操作會被延遲,不知道哪里會延遲。
  5. 通過任務id,獲取任務實例信息。
  6. 通過任務實例,獲取租戶信息。
  7. 通過任務實例,獲取用戶隊列信息。為啥不在查詢任務實例信息的時候,直接獲取到呢?或者在getTaskInstanceDetailByTaskId一次性獲取到?
  8. 判斷任務實例是否可以在當前節點執行,不能則繼續下一個任務處理。這為啥不提前判斷呢?調了2次db查詢才來判斷?
  9. 任務實例初始化
  10. 檢查目錄、用戶是否存在。不存在則創建用戶、目錄。為啥不是提前建好?每次還要檢查一遍。
  11. 提交任務,交給TaskScheduleThread線程執行。
  12. 刪除taskQueue中對應的任務節點。

FetchTaskThread功能就是搶占zk鎖,從TaskQueue獲取任務,然后創建TaskScheduleThread線程去執行。

TaskScheduleThread

TaskScheduleThread的功能應該是比較簡單了,畢竟到這一步就要執行具體的邏輯了。

@Override public void run() { try { // update task state is running according to task type updateTaskState(taskInstance.getTaskType()); logger.info("script path : {}", taskInstance.getExecutePath()); // task node TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); // copy hdfs/minio file to local copyHdfsToLocal(processDao, taskInstance.getExecutePath(), createProjectResFiles(taskNode), logger); // get process instance according to tak instance ProcessInstance processInstance = taskInstance.getProcessInstance(); // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), taskInstance.getExecutePath(), processInstance.getScheduleTime(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getId(), CommonUtils.getSystemEnvPath(), processInstance.getTenantCode(), processInstance.getQueue(), taskInstance.getStartTime(), getGlobalParamsMap(), taskInstance.getDependency(), processInstance.getCmdTypeIfComplement()); // set task timeout setTaskTimeout(taskProps, taskNode); taskProps.setTaskAppId(String.format("%s_%s_%s", taskInstance.getProcessDefine().getId(), taskInstance.getProcessInstance().getId(), taskInstance.getId())); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskInstance.getProcessDefine().getId(), taskInstance.getProcessInstance().getId(), taskInstance.getId())); task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); // task init task.init(); // task handle task.handle(); // task result process task.after(); }catch (Exception e){ logger.error("task scheduler failure", e); kill(); // update task instance state processDao.changeTaskState(ExecutionStatus.FAILURE, new Date(), taskInstance.getId()); } logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus()); // update task instance state processDao.changeTaskState(task.getExitStatus(), new Date(), taskInstance.getId()); } 

還是一步步分析run方法。

  1. 更新任務狀態為ExecutionStatus.RUNNING_EXEUTION
  2. 從任務實例獲取任務節點信息。
  3. 從HDFS復制文件到本地。包括一些用戶上傳的資源文件,jar包、SQL文件、配置文件等等。
  4. 構造TaskProps對象。
  5. 初始化任務日志對象。
  6. 構造AbstractTask實例
  7. 依次調用AbstractTask的init、handle、after。
  8. 更新任務實例的狀態。異常失敗或成功等。

TaskManager.newTask還是比較重要的,它創建了最終的、具體的、可執行的任務實例。

public static AbstractTask newTask(String taskType, TaskProps props, Logger logger) throws IllegalArgumentException { switch (EnumUtils.getEnum(TaskType.class,taskType)) { case SHELL: return new ShellTask(props, logger); case PROCEDURE: return new ProcedureTask(props, logger); case SQL: return new SqlTask(props, logger); case MR: return new MapReduceTask(props, logger); case SPARK: return new SparkTask(props, logger); case FLINK: return new FlinkTask(props, logger); case PYTHON: return new PythonTask(props, logger); case DEPENDENT: return new DependentTask(props, logger); case HTTP: return new HttpTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); } 

至此,終於找到了前端配置任務的具體實現類。其實吧,這個異常拋的沒有道理。這個taskType肯定是保存在數據庫的,保存之前應該做校驗了吧,畢竟是enum轉過去的。

ShellTask

AbstractTask的子類現在有9個,為了簡單下面只分析ShellTask,這是一個常見且簡單的任務類型。

public ShellTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); this.taskDir = taskProps.getTaskDir(); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), taskProps.getTaskAppId(), taskProps.getTaskInstId(), taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), logger); this.processDao = SpringApplicationContext.getBean(ProcessDao.class); } 

先看其構造函數,有兩個字段的初始化比較重要:shellCommandExecutor、processDao。ShellCommandExecutor是shell腳本的執行器,具體功能后面再分析。processDao的初始化方法是不是比較熟悉?又是通過SpringApplicationContext.getBean獲取到的,傳個參數多好。把這些dao等其他類型的全局或局部變量封裝到TaskContenxt多好,如果任務之間傳遞變量,就可以用TaskContenxt了。

根據init、handle、after的名稱來看,具體的執行應該是在handle。

public void handle() throws Exception { try { // construct process exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao); } catch (Exception e) { logger.error("shell task failure", e); exitStatusCode = -1; } } 

handle就是簡單的調用了shellCommandExecutor.run,如果出現異常,則exitStatusCode賦值-1

shellCommandExecutor.run的代碼比較多,不再深入分析,此處只簡單的分析shellCommandExecutor的buildProcess的方法。

private void buildProcess(String commandFile) throws IOException { //init process builder ProcessBuilder processBuilder = new ProcessBuilder(); // setting up a working directory processBuilder.directory(new File(taskDir)); // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile); process = processBuilder.start(); // print command printCommand(processBuilder); } 

它根據commandFile創建了一個ProcessBuilder,返回了Process對象。當然了,是通過sudo -u執行的shell命令。

DependentTask

DependentTask雖然是AbstractTask的一個子類,雖然與shell屬於同一個層級的類,但由於其功能的特殊性,此處單獨拿出來做分析。

public void handle(){ // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); try{ TaskInstance taskInstance = null; while(Stopper.isRunning()){ taskInstance = processDao.findTaskInstanceById(this.taskProps.getTaskInstId()); if(taskInstance == null){ exitStatusCode = -1; break; } if(taskInstance.getState() == ExecutionStatus.KILL){ this.cancel = true; } if(this.cancel || allDependentTaskFinish()){ break; } Thread.sleep(Constants.SLEEP_TIME_MILLIS); } if(cancel){ exitStatusCode = Constants.EXIT_CODE_KILL; }else{ DependResult result = getTaskDependResult(); exitStatusCode = (result == DependResult.SUCCESS) ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; } }catch (Exception e){ logger.error(e.getMessage(),e); exitStatusCode = -1; } } 

又是一個“死循環”,作者很鍾情於這個設計啊。

邏輯也比較簡單,總結如下:

  1. 通過任務實例id,獲取當前最新的任務實例信息
  2. 判斷狀態是否為kill,是則退出
  3. 判斷所有依賴任務是否完成,是則退出
  4. 休眠1秒,進入下一次循環。

allDependentTaskFinish是一個非常重要的邏輯。

private boolean allDependentTaskFinish(){ boolean finish = true; for(DependentExecute dependentExecute : dependentTaskList){ for(Map.Entry<String, DependResult> entry: dependentExecute.getDependResultMap().entrySet()) { if(!dependResultMap.containsKey(entry.getKey())){ dependResultMap.put(entry.getKey(), entry.getValue()); //save depend result to log logger.info("dependent item complete {} {},{}", DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString()); } } if(!dependentExecute.finish(dependentDate)){ finish = false; } } return finish; } 

它遍歷了dependentTaskList,通過dependentExecute.finish(dependentDate)判斷了依賴的作業是否全部完成,任意一個沒有完成,則退出循環,返回false。

dependentDate的值也很重要,它其實是任務的調度時間或者啟動時間(補數時間)

if(taskProps.getScheduleTime() != null){ this.dependentDate = taskProps.getScheduleTime(); }else{ this.dependentDate = taskProps.getTaskStartTime(); } 

通過一層層追蹤分析DependentExecute.finish,我們定位到了DependentExecute.calculateResultForTasks,這是用來判斷某個依賴項的依賴結果的。

/** * calculate dependent result for one dependent item. * @param dependentItem dependent item * @param dateIntervals date intervals * @return dateIntervals */ private DependResult calculateResultForTasks(DependentItem dependentItem, List<DateInterval> dateIntervals) { DependResult result = DependResult.FAILED; for(DateInterval dateInterval : dateIntervals){ ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval); if(processInstance == null){ logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); return DependResult.FAILED; } if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ result = getDependResultByState(processInstance.getState()); }else{ TaskInstance taskInstance = null; List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId()); for(TaskInstance task : taskInstanceList){ if(task.getName().equals(dependentItem.getDepTasks())){ taskInstance = task; break; } } if(taskInstance == null){ // cannot find task in the process instance // maybe because process instance is running or failed. result = getDependResultByState(processInstance.getState()); }else{ result = getDependResultByState(taskInstance.getState()); } } if(result != DependResult.SUCCESS){ break; } } return result; } 

總結並簡化其重要的邏輯,大概是如果依賴整個DAG,則判斷流程定義實例的狀態;否則依次判斷依賴任務實例的狀態。

DependentTask的邏輯簡單清晰,就是循環等待所有的任務結束。但感覺這樣設計不太好,畢竟把它當成一個普通的Task,就意味着它會占用整體的可調用的線程池。如果項目多、任務多、依賴也多的話,這個浪費還是有點大的。個人覺得DependentTask可以單獨設計成一個線程,或者放到獨立的線程池去運行。畢竟對於一個調度系統來說,“依賴”還是一個非常重要的概念的。

WorkerServer

最后我們分析WorkerServer,這是與master同級的類。與master分析思路一致,還是先來看stop方法。

此處補貼代碼,只總結stop邏輯。

  1. 調用Stopper.stop設置全局變量。停止所有線程的“死”循環
  2. 休眠3秒
  3. 停止worker心跳。heartbeatWorkerService.shutdownNow
  4. 停止worker任務線程池。ThreadPoolExecutors.getInstance().shutdown
  5. 停止killExecutor線程池。killExecutorService.shutdownNow
  6. 停止fetchTask線程池。fetchTaskExecutorService.shutdownNow
  7. 停止zookeeper客戶端。zkWorkerClient.close

heartBeatThread不再分析,其邏輯與master基本一致,就是上報worker的當前資源使用情況。

ZKWorkerClient

最后我們再來看ZKWorkerClient的邏輯,它與worker的容災有很大關系。這是一個非常重要的邏輯和概念,下面會逐步深入分析。

private ZKWorkerClient(){ init(); } /** * init */ private void init(){ // init system znode this.initSystemZNode(); // monitor worker this.listenerWorker(); // register worker this.registWorker(); } 

先來看其初始化過程,就是一次調用initSystemZNode、listenerWorker、registWorker。

protected void initSystemZNode(){ try { createNodePath(getMasterZNodeParentPath()); createNodePath(getWorkerZNodeParentPath()); createNodePath(getDeadZNodeParentPath()); } catch (Exception e) { logger.error("init system znode failed : " + e.getMessage(),e); } } private void createNodePath(String zNodeParentPath) throws Exception { if(null == zkClient.checkExists().forPath(zNodeParentPath)){ zkClient.create().creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath); } } 

根據initSystemZNode源碼,以及涉及到的三個函數來看,就是在zookeeper中依次創建了3個節點。令人詫異的是,在worker節點初始化過程中居然會創建master相關的子節點。

下面我們先分析registWorker,因為我覺得就是應該先注冊worker節點,在開啟監聽啊。

registWorker的源碼不再貼出來,它主要就是調用registerServer(ZKNodeType.WORKER)注冊了當前節點。

public String registerServer(ZKNodeType zkNodeType) throws Exception { String registerPath = null; String host = OSUtils.getHost(); if(checkZKNodeExists(host, zkNodeType)){ logger.error("register failure , {} server already started on host : {}" , zkNodeType.toString(), host); return registerPath; } registerPath = createZNodePath(zkNodeType); // handle dead server handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); return registerPath; } 

registerServer首先檢查了當前節點是否存在,存在則退出;不存在則創建節點。最后調用handleDeadServer,其實就是查找死掉的節點,然后從zk中刪除。

private void listenerWorker(){ workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory); try { workerPathChildrenCache.start(); workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: logger.info("node added : {}" ,event.getData().getPath()); break; case CHILD_REMOVED: String path = event.getData().getPath(); //find myself dead String serverHost = getHostByEventDataPath(path); if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ return; } break; case CHILD_UPDATED: break; default: break; } } }); }catch (Exception e){ logger.error("monitor worker failed : " + e.getMessage(),e); } } 

listenerWorker就是監聽worker的CHILD_REMOVED事件,監聽到該事件之后,調用了checkServerSelfDead。worker本身並不會對其他worker節點的移除進行啥具體邏輯。

protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) { if (serverHost.equals(OSUtils.getHost())) { logger.error("{} server({}) of myself dead , stopping...", zkNodeType.toString(), serverHost); stoppable.stop(String.format(" {} server {} of myself dead , stopping...", zkNodeType.toString(), serverHost)); return true; } return false; } 

checkServerSelfDead判斷是否為當前節點,如果是則調用stoppable.stop,而stoppable是在WorkerServer.run函數中設置的。

zkWorkerClient.setStoppable(this); 

listenerWorker就是監聽當前節點是否超時被zookeeper刪除,刪除后則調用stop方法,優雅退出。

ZKMasterClient

ZKWorkerClient好像沒啥邏輯,就是用來優雅的退出,下面來分析ZKMasterClient。

private ZKMasterClient(ProcessDao processDao){
	this.processDao = processDao;
	init();
}
 public void init(){ // init dao this.initDao(); InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master String znodeLock = getMasterStartUpLockPath(); mutex = new InterProcessMutex(zkClient, znodeLock); mutex.acquire(); // init system znode this.initSystemZNode(); // monitor master this.listenerMaster(); // monitor worker this.listenerWorker(); // register master this.registerMaster(); // check if fault tolerance is required,failure and tolerance  if (getActiveMasterNum() == 1) { failoverWorker(null, true); failoverMaster(null); } }catch (Exception e){ logger.error("master start up exception : " + e.getMessage(),e); }finally { releaseMutex(mutex); } } 

代碼比較長,總結一下其邏輯:

  1. initDao。其實就是初始化alertDao,調用DaoFactory.getDaoInstance(AlertDao.class)。好惡心的初始化方法,processDao是傳進來的,alertDao又是這樣創建的。
  2. 申請/dolphinscheduler/lock/failover/master路徑的分布式鎖。
  3. 申請到鎖之后,依次調用initSystemZNode、listenerMaster、listenerWorker、registerMaster
  4. 如果當前活動的master個數為1則進行容災。暫時還不知道為啥。

initSystemZNode不再分析;listenerMaster不再貼源碼,它主要邏輯就是監聽到其他master節點被移除后調用removeZKNodePath,如果是當前節點,則優雅退出。

removeZKNodePath也不再貼源碼,它主要是處理死掉的節點、進行預警,如果要進行故障轉移,就調用failoverServerWhenDown。這是一個非常重要的方法,它在里面按照不同情況調用了failoverMaster或failoverWorker。也就是說master和worker的故障轉移都是在master處理的。

private void failoverMaster(String masterHost) { logger.info("start master failover ..."); List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost); //updateProcessInstance host is null and insert into command for(ProcessInstance processInstance : needFailoverProcessInstanceList){ processDao.processNeedFailoverProcessInstances(processInstance); } logger.info("master failover end"); } 

failoverMaster查詢了指定master節點運行的流程定義實例,然后調動processNeedFailoverProcessInstances進行處理。

@Transactional(rollbackFor = Exception.class) public void processNeedFailoverProcessInstances(ProcessInstance processInstance){ //1 update processInstance host is null processInstance.setHost("null"); processInstanceMapper.updateById(processInstance); //2 insert into recover command Command cmd = new Command(); cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId()); cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId())); cmd.setExecutorId(processInstance.getExecutorId()); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); createCommand(cmd); } 

processNeedFailoverProcessInstances邏輯就是更新為當前流程定義實例的host字段為null字符串,插入一條類型為RECOVER_TOLERANCE_FAULT_PROCESS的Command,參數是流程定義實例id。

這里只是插入了一條Command,具體是在哪里處理的呢?這里全局搜索一下。

recover_tolerance_fault_process

排除之后定位到ProcessDao.constructProcessInstance,如果是RECOVER_TOLERANCE_FAULT_PROCESS類型,則調用processInstance.setRecovery(Flag.YES)。依然不知道怎么處理的,還得搜索判斷為recovery為true時的邏輯。

搜來搜去你會驚奇的發現,沒有地方會對RECOVER_TOLERANCE_FAULT_PROCESS或processInstance.getRecovery為true做特殊處理!!!

那就得回到cmd的構造上,它設置了4個值,就得研究下這跟普通的Command的其他區別了。

Command command = new Command(); command.setCommandType(CommandType.SCHEDULER); command.setExecutorId(schedule.getUserId()); command.setFailureStrategy(schedule.getFailureStrategy()); command.setProcessDefinitionId(schedule.getProcessDefinitionId()); command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); command.setWorkerGroupId(schedule.getWorkerGroupId()); command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); 

現在回過頭去ProcessScheduleJob.execute看一下,普通Command是如何構造的。大家會發現RECOVER_TOLERANCE_FAULT_PROCESS的Command多了一個commandParam的設置,這個param的key是Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING。

通過全局搜索,定位到ProcessDao.constructProcessInstance,這里有對Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING的處理。

commandRecover

處理也比較簡單,就是獲取到Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING的值,然后查詢到流程定義實例對象而不是重新創建一個。后續邏輯就是把流程定義實例的參數,全都put到cmdParam中進行后續的邏輯判斷。

根據流程實例id查詢,而不是重新創建。這樣的好處是可以避免已經執行成功的任務不再重復執行。那處於running狀態的任務呢?這個就要結合master的優雅退出來看了。

還記得master如何優雅退出的嗎?好像沒有涉及到這部分邏輯啊?沒錯,就是沒有涉及,其實master會等待所調度的所有流程定義實例中的任務直至結束的。也就是說,調度的任務應該會被正常的執行完畢的,狀態最終會被更新成失敗或者成功。

那如果宕掉的master沒有能成功更新作業的狀態呢?很不幸,DolphinScheduler沒有處理這部分異常。此時的作業只能是永久處於running狀態,除非手動干預。

這部分邏輯最好處理一下啊,要不然就真的永久處於running狀態了。

分析到這里大家會發現,其實master的故障轉移很簡單,就是把某個master節點的流程定義實例交由其他master節點去驅動,原來的流程定義實例中的任務狀態沒有任何改變。

下面來看看worker的故障轉移。

/** * failover worker tasks * * 1. kill yarn job if there are yarn jobs in tasks. * 2. change task state from running to need failover. * 3. failover all tasks when workerHost is null * @param workerHost worker host * @param needCheckWorkerAlive need check worker alive * @throws Exception exception */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { logger.info("start worker[{}] failover ...", workerHost); List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost); for(TaskInstance taskInstance : needFailoverTaskInstanceList){ if(needCheckWorkerAlive){ if(!checkTaskInstanceNeedFailover(taskInstance)){ continue; } } ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if(instance!=null){ taskInstance.setProcessInstance(instance); } // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskInstance); taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processDao.saveTaskInstance(taskInstance); } logger.info("end worker[{}] failover ...", workerHost); } 

首先根據worker的IP找到對應的任務實例,如果該實例不需要故障轉移則繼續下一個任務實例的檢測;如果需要,則找到對應的流程定義實例,將其與任務實例關聯;只殺掉yarn作業,作者說其他作業的本地線程已經退出(誰知道呢);設置任務實例的狀態為ExecutionStatus.NEED_FAULT_TOLERANCE。

看到這里發現worker的故障轉移也沒啥太復雜功能,就是設置任務實例的狀態。那是如何處理ExecutionStatus.NEED_FAULT_TOLERANCE狀態的任務實例的呢?

很不幸,沒有具體的邏輯來處理這個狀態。還記得worker是如何優雅stop的嗎?就是等對應的線程池安全退出,其實就是讓正在執行的任務實例繼續執行完畢。

needFaultTolerance

全局搜索之后,發現對ExecutionStatus.NEED_FAULT_TOLERANCE做過的唯一比較重要的判斷就是:如果作業失敗且需要故障轉移,就把他放到recoverToleranceFaultTaskList列表中。但跟蹤代碼才發現recoverToleranceFaultTaskList就是用來預警的。

剛開始,taskCanRetry中對NEED_FAULT_TOLERANCE的判斷被我忽略了。

/** * determine if you can try again * @return can try result */ public boolean taskCanRetry() { if(this.isSubProcess()){ return false; } if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ return true; }else { return (this.getState().typeIsFailure() && this.getRetryTimes() < this.getMaxRetryTimes()); } } 

taskCanRetry中,如果是ExecutionStatus.NEED_FAULT_TOLERANCE狀態,則不管重試了多少次,一定可以重試。有啥用呢?

其實就是如果發現某個作業是故障轉移狀態,則失敗的時候一定可以重試。

if(task.getState().typeIsFailure()){ if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ this.recoverToleranceFaultTaskList.add(task); } if(task.taskCanRetry()){ addTaskToStandByList(task); }else{ // node failure, based on failure strategy errorTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task); if(processInstance.getFailureStrategy() == FailureStrategy.END){ killTheOtherTasks(); } } continue; } 

上面是MasterExecThread.runProcess中的一段代碼,其邏輯就是如果當前作業失敗,且可以重試,就把作業添加到readyToSubmitTaskList隊列中再次執行。而ExecutionStatus.NEED_FAULT_TOLERANCE就是屬於可以重試。

至此簡單總結一下DolphinScheduler故障轉移的邏輯:

  1. worker如果與zookeeper連接超時,則停止心跳,停止獲取任務,等待所有任務實例執行結束(正常或失敗)並更新數據庫狀態
  2. master如果與zookeeper連接超時,則停止心跳,停止獲取流程定義實例,停止調度所有流程定義實例
  3. master如果發現某個流程定義實例中的任務實例失敗且屬於ExecutionStatus.NEED_FAULT_TOLERANCE狀態,則重新運行。

但這里有一個不太合理的假設:如果master/worker與zookeeper的連接超時,則master/worker出現了問題,應該發生故障轉移。

master/worker與zookeeper的連接超時有兩種可能,master/worker的網絡有問題、zookeeper有問題。如果是master/worker的網絡有問題則MySQL的讀寫也會有問題,意味着任務實例的狀態更新可能有問題,此時發生故障轉移沒問題;如果是zookeeper服務本身有問題,則所有的master/worker可能都會有問題,即使發生故障轉移意義不是特別大。

其實個人覺得,既然DolphinScheduler把流程定義、實例等信息保存到了數據庫,那么心跳應該去度量與數據庫的連接,而不是去度量與zookeeper的連接。zookeeper存在的意義就是分布式鎖,而不應該是度量心跳!

引用

  1. Dolphin Scheduler Api Docs
  2. DolphinScheduler內部原理和架構設計.ppt


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM