1. ConnectorParams (interface): 定義了各種常量參數,沒有聲明任何方法。
2. ExecutorServlet.java類
2.1 繼承類HttpServlet和接口ConnectorParams,用於處理Http請求,主要是Get請求,處理方式都寫在doGet方法中。
2.2 init()方法:創建AzkabanExecutorServer實例,通過該executor server實例獲取flowRunnerManager,以及jobRunnerManager。
2.3 doGet(HttpServletRequest req, HttpServletResponse resp)方法:處理具體的請求,並返回resp。 針對不同的action,進行不同的處理。
action可以分為兩類:
第一類不需要獲取execid和user,有三個action,分別是:update,ping,reloadJobTypePlugins;
第二類action,會先獲取execid和user,包含:metadata,metadata_jobRunnerMgr,log,attachments,
execute
,status,cancel,pause,resume,modifyExecution,job_execute,job_cancel
action=execute時,ExecutorServlet類調handleAjaxExecute()方法去調flowRunnerManager.submitFlow(execId)來執行該工作流。
3. FlowRunnerManager.java類 實現EventListener接口
private Map<Future<?>, Integer>
submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();
private Map<Integer, FlowRunner>
runningFlows = new ConcurrentHashMap<Integer, FlowRunner>(); 記錄當前正在執行的Flows,key是execId
private Map<Integer, ExecutableFlow>
recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
void submitFlow(int execId) 方法:
先判斷runningFlows是不是已經包含該execId對應的作業流
如果已經包含:拋異常
如果不包含-》獲取execId對應的executableFlow實例flow-》然后執行setupFlow(flow)配置flow(創建項目和執行的目錄等)-》獲取執行配置(ExecutionOptions)-》判斷pipelineExecId是否為null,如果不為null-》判斷該pipelineExecId對應的flowRunner在不在runningFlows中
如果在runningFlows中:起一個LocalFlowWatcher檢測該pipeline流的執行情況
如果不在runningFlows中:起一個RemoteFlowWatcher檢測該pipeline流的執行情況。實際是起了一個RemoteUpdaterThread來每隔一定時間(默認為60秒)通過讀取數據庫的記錄監控流的狀態。
-》判斷流執行設置里是否包含參數flow.num.job.threads,如果存在該參數,且小於默認的值10,則將該流並行執行的job線程數設置為該參數值。-》創建新的FlowRunner實例runner-》對線程做一些配置,configureFlowLevelMetrics(runner); -》再次check runningflows是否包含execId對應的線程-》將runner加入runningflows-》將這個執行流對應的future加入到submittedFlows里用於跟蹤流的執行,修改最后提交時間
void handleEvent方法:當流Finish時,在
recentlyFinishedFlows 里加入該流,將流從runningFlows里去除
4. EventListener接口,聲明了一個方法:void handleEvent(Event event)
5. EventHandler.java類:包含一個HashSet<EventListener>,包含方法addListener,fireEventListener(該方法調用每個listener.handleEvent()),removeListener。
6. ExecutableFlow.java: 包含可執行流的相關信息和設置信息的方法。
7.pipelineExecId:pipeline就是並發策略里的流水線,該execId對應的flow正在執行的執行流中最后次提交的執行流execId
8. ExecutorManager.java類:
private ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>
runningFlows = new ConcurrentHashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
private ConcurrentHashMap<Integer, ExecutableFlow>
recentlyFinished = new ConcurrentHashMap<Integer, ExecutableFlow>();
8.1 public String
submitExecutableFlow(ExecutableFlow exflow, String userId)方法:將作業流提交到執行隊列
方法過程:
根據exflow獲取flowId -》判斷queuedFlows是否滿-》
queuedFlows滿了-》提交失敗,打出log提示error
queuedFlows未滿-》獲取該次執行流對應的作業流所有正在跑的實例running,獲取流執行設置(ExecutionOptions)-》獲取流的執行參數(是否enable,如果enable則將參數生效)-》判斷runningflows是否為空,如果不為空-》獲取並發設置
並發設置:流水線(pipeline)-》設置流水線執行Id(PipelineExecutionId)為正在執行的最后次提交的執行流id,獲取pipeline level
並發設置:忽略本次執行(skip)-》拋出異常ExecutorManagerException,給出提示該流已經有實例已經在執行,本次執行被skip了
並發設置:並行執行-》僅修改message提示
-》白名單設置?options.setMemoryCheck(memoryCheck);-》判斷是否多節點模式(isMultiExecutorMode)
多節點模式:是-》將該flow記錄為正在執行的flow(executorLoader.addActiveExecutableReference(reference);),將作業流放入隊列queuedFlows.enqueue(exflow, reference);
多節點模式:否-》將該flow記錄為正在執行的flow,選擇本地executor,
下發作業流dispatch(reference, exflow, choosenExecutor);
9. ExecutableFlow.java類:一個可執行流的相關信息。
10.ExecutionReference.java類:存儲execId,executor,updateTime,nextCheckTime,numErrors信息;一個具體的執行實例
11. FlowWatcher.java類:檢測某個execId的各個作業的執行狀態。
private int execId;
private ExecutableFlow flow;
private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>(); 該map用於存儲各個job的執行狀態,key為jobId,value為job的狀態
12. BlockingStatus.java類:管理特定作業的狀態,以同步的方式改變作業的狀態。當狀態處於block狀態時,線程會處於等待狀態,等待其他線程的通知(notify),最多等待時長為5分鍾。
private static final long WAIT_TIME = 5 * 60 * 1000;
private final int execId;
private final String jobId;
private Status status;
13. FlowRunner.java 繼承EventHandler,實現Runnable接口。
public void
run():
