[源碼解析]Oozie來龍去脈之內部執行


[源碼解析]Oozie來龍去脈之內部執行

0x00 摘要

Oozie由Cloudera公司貢獻給Apache的基於工作流引擎的開源框架,是用於Hadoop平台的開源的工作流調度引擎,用來管理Hadoop作業,進行。本文是系列的第二篇,介紹Oozie的內部執行階段。

前文[源碼解析]Oozie的來龍去脈 --- (1)提交任務階段 已經為大家展示了用戶提交一個Oozie Job之后做了什么,本文將沿着一個Workflow的執行流程為大家繼續剖析Oozie接下來做什么。

大致如下:

  • 在Oozie中准備Yarn Application Master
  • 介紹新舊兩版本的Yarn Application Master區別
  • 介紹Hive on Yarn
  • Tez是如何亂入到這個流程中的
  • Java on Yarn會是如何執行
  • Yarn Job結束之后如何返回Oozie

0x01 Oozie階段

1.1 ActionStartXCommand

我們假設Workflow在start之后,就進入到了一個Hive命令。

ActionStartXCommand的主要作用就是和Yarn交互,最后提交一個Yarn Application Master

ActionStartXCommand是 WorkflowXCommand的子類。重點函數還是loadState和execute。

public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;  
}

loadState 的作用就是從數據庫中獲取 WorkflowJobBean 和 WorkflowActionBean 信息

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
        }
    }
}

execute函數如下。其主要業務就是executor.start(context, wfAction); 這里的executor是HiveActionExecutor。

@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if(!caught) {
            // 這里是業務重點,就是啟動任務
            executor.start(context, wfAction);
          
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                } else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    }
                    else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
        }
    }
    finally {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            ......
            if (execSynchronous) {
                // Changing to synchronous call from asynchronous queuing to prevent
                // undue delay from ::start:: to action due to queuing
                callActionEnd();
            }
        }
    }
    return null;
}

ActionExecutor.start是異步的,還需要檢查Action執行狀態來推進流程,oozie通過兩種方式來檢查任務是否完成。

  • 回調:當一個任務和一個計算被啟動后,會為任務提供一個回調url,該任務執行完成后,會執行回調來通知oozie

  • 輪詢:在任務執行回調失敗的情況下,無論任何原因,都支持以輪詢的方式進行查詢。

oozie提供這兩種方式來控制任務。后續我們會再提到。

1.2 HiveActionExecutor

上面代碼中 executor.start(context, wfAction); 就是啟動任務。

HiveActionExecutor繼承 ScriptLanguageActionExecutor,ScriptLanguageActionExecutor繼承 JavaActionExecutor,所以后續很多函數執行的是JavaActionExecutor中的函數。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {}

ActionExecutor.start就是執行的JavaActionExecutor.start()。

其會檢查文件系統,比如hdfs是不是支持,Action Dir是否ready,然后會submitLauncher。

public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        FileSystem actionFs = context.getAppFileSystem();
        prepareActionDir(actionFs, context);
        submitLauncher(actionFs, context, action); // 這里是業務
        check(context, action);
}

submitLauncher主要功能是:

  • 1)對於某些類型job,調用injectActionCallback配置回調Action
  • 2)配置 action job
  • 3)調用createLauncherConf配置LauncherAM, 即Application Master
    • 3.1)配置回調conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
    • 3.2)設置"launcher Main Class"。LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
  • 4)調用HadoopAccessorService.createYarnClient來創建一個YarnClient
  • 5)調用UserGroupInformation繼續配置
  • 6)調用yarnClient.createApplication創建一個YarnClientApplication
  • 7)記錄ApplicationId
  • 8)調用createAppSubmissionContext建立Yarn App的執行環境
    • 8.1)appContext.setApplicationType("Oozie Launcher");
    • 8.2)設置容器信息 ContainerLaunchContext
    • 8.3)vargs.add(LauncherAM.class.getCanonicalName()); 比如設置AM啟動類
    • 8.4)return appContext;
  • 9)提交App,yarnClient.submitApplication(appContext); appContext就是前面return的。

具體代碼如下:

public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
        setupActionConf(actionConf, context, actionXml, appPathRoot);
        addAppNameContext(context, action);
        setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
				// 配置回調Action
        injectActionCallback(context, actionConf);

        Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
        yarnClient = createYarnClient(context, launcherConf);
      
        //繼續配置各種Credentials
        if (UserGroupInformation.isSecurityEnabled()) {
           ......
        }

        if (alreadyRunning && !isUserRetry) {
          ......
        }
        else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext =
                    createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
            // 這里正式與 Yarn 交互。
            yarnClient.submitApplication(appContext);

            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId, jobTracker, consoleUrl);
    }
}

protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}

0x2 舊版本LauncherMapper

這里我們有必要提一下舊版本的實現:LauncherMapper。

網上關於Oozie的文章很多都是基於舊版本,所以基本都提到了 LauncherMapper,比如:

Oozie本質就是一個作業協調工具(底層原理是通過將xml語言轉換成mapreduce程序來做,但只是在集中map端做處理,避免shuffle的過程)。

Oozie執行Action時,即ActionExecutor(最主要的子類是JavaActionExecutor,hive、spark等action都是這個類的子類),JavaActionExecutor首先會提交一個LauncherMapper(map任務)到yarn,其中會執行LauncherMain(具體的action是其子類,比如JavaMain、SparkMain等),spark任務會執行SparkMain,在SparkMain中會調用org.apache.spark.deploy.SparkSubmit來提交任務。其實訴我的map任務就是識別你是什么樣的任務(hive,shell,spark等),並通過該任務來啟動任務所需要的環境來提交任務。提供了提交任務的接口(如hive任務,啟動hive客戶端或beeline等)

從文檔看,OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko) 這時候被移除了。

我們從舊版本代碼中大致看看LauncherMapper的實現。

LauncherMapper繼承了 import org.apache.hadoop.mapred.Mapper;,實現了 map 函數。其內部就是調用用戶代碼的主函數。

import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
   @Override
    public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            else {
                String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);

                    new LauncherSecurityManager();
                    setupHeartBeater(reporter);
                    setupMainConfiguration();
                    // Propagating the conf to use by child job.
                    propagateToHadoopConf();

                    executePrepare();
                    Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                    Method mainMethod = klass.getMethod("main", String[].class);
                    mainMethod.invoke(null, (Object) args);
             }
        }
    }
}

在LauncherMapperHelper中,會設置LauncherMapper為啟動函數。

public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
        launcherConf.setMapperClass(LauncherMapper.class);
}

在 JavaActionExecutor 中有 org.apache.hadoop.mapred.JobClient

import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
            jobClient = createJobClient(context, launcherJobConf);
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); 
            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
            ......
  
            runningJob = jobClient.submitJob(launcherJobConf);  // 這里進行了提交
}      

綜上所述,舊版本 LauncherMapper 實現了一個 import org.apache.hadoop.mapred.Mapper;,具體是org.apache.hadoop.mapred.JobClient 負責與hadoop交互

0x3 新版本Yarn Application Master

新版本的Oozie是和Yarn深度綁定的,所以我們需要先介紹Yarn。

3. 1 YARN簡介

YARN 是 Hadoop 2.0 中的資源管理系統,它的基本設計思想是將 MRv1 中的 JobTracker拆分成了兩個獨立的服務:一個全局的資源管理器 ResourceManager 和每個應用程序特有的ApplicationMaster。 其中 ResourceManager 負責整個系統的資源管理和分配, 而 ApplicationMaster負責單個應用程序的管理。

YARN 總體上仍然是 Master/Slave 結構,在整個資源管理框架中,ResourceManager 為Master,NodeManager 為 Slave,ResourceManager 負責對各個 NodeManager 上的資源進行統一管理和調度。

當用戶提交一個應用程序時,需要提供一個用以跟蹤和管理這個程序的ApplicationMaster, 它負責向 ResourceManager 申請資源,並要求 NodeManager 啟動可以占用一定資源的任務。 由於不同的ApplicationMaster 被分布到不同的節點上,因此它們之間不會相互影響。

3.2 ApplicationMaster

用戶提交的每個應用程序均包含一個 AM,主要功能包括:

  • 與 RM 調度器協商以獲取資源(用 Container 表示);
  • 將得到的任務進一步分配給內部的任務;
  • 與 NM 通信以啟動 / 停止任務;
  • 監控所有任務運行狀態,並在任務運行失敗時重新為任務申請資源以重啟任務。

當用戶向 YARN 中提交一個應用程序后, YARN 將分兩個階段運行該應用程序 :

  • 第一個階段是啟動 ApplicationMaster ;
  • 第二個階段是由 ApplicationMaster 創建應用程序, 為它申請資源, 並監控它的整個運行過程, 直到運行完成。

工作流程分為以下幾個步驟:

  1. 用 戶 向 YARN 中 提 交 應 用 程 序, 其 中 包 括 ApplicationMaster 程 序、 啟 動ApplicationMaster 的命令、 用戶程序等。
  2. ResourceManager 為 該 應 用程 序 分 配 第 一 個 Container, 並 與 對應 的 NodeManager 通信,要求它在這個 Container 中啟動應用程序的 ApplicationMaster。
  3. ApplicationMaster 首 先 向 ResourceManager 注 冊, 這 樣 用 戶 可 以 直 接 通 過ResourceManage 查看應用程序的運行狀態, 然后它將為各個任務申請資源, 並監控它的運行狀態, 直到運行結束, 即重復步驟 4~7。
  4. ApplicationMaster 采用輪詢的方式通過 RPC 協議向 ResourceManager 申請和領取資源。
  5. 一旦 ApplicationMaster 申請到資源后, 便與對應的 NodeManager 通信, 要求它啟動任務。
  6. NodeManager 為任務設置好運行環境(包括環境變量、 JAR 包、 二進制程序等) 后, 將任務啟動命令寫到一個腳本中, 並通過運行該腳本啟動任務。
  7. 各個任務通過某個 RPC 協議向 ApplicationMaster 匯報自己的狀態和進度, 以讓 ApplicationMaster 隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務。在應用程序運行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應用程序的當前運行狀態。
  8. 應用程序運行完成后,ApplicationMaster 向 ResourceManager 注銷並關閉自己。

3.3 LauncherAM

LauncherAM就是Oozie的ApplicationMaster實現。LauncherAM.main就是Yarn調用之處。

public class LauncherAM {
  
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);
        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                  LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
                        new AMRMCallBackHandler(),
                        new HdfsOperations(new SequenceFileWriterFactory()),
                        new LocalFsOperations(),
                        new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
                        new LauncherAMCallbackNotifierFactory(),
                        new LauncherSecurityManager(),
                        sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
                        launcherConf);
                    launcher.run();
                    return null;
            }
        });
    }  
}

launcher.run主要完成

通過registerWithRM調用AMRMClientAsync來注冊到Resource Manager

  • executePrepare / setupMainConfiguration 完成初始化,准備和配置
  • runActionMain會根據配置調用具體的main函數,比如HiveMain
    • Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
    • Method mainMethod = klass.getMethod("main", String[].class);
    • mainMethod.invoke(null, (Object) mainArgs);
  • 調用uploadActionDataToHDFS同步HDFS
  • 調用unregisterWithRM從RM解綁
  • 調用LauncherAMCallbackNotifier.notifyURL通知Oozie

具體代碼如下:

public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
        registerWithRM(amrmCallBackHandler);
        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);

        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // 會根據配置調用具體的main函數,比如HiveMain
                return null;
            }
        });
    } 
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
        } finally {
            try {
                unregisterWithRM(actionResult, errorHolder.getErrorMessage());
            } finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}

但是你會發現,對比之前所說的ApplicationMaster應該實現的功能,LauncherAM 做得恁少了點,這是個疑問! 我們在后續研究中會為大家揭開這個秘密。

0x4 Hive on Yarn

上文提到,runActionMain會根據配置調用具體的main函數。我們假設是hive action,則對應的是HiveMain。

Hive job的入口函數是在HIVE_MAIN_CLASS_NAME配置的。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

	  @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // 這里配置了 HiveMain
        return classes;
    }  
}

HiveMain后續調用如下

HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);

最后調用 org.apache.hadoop.hive.cli.CliDriver 完成了hive操作,大致有:

  • 設定參數;
  • 如果有腳本,則設定腳本路徑;
  • 如果有之前的yarn child jobs,殺掉;
  • 執行hive;
  • 寫log;

具體如下:

public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }
  
   @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();
        List<String> arguments = new ArrayList<String>();

        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        //setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();

            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        } 

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }
        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
        }
    }  
}

因此我們能看到,Oozie ApplicationMaster 在被Yarn調用之后,就是通過org.apache.hadoop.hive.cli.CliDriver 給Hive發送命令讓其執行,沒有什么再和ResourceManager / NodeManager 交互的過程,這真的很奇怪。這個秘密要由下面的Tez來解答。

0x5 Tez計算框架

Tez是Apache開源的支持DAG作業的計算框架,它直接源於MapReduce框架,核心思想是將Map和Reduce兩個操作進一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,這樣,這些分解后的元操作可以任意靈活組合,產生新的操作,這些操作經過一些控制程序組裝后,可形成一個大的DAG作業。

Tez有以下特點:

  • Apache二級開源項目
  • 運行在YARN之上
  • 適用於DAG(有向圖)應用(同Impala、Dremel和Drill一樣,可用於替換Hive/Pig等)

可以看到,Tez也是和Yarn深度綁定的。

5.1 DAGAppMaster

首先我們就找到了Tez對應的Application Master,即Tez DAG Application Master

public class DAGAppMaster extends AbstractService {
  public String submitDAGToAppMaster(DAGPlan dagPlan,
      Map<String, LocalResource> additionalResources) throws TezException {
      startDAG(dagPlan, additionalResources);
    }
  }  
}

我們能看到提交Application Master代碼。

public class TezYarnClient extends FrameworkClient {
  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
      throws YarnException, IOException, TezException {
   	ApplicationId appId= yarnClient.submitApplication(appSubmissionContext);
    ApplicationReport appReport = getApplicationReport(appId);
    return appId;
  }
}

這里是建立Application Master context 代碼,設置了Application Maste類和Container。

  public static ApplicationSubmissionContext createApplicationSubmissionContext(
      ApplicationId appId, DAG dag, String amName,
      AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
      Credentials sessionCreds, boolean tezLrsAsArchive,
      TezApiVersionInfo apiVersionInfo,
      ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
      throws IOException, YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    // 這里設置了 Application Master
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // 這里設置了命令行參數 
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // 設置了container
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(amLocalResources, environment,
            vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    appContext.setAMContainerSpec(amContainer);

    return appContext;
}

5.2 與Resource Manager交互

這里只摘要部分代碼,能看到Tez實現了與Yarn Resource Manager交互

YarnTaskSchedulerService實現了AMRMClientAsync.CallbackHandler,其功能是處理由Resource Manager收到的消息,其實現了方法

import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler
                             implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersAllocated(List<Container> containers) {
      if (!shouldReuseContainers) {
        List<Container> modifiableContainerList = Lists.newLinkedList(containers);
        assignedContainers = assignNewlyAllocatedContainers(
            modifiableContainerList);
      } 
    }
    // upcall to app must be outside locks
    informAppAboutAssignments(assignedContainers);
  }

  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    synchronized (this) {
      for(ContainerStatus containerStatus : statuses) {
        ContainerId completedId = containerStatus.getContainerId();
        HeldContainer delayedContainer = heldContainers.get(completedId);

        Object task = releasedContainers.remove(completedId);
        appContainerStatus.put(task, containerStatus);
        continue;
       }

        // not found in released containers. check currently allocated containers
        // no need to release this container as the RM has already completed it
        task = unAssignContainer(completedId, false);
        if (delayedContainer != null) {
          heldContainers.remove(completedId);
          Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
        } 
        if(task != null) {
          // completion of a container we have allocated currently
          // an allocated container completed. notify app. This will cause attempt to get killed
          appContainerStatus.put(task, containerStatus);
          continue;
        }
      }
    }

    // upcall to app must be outside locks
    for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
      getContext().containerCompleted(entry.getKey(), entry.getValue());
    }
  }
}
  • onContainersAllocated : 當有新的Container 可以使用。這里時啟動container 的代碼。
  • onContainersCompleted 是Container 運行結束。 在onContainersCompleted 中,如果是失敗的Container,我們需要重新申請並啟動Container,成功的將做記錄既可以。

由此我們可以看到,Oozie是一個甩手掌櫃,他只管啟動Hive,具體后續如何與RM交互,則完全由Tez搞定。這就解答了之前我們所有疑惑

最后總結下新流程:

  1. Oozie提交LauncherAM到Yarn;
  2. LauncherAM運行HiveMain,其調用CliDriver.main給Hive提交任務;
  3. Hive on Tez,所以Tez准備DAGAppMaster;
  4. Yarn與Tez交互:Tez提交DAGAppMaster到Yarn,Tez解析運行Hive命令;
  5. Hive運行結束后,調用回調 url 通知Oozie;

原諒我用這種辦法畫圖,因為我最討厭看到一篇好文,結果發現圖沒了......

+---------+                       +----------+                       +-----------+
|         | 1-submit LauncherAM   |          | 2.CliDriver.main      |           |  
|         |---------------------->| HiveMain |---------------------> |           |
|         |                       |          |                       |           |--+
| [Oozie] |                       |  [Yarn]  |                       |   [Hive]  |  | 3.Run 
|         |                       |          |                       |           |  | Hive     
|         | 5-notifyURL of Oozie  |          | 4-submit DAGAppMaster |           |<-+
|         |<----------------------|          | <-------------------->|    Tez    |
|         |                       |          |                       |           |
+---------+                       +----------+                       +-----------+

0x6 Java on Yarn

下面我們看看如果Oozie執行一個Java程序,是如何進行的。

Java程序的主執行函數是 JavaMain,這個就簡單多了,就是直接調用用戶的Java主函數。

public class JavaMain extends LauncherMain {
    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

   /**
    * @param args Invoked from LauncherAM:run()
    * @throws Exception in case of error when running the application
    */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {

        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf, SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        Method mainMethod = klass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);
    }
}

0x7 Yarn job 執行結束

7.1 檢查任務機制

前面提到,ActionExecutor.start是異步的,還需要檢查Action執行狀態來推進流程,oozie通過兩種方式來檢查任務是否完成。

  • 回調:當一個任務和一個計算被啟動后,會為任務提供一個回調url,該任務執行完成后,會執行回調來通知oozie
  • 輪詢:在任務執行回調失敗的情況下,無論任何原因,都支持以輪詢的方式進行查詢。

oozie提供這兩種方式來控制任務。

7.2 回調機制

LauncherAM 在用戶程序執行完成之后,會做如下調用,以通知Oozie。這就用到了“回調”機制。

LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);

Oozie的CallbackServlet會響應這個調用。可以看到,DagEngine.processCallback是Oozie處理程序結束之處。

public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = callbackService.getActionId(queryString);

        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();

        dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
        }
    }
}

DagEngine.processCallback主要是使用CompletedActionXCommand來進行。可以看到這個命令是放到 CallableQueueService 的 queue中,所以下面我們需要介紹 CallableQueueService

 public void processCallback(String actionId, String externalStatus, Properties actionData)
          throws DagEngineException {
      XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus,
      actionData, HIGH_PRIORITY);
      if (!Services.get().get(CallableQueueService.class).queue(command)) {
          LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
      }
}

7.3 異步執行

7.3.1 CallableQueueService

Oozie 使用 CallableQueueService 來異步執行操作;

public class CallableQueueService implements Service, Instrumentable {
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor; 
  
    public void init(Services services) {
          queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
                    MAX_CALLABLE_WAITTIME_MS,
                    TimeUnit.MILLISECONDS,
                    queueSize) {
                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }
            };  
    }
}

特點:

  • 加入執行隊列的任務可能是可以立即被吊起的,也可能是未來某個時間才觸發的。
  • 執行線程池根據 任務的執行時間和任務的優先級別來選取任務吊起。
  • 執行線程池的任務隊列大小可配置,當到達隊列最大值,線程池將不再接收任務。

7.3.3 PriorityDelayQueue

線程池選取的隊列是oozie自定義的隊列 PriorityDelayQueue:

特點:

根據隊列中元素的延時時間以及其執行優先級出隊列:

實現策略:

PriorityDelayQueue 中為每個優先級別的任務設置一個 延時隊列 DelayQueue
因為使用的是jdk自帶的延時隊列 DelayQueue,可以保證的是如果任務在該隊列中的延時時間滿足條件,我們
通過poll()方法即可得到滿足延時條件的任務,如果 poll()得到的是null,說明該隊列的中任務沒有滿足時間條件的任務。

如何編排多個優先級的隊列:
每次從PriorityDelayQueue去選取任務,都優先從最高優先級的隊列來poll出任務,如果最高的優先級隊列中沒有滿足條件的任務,則次優先級隊列poll出任務,如果仍未獲取
將按照隊列優先等級以此類推。
餓死現象:假如高優先級中的任務在每次獲取的時候都滿足條件,這樣容易將低優先級的隊列中滿足條件的任務活活餓死,為了防止這種情況的產生,在每次選取任務之前,遍歷
低優先級隊列任務,如果任務早已經滿足出隊列條件,如果超時時間超過了我們設定的最大值,我們會為這個任務提高優先級,將這個任務優先級加一,添加到上個優先級隊列中進行
排隊。

7.3.3 PollablePriorityDelayQueue

特點:

在從隊列中選取任務的時候,先判斷滿足時間的任務是否滿足並發等限制,如果滿足再從隊列中取出,而不是像PriorityDelayQueue那樣,先取出如果不滿足並發等限制,再將該任務重新放置回去。

任務類型:

使用線程池異步執行任務,任務和任務之間是無序的,針對具體的業務場景,可能執行的單元是需要串序執行的。oozie中封裝了 CompositeCallable 和 一般的 XCallable的任務類型,前者是XCallable的一個集合,它能保證的是這個集合里面的XCallable是順序執行的。

7.4 跳轉下一個操作

CompletedActionXCommand 當Workflow command結束時候會執行,且只執行一次。對於程序結束,會在異步隊列中加入一個 ActionCheckXCommand。

public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
           .....
        } else {    // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change, not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
            }
        }
        return null;
    }  
}

異步調用到ActionCheckXCommand,其主要作用是:

  • 如果有重試機制,則做相應配置
  • 調用 executor.check(context, wfAction); 來檢查環境信息
  • 更新數據庫中的任務信息
  • 因為已經結束了,所以用ActionEndXCommand來執行結束
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false; // 如果有重試機制,則做相應配置
            if (wfAction.getRetries() > 0) {
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
          
            executor.check(context, wfAction); // 檢查環境信息

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery> (WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
                    wfJob));
        }
        finally {
                // 更新數據庫中的任務信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
                if (generateEvent && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction, wfJob.getUser());
                }
                if (execSynchronous) {
                    // 用ActionEndXCommand來執行結束
                    new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
                }
        }
        return null;
    }
}

調用到 JavaActionExecutor.check

  • 根據配置信息建立 yarnClient = createYarnClient(context, jobConf);
  • 獲取程序報告信息 ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
  • 獲取程序數據 Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
  • 設置各種信息
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context, actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context, jobConf); // 根據配置信息建立
        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context, action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // 獲取程序報告信息
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        } 
        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);   // 獲取程序數據
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED, null);
                }
            }

            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
             }

           // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
             }

            // 設置各種信息
            context.setExecutionData(appStatus.toString(), null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
                            .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                }
                else {
                    context.setExecutionData(SUCCEEDED, null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs, action, context);
            }
            else {
                ......
                context.setExecutionData(FAILED_KILLED, null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}

ActionEndXCommand會進行結束和跳轉:

  • 調用Executor來完成結束操作 executor.end(context, wfAction);
  • 更新數據庫的job信息 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete
  • 用 SignalXCommand 來進行跳轉,進行下一個Action的執行
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        Configuration conf = wfJob.getWorkflowInstance().getConf();

        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }

        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
          
            executor.end(context, wfAction); // 調用Executor來完成結束操作

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus,
                            SlaAppType.WORKFLOW_ACTION);
                    if(slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }
            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            try { 
                // 更新數據庫的job信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            new SignalXCommand(jobId, actionId).call(); // 進行跳轉,進行下一個Action的執行
        }
        return null;
    }  
}

0xFF 參考

大數據之Oozie——源碼分析(一)程序入口

什么是Oozie——大數據任務調度框架

Oozie基礎小結

【原創】大數據基礎之Oozie(1)簡介、源代碼解析

【原創】大叔經驗分享(6)Oozie如何查看提交到Yarn上的任務日志

Oozie和Azkaban的技術選型和對比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任務調度阻塞及內存優化方法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM