Azkaban 3.45
I Introduction
1 From the official website
Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.
Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.
Note: although Azkaban has grown beyond its original single-server design, the WebServer in the current version is unfortunately still a single point of failure.
Azkaban consists of 3 key components:
- Relational Database (MySQL)
- AzkabanWebServer
- AzkabanExecutorServer
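2 Deployment
3 Database tables
projects: project metadata
project_flows: flow definitions
execution_flows: flow instances (executions)
execution_jobs: job instances
triggers: schedule definitions
PS: much of the data in these tables is encoded. enc_type holds the encoding type (the corresponding enum is EncodingType): 2 means gzip, any other value means no encoding. For enc_type 2, call GZIPUtils.transformBytesToObject to decode the original string.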
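To inspect an encoded column by hand (for example flow_data in execution_flows), the blob only needs to be gunzipped when enc_type is 2. The following is a minimal sketch using only java.util.zip; it assumes the decompressed payload is UTF-8 text, and FlowDataDecoder is a hypothetical helper, not Azkaban's GZIPUtils.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper (not Azkaban's GZIPUtils): decodes a blob column such as flow_data.
class FlowDataDecoder {
    // enc_type value for gzip, per the note above; every other value is treated as plain bytes.
    static final int ENC_GZIP = 2;

    static String decode(int encType, byte[] data) {
        if (encType != ENC_GZIP) {
            return new String(data, StandardCharsets.UTF_8); // assumption: payload is UTF-8 text
        }
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) { // copy the decompressed stream into memory
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds test input: gzip-compresses a UTF-8 string.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // read after close so the gzip trailer is included
    }
}
```

In practice the bytes would come from a JDBC ResultSet (getInt("enc_type"), getBytes("flow_data")); wrapping IOException keeps the helper easy to call from such code.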
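4 Concepts
- Job: the smallest unit of execution and a node of the DAG, i.e. a task
- Flow: a set of Jobs whose dependencies are declared through each Job's dependencies property, i.e. a workflow
- Trigger: fires a Flow according to a given cron expression, i.e. a schedule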
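In Flow 1.0 projects each Job is a .job properties file (type=command, command=..., dependencies=a,b), and the Flow is the DAG formed by those dependency edges. The sketch below parses such property text with java.util.Properties and reports where the flow starts and ends; JobFileFlow and its methods are hypothetical names for illustration, not Azkaban classes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper: builds a flow DAG from Flow 1.0 style .job property text.
class JobFileFlow {
    final Map<String, List<String>> dependencies = new TreeMap<>(); // job -> jobs it depends on

    void addJob(String name, String jobFileText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(jobFileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> deps = new ArrayList<>();
        for (String d : props.getProperty("dependencies", "").split(",")) {
            if (!d.trim().isEmpty()) {
                deps.add(d.trim()); // tolerate "a, b" as well as "a,b"
            }
        }
        dependencies.put(name, deps);
    }

    // Jobs with no dependencies: execution of the flow starts here.
    List<String> startJobs() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : dependencies.entrySet()) {
            if (e.getValue().isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Jobs that no other job depends on: the flow ends here.
    List<String> endJobs() {
        Set<String> referenced = new HashSet<>();
        for (List<String> deps : dependencies.values()) {
            referenced.addAll(deps);
        }
        List<String> result = new ArrayList<>();
        for (String job : dependencies.keySet()) {
            if (!referenced.contains(job)) {
                result.add(job);
            }
        }
        return result;
    }
}
```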
II Code Analysis
1 Startup process
Web Server
AzkabanWebServer.main
launch
prepareAndStartServer
configureRoutes
TriggerManager.start
FlowTriggerService.start
recoverIncompleteTriggerInstances
SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))
FlowTriggerScheduler.start
ExecutorManager
setupExecutors
loadRunningFlows
QueueProcessorThread.run
ExecutingManagerUpdaterThread.run
Executor Server
AzkabanExecutorServer.main
launch
AzkabanExecutorServer.start
insertExecutorEntryIntoDB
2 Flow execution process
The Web Server has two entry points:
ExecuteFlowAction.doAction
ExecutorServlet.ajaxExecuteFlow
The Web Server dispatches the flow:
ExecutorManager.submitExecutableFlow
JdbcExecutorLoader.uploadExecutableFlow
INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)
ExecutorLoader.addActiveExecutableReference
INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)
queuedFlows.enqueue
QueueProcessorThread.run
processQueuedFlows
ExecutorManager.selectExecutorAndDispatchFlow (takes the next flow from queuedFlows)
selectExecutor
dispatch
JdbcExecutorLoader.assignExecutor
UPDATE execution_flows SET executor_id=? where exec_id=?
ExecutorApiGateway.callWithExecutable (calls the Executor Server)
The Executor Server runs the flow:
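The dispatch path above (submitExecutableFlow enqueues, QueueProcessorThread drains the queue, selectExecutor picks a host, assignExecutor records it) can be modeled in memory roughly as follows. This is a simplified sketch under stated assumptions: the queue is plain FIFO, the selector simply prefers the executor with the fewest assigned flows (Azkaban's own selection logic is more elaborate), a map stands in for the executor_id column, and the HTTP call to the executor is omitted. DispatchSketch and its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified model of queuedFlows + selectExecutorAndDispatchFlow.
class DispatchSketch {
    static final class Executor {
        final int id;
        int assignedFlows;

        Executor(int id, int assignedFlows) {
            this.id = id;
            this.assignedFlows = assignedFlows;
        }
    }

    private final Queue<Integer> queuedFlows = new ArrayDeque<>();          // exec_ids awaiting dispatch
    private final Map<Integer, Integer> executorIdByExecId = new HashMap<>(); // stands in for execution_flows.executor_id
    private final List<Executor> executors;

    DispatchSketch(List<Executor> executors) {
        this.executors = executors;
    }

    void submit(int execId) {
        queuedFlows.add(execId); // cf. queuedFlows.enqueue
    }

    // One pass of the queue processor: dispatch every queued flow.
    void processQueuedFlows() {
        Integer execId;
        while ((execId = queuedFlows.poll()) != null) {
            Executor chosen = selectExecutor();
            executorIdByExecId.put(execId, chosen.id); // cf. assignExecutor (UPDATE ... SET executor_id=?)
            chosen.assignedFlows++;                    // the real code would now call the executor over HTTP
        }
    }

    // Assumption: fewest assigned flows wins; ties go to the lowest executor id.
    private Executor selectExecutor() {
        return executors.stream()
                .min(Comparator.comparingInt((Executor e) -> e.assignedFlows)
                        .thenComparingInt(e -> e.id))
                .orElseThrow(() -> new IllegalStateException("no active executor"));
    }

    Integer executorIdOf(int execId) {
        return executorIdByExecId.get(execId);
    }
}
```

Separating the queue from the selection step mirrors the call chain: submission only records the flow, and a background thread decides later where it runs.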
ExecutorServlet.doGet
handleAjaxExecute
FlowRunnerManager.submitFlow
JdbcExecutorLoader.fetchExecutableFlow
SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?
FlowPreparer.setup
FlowRunner.run
setupFlowExecution
updateFlow
UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?
runFlow
progressGraph
runReadyJob
runExecutableNode
JobRunner.run
uploadExecutableNode
INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)
prepareJob
runJob
Job.run (ProcessJob, JavaJob)
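The runFlow / progressGraph / runReadyJob loop above repeatedly looks for jobs whose parents have all finished and starts them. A sequential sketch of that idea follows; it is an assumption-laden simplification: jobs run one at a time rather than in a thread pool, a job's success is decided by a caller-supplied predicate, and any job downstream of a failure is marked CANCELLED. FlowRunnerSketch and its Status values are hypothetical, not Azkaban's FlowRunner or Status enum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical, sequential simplification of FlowRunner.runFlow / progressGraph.
class FlowRunnerSketch {
    enum Status { READY, SUCCEEDED, FAILED, CANCELLED }

    // deps maps every job in the flow to the jobs it depends on;
    // jobBody "runs" a job and returns true on success.
    static Map<String, Status> runFlow(Map<String, List<String>> deps, Predicate<String> jobBody) {
        Map<String, Status> status = new TreeMap<>();
        for (String job : deps.keySet()) {
            status.put(job, Status.READY);
        }
        boolean progressed = true;
        while (progressed) { // cf. progressGraph: sweep until no job changes state
            progressed = false;
            for (String job : new ArrayList<>(status.keySet())) {
                if (status.get(job) != Status.READY) {
                    continue;
                }
                boolean parentBroken = false;
                boolean parentsDone = true;
                for (String parent : deps.get(job)) {
                    Status s = status.get(parent);
                    if (s == Status.FAILED || s == Status.CANCELLED) {
                        parentBroken = true;
                    } else if (s != Status.SUCCEEDED) {
                        parentsDone = false; // parent still pending
                    }
                }
                if (parentBroken) {
                    status.put(job, Status.CANCELLED); // assumption: downstream of a failure never runs
                    progressed = true;
                } else if (parentsDone) {
                    status.put(job, jobBody.test(job) ? Status.SUCCEEDED : Status.FAILED); // cf. runReadyJob
                    progressed = true;
                }
            }
        }
        return status;
    }
}
```

Jobs caught in a dependency cycle or pointing at an unknown job simply stay READY, so the loop always terminates.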
The Web Server polls flow status:
ExecutingManagerUpdaterThread.run
getFlowToExecutorMap
ExecutorApiGateway.callWithExecutionId
updateExecution
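The presence of getFlowToExecutorMap in the polling chain suggests that running flows are grouped by executor before the status calls, so each executor can be asked about all of its flows in one round. The sketch below illustrates that grouping under this assumption; ExecutorClient is a hypothetical stand-in for the HTTP call, and the executor names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of one polling round of the Web Server's updater thread.
class StatusPollerSketch {
    // Stand-in for the remote call to one executor; returns execId -> status.
    interface ExecutorClient {
        Map<Integer, String> fetchStatuses(String executorHost, List<Integer> execIds);
    }

    // Inverts execId -> executor into executor -> execIds (cf. getFlowToExecutorMap).
    static Map<String, List<Integer>> groupByExecutor(Map<Integer, String> executorOfFlow) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<Integer, String> e : executorOfFlow.entrySet()) {
            grouped.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return grouped;
    }

    // One request per executor instead of one per flow; results are merged.
    static Map<Integer, String> pollOnce(Map<Integer, String> executorOfFlow, ExecutorClient client) {
        Map<Integer, String> latest = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : groupByExecutor(executorOfFlow).entrySet()) {
            latest.putAll(client.fetchStatuses(e.getKey(), e.getValue()));
        }
        return latest;
    }
}
```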
3 Schedule execution process
TriggerManager.start
loadTriggers
SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers
TriggerScannerThread.start
checkAllTriggers
onTriggerTrigger
TriggerAction.doAction
ExecuteFlowAction.doAction
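The scanner chain above (TriggerScannerThread, checkAllTriggers, onTriggerTrigger, TriggerAction.doAction) amounts to a periodic sweep that fires every trigger whose time has come. A minimal sketch follows, assuming a fixed period in place of the cron expression and choosing to skip missed periods rather than replay them; the class and field names are hypothetical, not Azkaban's Trigger or condition checker API.

```java
import java.util.List;

// Hypothetical simplification of TriggerScannerThread.checkAllTriggers.
class TriggerScannerSketch {
    interface Action {
        void doAction(); // cf. TriggerAction.doAction, e.g. submitting a flow
    }

    static final class Trigger {
        long nextCheckTime;      // epoch millis at which the trigger should next fire
        final long periodMillis; // fixed period standing in for the cron expression
        final Action action;

        Trigger(long firstFireTime, long periodMillis, Action action) {
            if (periodMillis <= 0) {
                throw new IllegalArgumentException("period must be positive");
            }
            this.nextCheckTime = firstFireTime;
            this.periodMillis = periodMillis;
            this.action = action;
        }
    }

    // One scan: fire each due trigger once, then move its next check time past now.
    static int checkAllTriggers(List<Trigger> triggers, long now) {
        int fired = 0;
        for (Trigger t : triggers) {
            if (t.nextCheckTime <= now) {
                t.action.doAction(); // cf. onTriggerTrigger
                while (t.nextCheckTime <= now) {
                    t.nextCheckTime += t.periodMillis; // missed periods are skipped, not replayed
                }
                fired++;
            }
        }
        return fired;
    }
}
```

A real scanner thread would call checkAllTriggers in a loop with a sleep between scans; the loop is left out to keep the sketch deterministic.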
PS: there is a second, completely independent scheduling mechanism, controlled by azkaban.server.schedule.enable_quartz (default false). The following registers jobs with Quartz:
ProjectManagerServlet.ajaxHandleUpload
SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true
ProjectManager.loadAllProjectFlows
SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?
FlowTriggerScheduler.scheduleAll
SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?
SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?
registerJob
The following is the execution of a Quartz job:
FlowTriggerQuartzJob.execute
FlowTriggerService.startTrigger
TriggerInstanceProcessor.processSucceed
TriggerInstanceProcessor.executeFlowAndUpdateExecID
ExecutorManager.submitExecutableFlow
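4 Job execution process
Job is the core interface for tasks; every concrete job type implements it through the following class hierarchy:
Job
  AbstractJob
    AbstractProcessJob
      ProcessJob (shell job)
        JavaProcessJob (Java job)
          JavaJob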
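The hierarchy reads as a template-method design: the abstract process class knows how to launch and wait for an OS process, and each subclass only decides which command line to build. The sketch below mirrors the first few levels under heavy simplification (JavaJob is omitted). The method names, the choice to pass the command to bash -c, and the property names classpath and java.class are assumptions of this sketch, not Azkaban's actual signatures or launch logic.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical, heavily simplified mirror of the Job hierarchy; not Azkaban's real API.
interface Job {
    String getId();

    void run() throws Exception;
}

abstract class AbstractJob implements Job {
    private final String id;

    protected AbstractJob(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }
}

// Template method: launching and waiting is shared; subclasses only build the command.
abstract class AbstractProcessJob extends AbstractJob {
    protected final Properties props;

    protected AbstractProcessJob(String id, Properties props) {
        super(id);
        this.props = props;
    }

    abstract List<String> buildCommand();

    @Override
    public void run() throws IOException, InterruptedException {
        Process p = new ProcessBuilder(buildCommand()).inheritIO().start();
        int exit = p.waitFor();
        if (exit != 0) {
            throw new IllegalStateException(getId() + " exited with code " + exit);
        }
    }
}

class ProcessJob extends AbstractProcessJob {
    ProcessJob(String id, Properties props) {
        super(id, props);
    }

    // Assumption: hand the "command" property to bash -c.
    @Override
    List<String> buildCommand() {
        return Arrays.asList("bash", "-c", props.getProperty("command"));
    }
}

class JavaProcessJob extends ProcessJob {
    JavaProcessJob(String id, Properties props) {
        super(id, props);
    }

    // Launches a JVM using the current java.home; property names are assumptions.
    @Override
    List<String> buildCommand() {
        String java = System.getProperty("java.home") + "/bin/java";
        return Arrays.asList(java, "-cp", props.getProperty("classpath", "."), props.getProperty("java.class"));
    }
}
```

Because JavaProcessJob extends ProcessJob, it inherits the launch-and-wait logic unchanged and overrides only buildCommand, which is the point of arranging the classes as a single chain.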