Azkaban源碼學習筆記


1. ConnectorParams (interface): 定義了各種常量參數,沒有聲明任何方法。
2. ExecutorServlet.java類
  2.1 繼承類HttpServlet和接口ConnectorParams,用於處理Http請求,主要是Get請求,處理方式都寫在doGet方法中。
  2.2 init()方法:創建AzkabanExecutorServer實例,通過該executor server實例獲取flowRunnerManager,以及jobRunnerManager。
  2.3 doGet(HttpServletRequest req, HttpServletResponse resp)方法:處理具體的請求,並返回resp。 針對不同的action,進行不同的處理。
        action可以分為兩類:
        第一類不需要獲取execid和user,有三個action,分別是:update,ping,reloadJobTypePlugins;
        第二類action,會先獲取execid和user,包含:metadata,metadata_jobRunnerMgr,log,attachments, execute ,status,cancel,pause,resume,modifyExecution,job_execute,job_cancel
 
        action=execute時,ExecutorServlet類調handleAjaxExecute()方法去調flowRunnerManager.submitFlow(execId)來執行該工作流。 
 
3. FlowRunnerManager.java類  實現EventListener接口
    private Map<Future<?>, Integer>  submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();
    private Map<Integer, FlowRunner>  runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();   記錄當前正在執行的Flows,key是execId
    private Map<Integer, ExecutableFlow>  recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
     void submitFlow(int execId) 方法:
    先判斷runningFlows是不是已經包含該execId對應的作業流
    如果已經包含:拋異常
    如果不包含-》獲取execId對應的executableFlow實例flow-》然后執行setupFlow(flow)配置flow(創建項目和執行的目錄等)-》獲取執行配置(ExecutionOptions)-》判斷pipelineExecId是否為null,如果不為null-》判斷該pipelineExecId對應的flowRunner在不在runningFlows中
     如果在runningFlows中:起一個LocalFlowWatcher檢測該pipeline流的執行情況
     如果不在runningFlows中:起一個RemoteFlowWatcher檢測該pipeline流的執行情況。實際是起了一個RemoteUpdaterThread來每隔一定時間(默認為60秒)通過讀取數據庫的記錄監控流的狀態。
     -》判斷流執行設置里是否包含參數flow.num.job.threads,如果存在該參數,且小於默認的值10,則將該流並行執行的job線程數設置為該參數值。-》創建新的FlowRunner實例runner-》對線程做一些配置,configureFlowLevelMetrics(runner); -》再次check runningflows是否包含execId對應的線程-》將runner加入runningflows-》將這個執行流對應的future加入到submittedFlows里用於跟蹤流的執行,修改最后提交時間
 
     void handleEvent方法:當流Finish時,在 recentlyFinishedFlows 里加入該流,將流從runningFlows里去除
 
4. EventListener接口,聲明了一個方法:void handleEvent(Event event)
5. EventHandler.java類:包含一個HashSet<EventListener>,包含方法addListener,fireEventListener(該方法調用每個listener.handleEvent()),removeListener。
6. ExecutableFlow.java: 包含可執行流的相關信息和設置信息的方法。
 
7.pipelineExecId:pipeline就是並發策略里的流水線,該execId對應的flow正在執行的執行流中最后次提交的執行流execId
8. ExecutorManager.java類:
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>  runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
    private ConcurrentHashMap<Integer, ExecutableFlow>  recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
 
    8.1 public String  submitExecutableFlow(ExecutableFlow exflow, String userId)方法:將作業流提交到執行隊列
          方法過程:
          根據exflow獲取flowId -》判斷queuedFlows是否滿-》
            queuedFlows滿了-》提交失敗,打出log提示error
            queuedFlows未滿-》獲取該次執行流對應的作業流所有正在跑的實例running,獲取流執行設置(ExecutionOptions)-》獲取流的執行參數(是否enable,如果enable則將參數生效)-》判斷runningflows是否為空,如果不為空-》獲取並發設置
            並發設置:流水線(pipeline)-》設置流水線執行Id(PipelineExecutionId)為正在執行的最后次提交的執行流id,獲取pipeline level
            並發設置:忽略本次執行(skip)-》拋出異常ExecutorManagerException,給出提示該流已經有實例已經在執行,本次執行被skip了
            並發設置:並行執行-》僅修改message提示
            -》白名單設置?options.setMemoryCheck(memoryCheck);-》判斷是否多節點模式(isMultiExecutorMode)
            多節點模式:是-》將該flow記錄為正在執行的flow(executorLoader.addActiveExecutableReference(reference);),將作業流放入隊列queuedFlows.enqueue(exflow, reference);
            多節點模式:否-》將該flow記錄為正在執行的flow,選擇本地executor, 下發作業流dispatch(reference, exflow, choosenExecutor);            
 
9. ExecutableFlow.java類:一個可執行流的相關信息。                  
10.ExecutionReference.java類:存儲execId,executor,updateTime,nextCheckTime,numErrors信息;一個具體的執行實例
11. FlowWatcher.java類:檢測某個execId的各個作業的執行狀態。
    private int execId;
    private ExecutableFlow flow;
    private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();   該map用於存儲各個job的執行狀態,key為jobId,value為job的狀態
12. BlockingStatus.java類:管理特定作業的狀態,以同步的方式改變作業的狀態。當狀態處於block狀態時,線程會處於等待狀態,等待其他線程的通知(notify),最多等待時長為5分鍾。
      private static final long WAIT_TIME = 5 * 60 * 1000;
      private final int execId;
      private final String jobId;
      private Status status;
 
13. FlowRunner.java 繼承EventHandler,實現Runnable接口。
      public void  run(): 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM