從源碼看Azkaban作業流下發過程


上一篇零散地羅列了看源碼時記錄的一些類的信息,這篇完整介紹一個作業流在Azkaban中的執行過程,希望可以幫助剛剛接手Azkaban相關工作的開發、測試。

一、Azkaban簡介  

Azkaban作為開源的調度系統,在大數據中有廣泛地使用。它主要有三部分組成:Azkaban Webserver、Azkaban Executor、 DB。

              圖1 Azkaban架構

圖1所示的是Azkaban的基本架構:Webserver主要負責權限驗證、項目管理、作業流下發等工作;Executor主要負責作業流/作業的具體執行以及搜集執行日志等工作;MySQL用於存儲作業/作業流的執行狀態信息。圖中所示的是單executor場景,但是實際應用中大部分的項目使用的都是多executor場景。下面主要介紹多executor場景下的azkaban調度過程。

二、作業流執行過程

                     圖2 作業流執行過程

圖2展示的就是Azkaban作業流的執行過程:

1. 首先Webserver根據內存中緩存的各Executor的資源狀態(Webserver有一個線程會遍歷各個active executor,去發送http請求獲取其資源狀態信息緩存到內存中),按照選擇策略(包括executor資源狀態、最近執行流個數等)選擇一個executor下發作業流;

2. 然后executor判斷是否設置作業粒度分配,如果未設置作業粒度分配,則在當前executor執行所有作業;

3. 如果設置了作業粒度分配,則當前節點會成為作業分配的決策者,即分配節點;

4. 分配節點從zookeeper獲取各個executor的資源狀態信息,然后根據策略選擇一個executor分配作業;

5. 被分配到作業的executor即成為執行節點,執行作業,然后更新數據庫。

三、從源碼看作業流執行過程

首先是Webserver端:

1. ExecutorServlet類根據請求的ajax參數判斷,如果ajax=executeFlow,就去調ajaxAttemptExecuteFlow(req, resp, ret, session.getUser())方法

2. ajaxAttemptExecuteFlow方法里,首先調getProjectAjaxByPermission方法判斷用戶是否有執行權限,如果驗證權限通過,且Project和Flow都存在,就調ajaxExecuteFlow方法

3. ajaxExecuteFlow方法的主要作用就是構造ExecutableFlow對象,設定執行參數(通知機制,並發,失敗策略),然后去調executorManager.submitExecutableFlow方法

4. executorManager.submitExecutableFlow方法:判斷執行策略(流水線、忽略、並發);如果是多執行節點模式,則將作業流提交到執行隊列queue;如果是單執行節點模式,選擇唯一執行節點下發作業流。

5. ExecutorManager.submitExecutableFlow()方法是Webserver端下發作業流的主要實現邏輯,下面重點細述其內容:

    5.1 從exflow實例獲取作業流的flowId(就是作業流的名字),打日志(“開始提交流XXX by 某某某了”)。

    5.2 判斷queuedFlows是否滿,如果滿了打日志(“提交失敗,Azkaban過飽和啦”),return;如果未滿,繼續往下執行代碼

    5.3 獲取該作業流所有正在跑的實例的id, List<Integer> running
    5.4 獲取執行設置options
    5.5 從執行設置options里獲取流的執行參數(是否enable,是則將參數生效)
    5.6 判斷running是否為空,如果為空,即沒有並發的實例在跑
    5.7 如果running不為空,獲取並發設置getConcurrentOption()
         5.7.1 流水線(pipeline):設置pipelineExcutionId為running中最后提交的實例id
         5.7.2 忽略(skip):拋異常,“流已經在執行了,忽略本次執行”
         5.7.3 並發(ignore):僅修改日志
    5.8 根據白名單設置是否memoryCheck
    5.9 executorLoader.uploadExecutableFlow(exflow) 寫數據庫表execution_flows,狀態為preparing

    5.10 構造具體的執行實例ExecutionReference
    5.11 判斷是否多執行節點模式,如果不是,將該執行流的狀態標記為active,即寫數據庫表active_executing_flows,將流dispatch到唯一執行節點執行。
    5.12 如果是多執行節點模式,則將該執行流的狀態標記為active,然后將流放入執行隊列queuedFlows。

6. 如果是多執行節點模式,ExecutorManager類在構造函數里會調setupMultiExecutorMode()方法,該方法會建一個線程通過processQueuedFlows方法去持續地消費隊列里的首個作業流。processQueuedFlows方法的主要內容就是按照一定規則去refreshExecutors刷新執行節點的資源信息,以及selectExecutorAndDispatchFlow從activeExecutors中根據策略選擇一個executor下發作業流。refreshExecutors()方法實際上是通過遍歷每個active executor,去發請求獲取狀態信息,而不是通過zookeeper。

至此,Webserver端的工作已經完畢。

然后是Executor端:

1. 執行流到達Executor端,此時在數據庫中的狀態已經是preparing

2. ExecutorServlet類根據請求的action參數判斷,如果action=execute,就去調handleAjaxExecute(req, respMap, execid)方法

3. handleAjaxExecute方法里執行flowRunnerManager.submitFlow(execId),去調FlowRunnerManager的submitFlow(execId)方法來提交執行流。

4. FlowRunnerManager的兩個重要的數據結構:

    4.1 Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();

    4.2 Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();

    submittedFlows用於跟蹤當前executor所有處於preparing狀態的流的執行;runningFlows用於存數當前executor所有正在執行的流的信息,當需要執行cancling()或killing()的時候就可以找到這些流。

5. FlowRunnerManager.submitFlow(execId)方法是Executor端執行作業流的主要實現邏輯,下面重點細述其內容:

    5.1 先判斷runningFlows是否包含該execId對應的實例,如果已經包含,拋異常

    5.2 從executorLoader去獲取execId對應的執行實例(ExecutableFlow)flow

    5.3 執行setupFlow(flow),配置flow:創建項目和執行的目錄等
    5.4 獲取執行設置ExecutionOptions
    5.5 判斷pipelineExecId是否為null。如果不為null,就判斷pipelineExecId對應的flowRunner在不在runningFlows中。如果在runningFlows中,起一個LocalFlowWatcher去監控在flow中各個job的執行狀態;

    5.6 如果不在runningFlows中,起一個RemoteFlowWatcher去監控,即每隔一定時間(默認為60秒)通過讀取數據庫的記錄來監控流中各個job的狀態

    5.7 判斷執行參數里是否包含flow.num.job.threads,如果存在且小於默認值10,則修改該值。這個值代表該流可以同時執行的job線程數。
    5.8 構造一個新的FlowRunner實例runner
    5.9 configureFlowLevelMetrics(runner)配置runner 
    5.10 再次判斷runningFlows是否包含該次execId對應的執行實例,如果包含,拋異常
    5.11 將runner加入到runningFlows的map
    5.12 提交到TrackingThreadPool(工作線程池)
    5.13 加入到submittedFlows的map

6. 自此,我們就有了FlowRunner實例,下面我們看FlowRunner中都干了些什么事。

FlowRunner其實就是一個線程,它的run()方法的內容如下:

    6.1 Executors.newFixedThreadPool(numJobThreads) 創建flow內部job線程池flow
    6.2 setupFlowExecution()
    6.3 updateFlowReference()
    6.4 updateFlow() 更新flow的狀態信息,寫數據庫表execution_flows
    6.5 loadAllProperties()載入job參數和共享的參數
    6.6 判斷輸入參數是否包含job.dispatch(作業粒度分配),如果包含且為true,起一個新的線程jobEventUpdaterThread,用於跟蹤該作業流下各個作業的執行狀態。
    6.7 執行runFlow()
    6.8 runFlow()方法:根據DAG圖的算法依次執行job。從流的開始節點,遞歸調用runReadyjob()來執行作業,然后updateFlow();如果流還沒結束,根據重試設置,決定是否重跑失敗的作業。
    6.9 在runReadyjob()里會調runExecutableNode(node)方法,runExecutableNode方法再判斷job.dispatch參數,如果為false,則通過LocalJobRunner本地執行;如果為true,則再通過JobRunnerManager提交作業。
    6.10 JobRunnerManager通過submitExecutableNode方法構建RemoteJobRunner,RemoteJobRunner會根據各執行節點(包含本節點)的資源狀態去選擇一個節點執行作業。

 最后,整個過程可以總結成一個圖,如下圖所示:

                                                                       圖3 從源碼看作業流執行過程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM