[Original] Big Data Basics: Azkaban (1) Introduction and Source Code Analysis


Azkaban 3.45

I. Introduction

1 Official site

https://azkaban.github.io/

Azkaban was implemented at LinkedIn to solve the problem of Hadoop job dependencies. We had jobs that needed to run in order, from ETL jobs to data analytics products.

Initially a single server solution, with the increased number of Hadoop users over the years, Azkaban has evolved to be a more robust solution.

(Note: unfortunately, the WebServer in the current version is still a single point of failure.)

Azkaban consists of 3 key components:

  • Relational Database (MySQL)
  • AzkabanWebServer
  • AzkabanExecutorServer

In other words, Azkaban has three core components: MySQL, the WebServer, and the ExecutorServer.

2 Deployment

 

3 Database table structure

 

projects: projects

project_flows: flow definitions

execution_flows: flow instances (executions)

execution_jobs: job instances

triggers: schedule definitions

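PS: much of the data in these tables is stored encoded. enc_type is the encoding type (the corresponding enum is EncodingType): 2 means gzip, and any other value means no encoding. For a value of 2, call GZIPUtils.transformBytesToObject to decode the original data.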
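To make the enc_type rule concrete, here is a minimal sketch that decodes a blob column using only the JDK. It does not use Azkaban's own GZIPUtils; the class name BlobDecoderSketch, the gzip helper, and the assumption that the payload is UTF-8 text are all illustrative choices, not taken from the Azkaban source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Azkaban: decodes a blob column by enc_type.
final class BlobDecoderSketch {

  // From the note above: 2 means gzip, any other value means no encoding.
  static final int ENC_TYPE_GZIP = 2;

  // Assumption: the decoded payload is UTF-8 text (for example JSON).
  static String decode(int encType, byte[] data) {
    if (encType != ENC_TYPE_GZIP) {
      return new String(data, StandardCharsets.UTF_8);
    }
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Helper for trying the decoder out: gzips a string into bytes.
  static byte[] gzip(String text) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(text.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }
}
```

Passing the enc_type and blob read from a row (for example enc_type and flow_data from execution_flows) to decode should yield readable text, provided the UTF-8 assumption holds.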

4 Concepts

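  • Job: the smallest unit of execution, a node in the DAG; that is, a task
  • Flow: a group of Jobs whose ordering is declared through each Job's dependencies property; that is, a workflow
  • Trigger: fires a Flow according to the configured Cron expression; that is, a schedule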
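The relationship between Jobs and a Flow can be illustrated with a small sketch that orders jobs by their dependencies using Kahn's topological sort. This is not how Azkaban itself walks the graph (it runs ready jobs as their parents finish); the class FlowDagSketch and its method are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: a Flow as a DAG of Jobs keyed by name.
final class FlowDagSketch {

  // Input: job name -> names of the jobs it depends on.
  // Output: one valid execution order (Kahn's algorithm).
  static List<String> executionOrder(Map<String, List<String>> dependencies) {
    Map<String, Integer> pending = new TreeMap<>();          // unmet dependency count per job
    Map<String, List<String>> dependents = new HashMap<>();  // reverse edges: job -> jobs waiting on it
    for (Map.Entry<String, List<String>> e : dependencies.entrySet()) {
      pending.putIfAbsent(e.getKey(), 0);
      for (String dep : e.getValue()) {
        pending.putIfAbsent(dep, 0);
        pending.merge(e.getKey(), 1, Integer::sum);
        dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
      }
    }

    // Jobs with no unmet dependencies are ready to run.
    Deque<String> ready = new ArrayDeque<>();
    for (Map.Entry<String, Integer> e : pending.entrySet()) {
      if (e.getValue() == 0) {
        ready.add(e.getKey());
      }
    }

    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String job = ready.poll();
      order.add(job);
      for (String next : dependents.getOrDefault(job, Collections.emptyList())) {
        if (pending.merge(next, -1, Integer::sum) == 0) {
          ready.add(next);
        }
      }
    }
    if (order.size() != pending.size()) {
      throw new IllegalStateException("dependency cycle detected");
    }
    return order;
  }
}
```

A cycle in the dependencies leaves some jobs with a non-zero count forever, which is why the sketch rejects it instead of returning a partial order.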

II. Code analysis

1 Startup process

Web Server

AzkabanWebServer.main

         launch

                  prepareAndStartServer

                          configureRoutes

                                   TriggerManager.start

                          FlowTriggerService.start

                                   recoverIncompleteTriggerInstances

                                            SELECT %s FROM execution_dependencies WHERE trigger_instance_id in (SELECT trigger_instance_id FROM execution_dependencies WHERE dep_status = %s or dep_status = %s or (dep_status = %s and flow_exec_id = %s))

                          FlowTriggerScheduler.start

 

ExecutorManager

         setupExecutors

         loadRunningFlows

QueueProcessorThread.run

ExecutingManagerUpdaterThread.run

Executor Server

AzkabanExecutorServer.main

         launch

                  AzkabanExecutorServer.start

                          insertExecutorEntryIntoDB

 

2 Flow execution process

The Web Server has two entry points:

ExecuteFlowAction.doAction

ExecutorServlet.ajaxExecuteFlow

 

The Web Server dispatches the flow:

ExecutorManager.submitExecutableFlow

         JdbcExecutorLoader.uploadExecutableFlow

                  INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time) values (?,?,?,?,?,?,?)

         ExecutorLoader.addActiveExecutableReference

                  INSERT INTO active_executing_flows (exec_id, update_time) values (?,?)

         queuedFlows.enqueue

 

QueueProcessorThread.run

         processQueuedFlows

                  ExecutorManager.selectExecutorAndDispatchFlow (get from queuedFlows)

                          selectExecutor

                          dispatch

                                   JdbcExecutorLoader.assignExecutor

                                            UPDATE execution_flows SET executor_id=? where exec_id=?

                                   ExecutorApiGateway.callWithExecutable (calls the Executor Server)
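The submit, enqueue, select, and dispatch steps above can be sketched as a simple queue plus an executor picker. This is a simplified, single-threaded illustration: DispatchSketch and its methods are hypothetical, and the "fewest assigned flows" rule is an assumption chosen for the example rather than Azkaban's actual executor selection logic.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, single-threaded model of "enqueue, then select an executor and dispatch".
final class DispatchSketch {
  private final Queue<Integer> queuedFlows = new ArrayDeque<>();             // exec ids waiting for an executor
  private final Map<String, Integer> assignedCount = new LinkedHashMap<>();  // executor -> flows assigned so far
  private final Map<Integer, String> assignment = new LinkedHashMap<>();     // exec id -> chosen executor

  DispatchSketch(String... executors) {
    if (executors.length == 0) {
      throw new IllegalArgumentException("need at least one executor");
    }
    for (String executor : executors) {
      assignedCount.put(executor, 0);
    }
  }

  // Stands in for submitExecutableFlow: persist (omitted here), then enqueue.
  void submit(int execId) {
    queuedFlows.add(execId);
  }

  // Stands in for one pass of the queue processor: drain the queue and assign each flow.
  void processQueuedFlows() {
    Integer execId;
    while ((execId = queuedFlows.poll()) != null) {
      String executor = selectExecutor();
      assignment.put(execId, executor);                 // plays the role of the UPDATE ... SET executor_id
      assignedCount.merge(executor, 1, Integer::sum);
    }
  }

  // Assumption for this sketch: pick the executor with the fewest assigned flows,
  // breaking ties in registration order.
  private String selectExecutor() {
    String best = null;
    for (Map.Entry<String, Integer> e : assignedCount.entrySet()) {
      if (best == null || e.getValue() < assignedCount.get(best)) {
        best = e.getKey();
      }
    }
    return best;
  }

  String executorOf(int execId) {
    return assignment.get(execId);
  }
}
```

Separating submit from processQueuedFlows mirrors the call tree: the caller returns as soon as the flow is queued, and assignment happens later on the processor's own schedule.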

 

The Executor Server runs the flow:

ExecutorServlet.doGet

         handleAjaxExecute

                  FlowRunnerManager.submitFlow

                          JdbcExecutorLoader.fetchExecutableFlow

                                 SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?

                          FlowPreparer.setup

                          FlowRunner.run

                                   setupFlowExecution

                                   updateFlow

                                            UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? WHERE exec_id=?

                                   runFlow

                                            progressGraph

                                                     runReadyJob

                                                             runExecutableNode

                                                                      JobRunner.run

                                                                               uploadExecutableNode

                                                                                        INSERT INTO execution_jobs (exec_id, project_id, version, flow_id, job_id, start_time, end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)

                                                                               prepareJob

                                                                               runJob

                                                                                        Job.run (ProcessJob, JavaJob)

 

The Web Server polls flow status:

ExecutingManagerUpdaterThread.run

         getFlowToExecutorMap

         ExecutorApiGateway.callWithExecutionId

         updateExecution
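As a rough illustration of the first step in this polling loop, the sketch below groups running executions by the executor that owns them, so that each executor can be asked for status once per round. The class StatusPollSketch and the use of plain strings and integers for executors and exec ids are assumptions made for the example; the real code works with Azkaban's own types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of grouping running flows per executor before polling.
final class StatusPollSketch {

  // Input: exec id -> executor name. Output: executor name -> sorted exec ids it is running.
  static Map<String, List<Integer>> groupByExecutor(Map<Integer, String> runningFlows) {
    Map<String, List<Integer>> byExecutor = new TreeMap<>();
    // Iterate a sorted copy so the ids within each group come out in ascending order.
    for (Map.Entry<Integer, String> e : new TreeMap<>(runningFlows).entrySet()) {
      byExecutor.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }
    return byExecutor;
  }
}
```

With the groups in hand, one status request per executor can carry all of its exec ids, instead of one request per flow.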

 

3 Scheduling process

TriggerManager.start

         loadTriggers

                  SELECT trigger_id, trigger_source, modify_time, enc_type, data FROM triggers

         TriggerScannerThread.start

                  checkAllTriggers

                          onTriggerTrigger

                                   TriggerAction.doAction

                                            ExecuteFlowAction.doAction
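The scan-and-fire loop above can be sketched as follows. For simplicity the sketch uses a fixed period in place of a Cron expression and takes the current time as a parameter so it stays deterministic; TriggerScanSketch, its Trigger class, and the submittedFlows list are hypothetical names, not Azkaban classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a trigger scanner: check every trigger, fire the ones that are due.
final class TriggerScanSketch {

  static final class Trigger {
    final String flowId;
    final long periodMillis;
    long nextFireTime;

    Trigger(String flowId, long firstFireTime, long periodMillis) {
      if (periodMillis <= 0) {
        throw new IllegalArgumentException("period must be positive");
      }
      this.flowId = flowId;
      this.nextFireTime = firstFireTime;
      this.periodMillis = periodMillis;
    }
  }

  private final List<Trigger> triggers = new ArrayList<>();
  final List<String> submittedFlows = new ArrayList<>();  // stands in for submitting the flow for execution

  void add(Trigger trigger) {
    triggers.add(trigger);
  }

  // One scan pass; a scanner thread would call this periodically with the current time.
  void checkAllTriggers(long now) {
    for (Trigger t : triggers) {
      if (now >= t.nextFireTime) {
        submittedFlows.add(t.flowId);
        // Advance past "now" so a late scan fires once instead of once per missed period.
        while (t.nextFireTime <= now) {
          t.nextFireTime += t.periodMillis;
        }
      }
    }
  }
}
```

Whether missed periods should be skipped or replayed is a design decision; this sketch skips them, which is an assumption rather than a statement about Azkaban's behavior.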

 

PS: there is also a second, completely independent scheduling path, controlled by azkaban.server.schedule.enable_quartz (default false). The following shows how a job is registered with Quartz:

ProjectManagerServlet.ajaxHandleUpload

         SELECT id, name, active, modified_time, create_time, version, last_modified_by, description, enc_type, settings_blob FROM projects WHERE name=? AND active=true

         ProjectManager.loadAllProjectFlows

                  SELECT project_id, version, flow_id, modified_time, encoding_type, json FROM project_flows WHERE project_id=? AND version=?

         FlowTriggerScheduler.scheduleAll

                  SELECT MAX(flow_version) FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=?

                  SELECT flow_file FROM project_flow_files WHERE project_id=? AND project_version=? AND flow_name=? AND flow_version=?

                  registerJob

 

The following shows how the Quartz job executes:

FlowTriggerQuartzJob.execute

         FlowTriggerService.startTrigger

                  TriggerInstanceProcessor.processSucceed

                          TriggerInstanceProcessor.executeFlowAndUpdateExecID

                                   ExecutorManager.submitExecutableFlow

 

4 Job execution process

Job is the core interface for tasks; every concrete job type is a subtype of it:

Job

         AbstractJob

                  AbstractProcessJob

                          ProcessJob (shell job)

                                   JavaProcessJob (Java job)

                                            JavaJob
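The shape of this hierarchy can be illustrated with a stripped-down sketch in which each level only changes how the command line is built. Every name below is a hypothetical stand-in prefixed with Sketch; the real Azkaban interfaces have more methods, and the sketch records the command instead of forking a process.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins mirroring the hierarchy above; not the Azkaban classes.
interface SketchJob {
  String id();
  void run();
}

// Holds what every job shares (here just the id).
abstract class SketchAbstractJob implements SketchJob {
  private final String id;

  SketchAbstractJob(String id) {
    this.id = id;
  }

  @Override
  public String id() {
    return id;
  }
}

// Shell-style job: wraps a command string in a shell invocation.
class SketchProcessJob extends SketchAbstractJob {
  private final String command;
  private String launched;  // the command that run() would have started

  SketchProcessJob(String id, String command) {
    super(id);
    this.command = command;
  }

  protected List<String> commandLine() {
    return Arrays.asList("sh", "-c", command);
  }

  @Override
  public void run() {
    // A real implementation would start a process; the sketch only records the command.
    launched = String.join(" ", commandLine());
  }

  String launchedCommand() {
    return launched;
  }
}

// Java-style job: same run logic, but the command line starts a JVM on a main class.
class SketchJavaProcessJob extends SketchProcessJob {
  private final String mainClass;

  SketchJavaProcessJob(String id, String mainClass) {
    super(id, "");
    this.mainClass = mainClass;
  }

  @Override
  protected List<String> commandLine() {
    return Arrays.asList("java", mainClass);
  }
}
```

Because run() lives in the process-level class, a subclass only overrides commandLine(), which is one plausible reason for nesting the Java job types under the process job.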

