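[Source Code Analysis] The Ins and Outs of Oozie: Internal Execution
0x00 Summary
Oozie is an open-source workflow-engine framework contributed to Apache by Cloudera. It is a workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. This article is the second in the series and covers Oozie's internal execution stage.
The previous article, [Source Code Analysis] The Ins and Outs of Oozie --- (1) The Job Submission Stage, showed what happens after a user submits an Oozie Job. This article follows the execution flow of a Workflow to analyze what Oozie does next.
The outline is roughly as follows:
- Preparing the Yarn Application Master in Oozie
- The differences between the old and new versions of the Yarn Application Master
- Hive on Yarn
- How Tez barges into this flow
- How Java on Yarn is executed
- How control returns to Oozie after the Yarn Job finishes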
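0x01 The Oozie Stage
1.1 ActionStartXCommand
Suppose that after start, the Workflow enters a Hive action.
The main job of ActionStartXCommand is to interact with Yarn and ultimately submit a Yarn Application Master.
ActionStartXCommand extends ActionXCommand, which is itself a subclass of WorkflowXCommand. The key functions are again loadState and execute.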
public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
private String jobId = null;
protected String actionId = null;
protected WorkflowJobBean wfJob = null;
protected WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
protected ActionExecutorContext context = null;
}
loadState fetches the WorkflowJobBean and WorkflowActionBean information from the database:
protected void loadState() throws CommandException {
try {
jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
if (wfJob == null) {
this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
}
this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
}
}
catch (XException ex) {
throw new CommandException(ex);
}
}
The execute function is shown below. Its core work is executor.start(context, wfAction); where executor is a HiveActionExecutor.
@Override
protected ActionExecutorContext execute() throws CommandException {
Configuration conf = wfJob.getWorkflowInstance().getConf();
try {
if(!caught) {
// This is the core of the business logic: start the action
executor.start(context, wfAction);
if (wfAction.isExecutionComplete()) {
if (!context.isExecuted()) {
failJob(context);
} else {
wfAction.setPending();
if (!(executor instanceof ControlNodeActionExecutor)) {
queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
}
else {
execSynchronous = true;
}
}
}
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
}
}
finally {
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
......
if (execSynchronous) {
// Changing to synchronous call from asynchronous queuing to prevent
// undue delay from ::start:: to action due to queuing
callActionEnd();
}
}
return null;
}
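ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to advance the workflow. Oozie checks whether a task has finished in two ways:
- Callback: when a task (a computation) is started, Oozie gives it a callback URL; when the task finishes, it invokes the callback to notify Oozie.
- Polling: if the callback fails for whatever reason, Oozie can still find out by polling the task's status.
Oozie uses both mechanisms to control tasks. We will come back to this later.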
1.2 HiveActionExecutor
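In the code above, executor.start(context, wfAction); is what starts the task.
HiveActionExecutor extends ScriptLanguageActionExecutor, which in turn extends JavaActionExecutor, so many of the functions called later are actually JavaActionExecutor's.
public class HiveActionExecutor extends ScriptLanguageActionExecutor {}
So ActionExecutor.start ends up executing JavaActionExecutor.start().
It checks the file system (for example, whether HDFS is supported and whether the action directory is ready), then calls submitLauncher, and finally calls check.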
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
FileSystem actionFs = context.getAppFileSystem();
prepareActionDir(actionFs, context);
submitLauncher(actionFs, context, action); // the core business logic
check(context, action);
}
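Because HiveActionExecutor only inherits start(), the call order prepareActionDir, submitLauncher, check is fixed once in JavaActionExecutor, and subclasses only plug in details such as the launcher main class. A minimal plain-Java sketch of that template-method shape follows; every class ending in Sketch and the launcherMain() hook are hypothetical simplifications, not Oozie's real signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the executor hierarchy.
abstract class JavaActionExecutorSketch {
    // Records the steps taken so the call order can be inspected.
    final List<String> steps = new ArrayList<>();

    // Template method: the order of the steps is fixed in the base class.
    final void start() {
        steps.add("prepareActionDir");
        steps.add("submitLauncher:" + launcherMain());
        steps.add("check");
    }

    // Hook: subclasses decide which "main" class the launcher will run.
    String launcherMain() {
        return "org.apache.oozie.action.hadoop.JavaMain";
    }
}

// Script-based actions (Hive, Pig, ...) would add script handling at this level.
abstract class ScriptLanguageActionExecutorSketch extends JavaActionExecutorSketch {
}

class HiveActionExecutorSketch extends ScriptLanguageActionExecutorSketch {
    @Override
    String launcherMain() {
        return "org.apache.oozie.action.hadoop.HiveMain";
    }
}
```

Calling new HiveActionExecutorSketch().start() records the three steps in order, with the Hive main class substituted into the middle one.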
submitLauncher mainly does the following:
- 1) For some job types, call injectActionCallback to configure the action callback
- 2) Configure the action job
- 3) Call createLauncherConf to configure LauncherAM, i.e. the Application Master
- 3.1) Configure the callback: conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
- 3.2) Set the launcher main class: LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- 4) Call HadoopAccessorService.createYarnClient to create a YarnClient
- 5) Use UserGroupInformation to continue the (security) configuration
- 6) Call yarnClient.createApplication to create a YarnClientApplication
- 7) Record the ApplicationId
- 8) Call createAppSubmissionContext to build the execution environment of the Yarn app
- 8.1) appContext.setApplicationType("Oozie Launcher");
- 8.2) Set up the container information (ContainerLaunchContext)
- 8.3) Set the AM launch class, e.g. vargs.add(LauncherAM.class.getCanonicalName());
- 8.4) return appContext;
- 9) Submit the app with yarnClient.submitApplication(appContext); where appContext is the one returned above.
The code is as follows:
public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)throws ActionExecutorException {
YarnClient yarnClient = null;
try {
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
setupActionConf(actionConf, context, actionXml, appPathRoot);
addAppNameContext(context, action);
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
// configure the callback for the action
injectActionCallback(context, actionConf);
Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
yarnClient = createYarnClient(context, launcherConf);
// continue configuring the various Credentials
if (UserGroupInformation.isSecurityEnabled()) {
......
}
if (alreadyRunning && !isUserRetry) {
......
}
else {
YarnClientApplication newApp = yarnClient.createApplication();
ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
ApplicationSubmissionContext appContext =
createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
// this is where we actually interact with Yarn
yarnClient.submitApplication(appContext);
launcherId = appId.toString();
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
consoleUrl = appReport.getTrackingUrl();
}
String jobTracker = launcherConf.get(HADOOP_YARN_RM);
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
throw convertException(ex);
}
finally {
IOUtils.closeQuietly(yarnClient);
}
}
protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
String user = context.getWorkflow().getUser();
return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}
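Step 8.3 of the list above, together with the vargs handling in createAppSubmissionContext, amounts to assembling one shell command that YARN runs in the AM container. A rough sketch of that assembly follows. The JVM options and the stdout/stderr redirects are illustrative assumptions, not Oozie's exact values; `<LOG_DIR>` is assumed to be YARN's per-container log-directory placeholder; only the idea of passing the LauncherAM class name comes from the text.

```java
import java.util.ArrayList;
import java.util.List;

class LauncherCommandSketch {
    // Builds the single command line for the AM container.
    // javaHome, jvmOpts and the redirects are illustrative assumptions.
    static String buildAmCommand(String javaHome, List<String> jvmOpts, String amClass) {
        List<String> vargs = new ArrayList<>();
        vargs.add(javaHome + "/bin/java");
        vargs.addAll(jvmOpts);
        vargs.add(amClass);               // e.g. the LauncherAM class name
        vargs.add("1><LOG_DIR>/stdout");  // placeholder expanded per container
        vargs.add("2><LOG_DIR>/stderr");
        return String.join(" ", vargs);
    }
}
```

Joining everything into one string mirrors what the Tez code later in this article does with mergedCommand: YARN receives a list of commands, and here that list holds a single merged command.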
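0x2 The Old Version: LauncherMapper
It is worth mentioning the old implementation here: LauncherMapper.
Many articles about Oozie online are based on the old version, so most of them mention LauncherMapper, for example:
Oozie is essentially a job coordination tool (under the hood it converts the XML definition into a MapReduce program, but does all the processing on the map side only, avoiding the shuffle phase).
When Oozie executes an Action, it uses an ActionExecutor (the most important subclass is JavaActionExecutor; hive, spark and other actions are all subclasses of it). JavaActionExecutor first submits a LauncherMapper (a map task) to yarn, which runs LauncherMain (concrete actions are its subclasses, such as JavaMain and SparkMain). A spark task runs SparkMain, and SparkMain calls org.apache.spark.deploy.SparkSubmit to submit the job. Put simply, this map task figures out what kind of task you have (hive, shell, spark, etc.) and uses it to launch the environment the task needs and submit it. It provides the interface for submitting the task (for a hive task, for example, starting the hive client or beeline).
According to the documentation, LauncherMapper was removed by OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko).
Let's take a quick look at LauncherMapper in the old code.
LauncherMapper implements org.apache.hadoop.mapred.Mapper and its map function, which simply invokes the main function of the user code.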
import org.apache.hadoop.mapred.Mapper;
public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
@Override
public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
SecurityManager initialSecurityManager = System.getSecurityManager();
try {
// ...... (preceding branches elided)
String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
new LauncherSecurityManager();
setupHeartBeater(reporter);
setupMainConfiguration();
// Propagating the conf to use by child job.
propagateToHadoopConf();
executePrepare();
Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
Method mainMethod = klass.getMethod("main", String[].class);
mainMethod.invoke(null, (Object) args);
}
finally {
// restore the security manager captured at the top of map()
System.setSecurityManager(initialSecurityManager);
}
}
}
LauncherMapperHelper sets LauncherMapper as the mapper class:
public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
launcherConf.setMapperClass(LauncherMapper.class);
}
JavaActionExecutor uses org.apache.hadoop.mapred.JobClient:
import org.apache.hadoop.mapred.JobClient;
public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
jobClient = createJobClient(context, launcherJobConf);
LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);
// Set the launcher Main Class
LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
......
runningJob = jobClient.submitJob(launcherJobConf); // the job is submitted here
}
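In summary, the old LauncherMapper is an implementation of org.apache.hadoop.mapred.Mapper, and org.apache.hadoop.mapred.JobClient is responsible for interacting with Hadoop.
0x3 The New Version: Yarn Application Master
The new version of Oozie is tightly bound to Yarn, so we need to introduce Yarn first.
3.1 Introduction to YARN
YARN is the resource management system in Hadoop 2.0. Its basic design idea is to split the JobTracker of MRv1 into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager is responsible for managing and allocating the resources of the whole system, while the ApplicationMaster manages a single application.
Overall, YARN is still a Master/Slave architecture: in the resource management framework, the ResourceManager is the Master and the NodeManagers are the Slaves, and the ResourceManager manages and schedules the resources on all NodeManagers centrally.
When a user submits an application, they must provide an ApplicationMaster to track and manage it. The ApplicationMaster requests resources from the ResourceManager and asks NodeManagers to start tasks that use a certain amount of those resources. Because different ApplicationMasters are distributed across different nodes, they do not affect each other.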
3.2 ApplicationMaster
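Each application submitted by a user contains one AM, whose main functions are:
- Negotiate with the RM scheduler to obtain resources (represented as Containers);
- Further assign the obtained resources to its internal tasks;
- Communicate with NMs to start / stop tasks;
- Monitor the running state of all tasks, and when a task fails, request resources again to restart it.
After a user submits an application to YARN, YARN runs it in two phases:
- The first phase starts the ApplicationMaster;
- In the second phase, the ApplicationMaster creates the application, requests resources for it, and monitors its whole run until it completes.
The workflow consists of the following steps:
- 1) The user submits the application to YARN, including the ApplicationMaster program, the command that starts the ApplicationMaster, the user program, etc.
- 2) The ResourceManager allocates the first Container for the application and communicates with the corresponding NodeManager, asking it to start the application's ApplicationMaster in that Container.
- 3) The ApplicationMaster first registers with the ResourceManager, so that the user can check the application's status directly through the ResourceManager. It then requests resources for each task and monitors their status until they finish, i.e. it repeats steps 4-7.
- 4) The ApplicationMaster requests and receives resources from the ResourceManager over RPC, by polling.
- 5) Once the ApplicationMaster obtains resources, it communicates with the corresponding NodeManager and asks it to start the task.
- 6) After the NodeManager sets up the task's runtime environment (environment variables, JAR packages, binaries, etc.), it writes the task's launch command into a script and starts the task by running that script.
- 7) Each task reports its status and progress to the ApplicationMaster over an RPC protocol, so the ApplicationMaster always knows the state of every task and can restart a task when it fails. While the application is running, the user can query its current status from the ApplicationMaster over RPC at any time.
- 8) When the application finishes, the ApplicationMaster unregisters from the ResourceManager and shuts itself down.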
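The AM side of this workflow can be modelled as a loop: register, then for each attempt request a container, have it launched, and read back the task's status, and finally unregister. The sketch below is a hypothetical in-memory simulation (no real RM, NM or RPC; a predicate decides whether an attempt succeeds), meant only to make the order of the steps and the retry-on-failure behaviour concrete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical in-memory simulation of the AM lifecycle described above.
class AmLifecycleSketch {
    final List<String> log = new ArrayList<>();

    // attemptSucceeds.test(n) decides whether the n-th attempt of the task succeeds.
    boolean runTask(String task, IntPredicate attemptSucceeds, int maxAttempts) {
        log.add("register");                               // register with the RM
        boolean ok = false;
        for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
            log.add("request container for " + task);      // ask the RM for resources
            log.add("launch " + task + " #" + attempt);    // NM starts the task
            ok = attemptSucceeds.test(attempt);            // task reports its status
        }
        log.add(ok ? "unregister SUCCEEDED" : "unregister FAILED"); // unregister and exit
        return ok;
    }
}
```

For a task that fails once and then succeeds, the log reads: register, two request/launch pairs, then "unregister SUCCEEDED".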
3.3 LauncherAM
LauncherAM is Oozie's implementation of an ApplicationMaster. LauncherAM.main is the entry point that Yarn invokes.
public class LauncherAM {
public static void main(String[] args) throws Exception {
final LocalFsOperations localFsOperations = new LocalFsOperations();
final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
UserGroupInformation.setConfiguration(launcherConf);
// MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
// SecurityUtil.setConfiguration(launcherConf);
UserGroupInformation ugi = getUserGroupInformation(launcherConf);
// Executing code inside a doAs with an ugi equipped with correct tokens.
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
new AMRMCallBackHandler(),
new HdfsOperations(new SequenceFileWriterFactory()),
new LocalFsOperations(),
new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
new LauncherAMCallbackNotifierFactory(),
new LauncherSecurityManager(),
sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
launcherConf);
launcher.run();
return null;
}
});
}
}
launcher.run mainly does the following:
- Registers with the Resource Manager via registerWithRM, which uses AMRMClientAsync
- executePrepare / setupMainConfiguration perform initialization, preparation and configuration
- runActionMain calls the concrete main function according to the configuration, e.g. HiveMain
- Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
- Method mainMethod = klass.getMethod("main", String[].class);
- mainMethod.invoke(null, (Object) mainArgs);
- Calls uploadActionDataToHDFS to sync the action data to HDFS
- Calls unregisterWithRM to unregister from the RM
- Calls LauncherAMCallbackNotifier.notifyURL to notify Oozie
The code is as follows:
public void run() throws Exception {
try {
actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
registerWithRM(amrmCallBackHandler);
// Run user code without the AM_RM_TOKEN so users can't request containers
UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
executePrepare(errorHolder);
setupMainConfiguration();
runActionMain(errorHolder); // calls the concrete main function according to the configuration, e.g. HiveMain
return null;
}
});
}
finally {
try {
actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
} finally {
try {
unregisterWithRM(actionResult, errorHolder.getErrorMessage());
} finally {
LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
cn.notifyURL(actionResult);
}
}
}
}
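The nested finally blocks in run() guarantee that unregistering and, above all, the callback to Oozie are still attempted even if uploading the action data throws. A small plain-Java demonstration of that ordering guarantee follows; upload, unregister and notifyOozie are stand-in lambdas, not Oozie code.

```java
import java.util.ArrayList;
import java.util.List;

class NestedFinallySketch {
    // Mirrors the shape of LauncherAM.run()'s cleanup: each later step sits in the
    // finally block of the earlier one, so it runs even if the earlier step throws.
    static List<String> cleanup(Runnable upload, Runnable unregister, Runnable notifyOozie) {
        List<String> ran = new ArrayList<>();
        try {
            try {
                ran.add("upload");
                upload.run();
            } finally {
                try {
                    ran.add("unregister");
                    unregister.run();
                } finally {
                    ran.add("notify");
                    notifyOozie.run();
                }
            }
        } catch (RuntimeException e) {
            // The original failure still propagates after all finally blocks have run.
            ran.add("caught: " + e.getMessage());
        }
        return ran;
    }
}
```

With a failing upload, the recorded order is upload, unregister, notify, and only then the exception surfaces, which is exactly why Oozie still hears back from a launcher whose HDFS upload failed.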
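But you will notice that, compared with what an ApplicationMaster is supposed to do as described earlier, LauncherAM does rather little. This is a puzzle! We will reveal the secret later on.
0x4 Hive on Yarn
As mentioned above, runActionMain calls the concrete main function according to the configuration. Assuming this is a hive action, the main class is HiveMain.
The entry class of a Hive job is configured by HIVE_MAIN_CLASS_NAME: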
public class HiveActionExecutor extends ScriptLanguageActionExecutor {
private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";
@Override
public List<Class<?>> getLauncherClasses() {
List<Class<?>> classes = new ArrayList<Class<?>>();
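try {
classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // HiveMain is configured here
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Class not found", e);
}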
return classes;
}
}
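HiveMain's subsequent call chain is:
HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);
In the end, org.apache.hadoop.hive.cli.CliDriver performs the hive operation. The main steps are:
- Set the parameters;
- If there is a script, set the script path;
- If there are earlier yarn child jobs, kill them;
- Run hive;
- Write the log;
The code is as follows: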
public class HiveMain extends LauncherMain {
public static void main(String[] args) throws Exception {
run(HiveMain.class, args);
}
@Override
protected void run(String[] args) throws Exception {
Configuration hiveConf = setUpHiveSite();
List<String> arguments = new ArrayList<String>();
String logFile = setUpHiveLog4J(hiveConf);
arguments.add("--hiveconf");
arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
arguments.add("--hiveconf");
arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());
//setting oozie workflow id as caller context id for hive
String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
arguments.add("--hiveconf");
arguments.add("hive.log.trace.id=" + callerId);
String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
if (scriptPath != null) {
......
// print out current directory & its contents
File localDir = new File("dummy").getAbsoluteFile().getParentFile();
String[] files = localDir.list();
// Prepare the Hive Script
String script = readStringFromFile(scriptPath);
arguments.add("-f");
arguments.add(scriptPath);
} else if (query != null) {
String filename = createScriptFile(query);
arguments.add("-f");
arguments.add(filename);
}
// Pass any parameters to Hive via arguments
......
String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
for (String hiveArg : hiveArgs) {
arguments.add(hiveArg);
}
LauncherMain.killChildYarnJobs(hiveConf);
try {
runHive(arguments.toArray(new String[arguments.size()]));
}
finally {
writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
}
}
}
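The argument handling in run() can be reduced to a pure function. The sketch below keeps only the trace id, the -f script-or-query choice and the user arguments, and drops the log4j settings and parameter handling; the class name and the queryFile parameter (standing in for the file returned by createScriptFile(query)) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class HiveArgsSketch {
    // scriptPath: the script configured for the action (may be null).
    // queryFile: file the inline query was written to (may be null); hypothetical stand-in.
    static List<String> buildArgs(String oozieJobId, String scriptPath, String queryFile,
                                  List<String> extraArgs) {
        List<String> args = new ArrayList<>();
        args.add("--hiveconf");
        args.add("hive.log.trace.id=oozie:" + oozieJobId); // caller context for tracing
        if (scriptPath != null) {
            args.add("-f");
            args.add(scriptPath);                           // a script takes precedence
        } else if (queryFile != null) {
            args.add("-f");
            args.add(queryFile);                            // inline query, written to a file
        }
        args.addAll(extraArgs);                             // user-supplied hive arguments
        return args;
    }
}
```

The resulting list is what would be handed to CliDriver.main, so Hive sees the same flags a user would type on the hive command line.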
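So we can see that once Yarn starts the Oozie ApplicationMaster, it simply sends commands to Hive through org.apache.hadoop.hive.cli.CliDriver and lets Hive run them; there is no further interaction with the ResourceManager / NodeManager, which is really strange. The answer to this puzzle comes from Tez, described next.
0x5 The Tez Computing Framework
Tez is an Apache open-source computing framework that supports DAG jobs. It evolved directly from the MapReduce framework. Its core idea is to split the Map and Reduce operations further: Map is split into Input, Processor, Sort, Merge and Output, and Reduce into Input, Shuffle, Sort, Merge, Processor and Output. These decomposed primitive operations can be combined freely into new operations, which, once assembled by some control programs, form a large DAG job.
Tez has the following characteristics:
- An Apache open-source project
- Runs on YARN
- Suited to DAG (directed acyclic graph) applications (similar to Impala, Dremel and Drill; it can replace MapReduce as the execution engine of Hive/Pig, etc.)
As we can see, Tez is also tightly bound to Yarn.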
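To make the "DAG of decomposed operations" idea concrete: a Tez DAG is a set of vertices joined by edges, and a vertex's work depends on the output of its upstream vertices. The sketch below orders hypothetical vertex names (Map1, Reducer2 and so on) with Kahn's topological sort; it is a generic illustration of DAG ordering, not Tez's actual scheduling logic.

```java
import java.util.*;

class DagOrderSketch {
    // edges: vertex -> downstream vertices. Returns a topological order (Kahn's algorithm).
    static List<String> topoOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String to : e.getValue()) {
                inDegree.merge(to, 1, Integer::sum);        // count incoming edges
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());                      // no upstream dependencies
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String to : edges.getOrDefault(v, List.of())) {
                if (inDegree.merge(to, -1, Integer::sum) == 0) {
                    ready.add(to);                          // all inputs are now available
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("graph has a cycle, so it is not a DAG");
        }
        return order;
    }
}
```

For Map1 and Map3 both feeding Reducer2, which feeds Reducer4, the order is Map1, Map3, Reducer2, Reducer4.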
5.1 DAGAppMaster
First, we find Tez's Application Master, i.e. the Tez DAG Application Master:
public class DAGAppMaster extends AbstractService {
public String submitDAGToAppMaster(DAGPlan dagPlan,
Map<String, LocalResource> additionalResources) throws TezException {
startDAG(dagPlan, additionalResources);
......
}
}
We can see the code that submits the Application Master:
public class TezYarnClient extends FrameworkClient {
@Override
public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
throws YarnException, IOException, TezException {
ApplicationId appId= yarnClient.submitApplication(appSubmissionContext);
ApplicationReport appReport = getApplicationReport(appId);
return appId;
}
}
This code builds the Application Master submission context; it sets the Application Master class and the Container:
public static ApplicationSubmissionContext createApplicationSubmissionContext(
ApplicationId appId, DAG dag, String amName,
AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
Credentials sessionCreds, boolean tezLrsAsArchive,
TezApiVersionInfo apiVersionInfo,
ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker)
throws IOException, YarnException {
// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
vargs.add(amOpts);
// set the Application Master class here
vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);
// assemble the command-line arguments
Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());
// set up the container
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(amLocalResources, environment,
vargsFinal, serviceData, securityTokens, acls);
// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
appContext.setAMContainerSpec(amContainer);
return appContext;
}
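5.2 Interaction with the Resource Manager
Only part of the code is excerpted here, but it shows that Tez implements the interaction with the Yarn Resource Manager.
YarnTaskSchedulerService implements AMRMClientAsync.CallbackHandler, which handles the messages received from the Resource Manager. It implements the following methods: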
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
public class YarnTaskSchedulerService extends TaskScheduler
implements AMRMClientAsync.CallbackHandler {
@Override
public void onContainersAllocated(List<Container> containers) {
Map<CookieContainerRequest, Container> assignedContainers;
synchronized (this) {
if (!shouldReuseContainers) {
List<Container> modifiableContainerList = Lists.newLinkedList(containers);
assignedContainers = assignNewlyAllocatedContainers(
modifiableContainerList);
}
}
// upcall to app must be outside locks
informAppAboutAssignments(assignedContainers);
}
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
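Map<Object, ContainerStatus> appContainerStatus = new HashMap<Object, ContainerStatus>(statuses.size());
synchronized (this) {
for(ContainerStatus containerStatus : statuses) {
ContainerId completedId = containerStatus.getContainerId();
HeldContainer delayedContainer = heldContainers.get(completedId);
Object task = releasedContainers.remove(completedId);
if (task != null) {
// completion of a container we had released earlier
appContainerStatus.put(task, containerStatus);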
continue;
}
// not found in released containers. check currently allocated containers
// no need to release this container as the RM has already completed it
task = unAssignContainer(completedId, false);
if (delayedContainer != null) {
heldContainers.remove(completedId);
Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
}
if(task != null) {
// completion of a container we have allocated currently
// an allocated container completed. notify app. This will cause attempt to get killed
appContainerStatus.put(task, containerStatus);
continue;
}
}
}
// upcall to app must be outside locks
for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
getContext().containerCompleted(entry.getKey(), entry.getValue());
}
}
}
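- onContainersAllocated: called when new Containers become available. This is the entry point for assigning and launching containers.
- onContainersCompleted: called when Containers finish running. In onContainersCompleted, a failed Container needs to be re-requested and restarted, while a successful one only needs to be recorded.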
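The completion policy just described (record successes, re-request failures) can be sketched without any YARN dependency. In the sketch below, Completed is a minimal stand-in for YARN's ContainerStatus, the container ids and the non-zero exit code are illustrative, and 0 is used as the success exit status (YARN's ContainerExitStatus.SUCCESS). In real Tez the completion is forwarded to the application via getContext().containerCompleted and the task-attempt logic decides about retries; this models only the simplified policy in the text.

```java
import java.util.ArrayList;
import java.util.List;

class ContainerCompletionSketch {
    // Minimal stand-in for YARN's ContainerStatus: a container id plus its exit status.
    static final class Completed {
        final String containerId;
        final int exitStatus;

        Completed(String containerId, int exitStatus) {
            this.containerId = containerId;
            this.exitStatus = exitStatus;
        }
    }

    static final int SUCCESS = 0; // exit status of a successfully finished container

    final List<String> succeeded = new ArrayList<>();   // just recorded
    final List<String> reRequested = new ArrayList<>(); // need a new container

    void onContainersCompleted(List<Completed> statuses) {
        for (Completed s : statuses) {
            if (s.exitStatus == SUCCESS) {
                succeeded.add(s.containerId);
            } else {
                reRequested.add(s.containerId);
            }
        }
    }
}
```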
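So Oozie is a hands-off boss: it only starts Hive, and how the job then interacts with the RM is handled entirely by Tez. This answers all of our earlier questions.
Finally, a summary of the new flow:
- Oozie submits LauncherAM to Yarn;
- LauncherAM runs HiveMain, which calls CliDriver.main to submit the task to Hive;
- Because Hive runs on Tez, Tez prepares a DAGAppMaster;
- Yarn interacts with Tez: Tez submits the DAGAppMaster to Yarn, and Tez parses and runs the Hive command;
- After Hive finishes, LauncherAM calls the callback URL to notify Oozie;
Forgive me for drawing the diagram this way; I really hate finding a good article only to discover that its images are gone......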
+---------+                       +----------+                        +-----------+
|         | 1-submit LauncherAM   |          | 2.CliDriver.main       |           |
|         |---------------------->| HiveMain |----------------------->|           |
|         |                       |          |                        |           |--+
| [Oozie] |                       |  [Yarn]  |                        |  [Hive]   |  | 3.Run
|         |                       |          |                        |           |  | Hive
|         | 5-notifyURL of Oozie  |          | 4-submit DAGAppMaster  |           |<-+
|         |<----------------------|          |<---------------------->|    Tez    |
|         |                       |          |                        |           |
+---------+                       +----------+                        +-----------+
0x6 Java on Yarn
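Now let's see what happens when Oozie runs a Java program.
The main class for a Java program is JavaMain. This one is much simpler: it directly calls the user's Java main function.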
public class JavaMain extends LauncherMain {
public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";
/**
* @param args Invoked from LauncherAM:run()
* @throws Exception in case of error when running the application
*/
public static void main(String[] args) throws Exception {
run(JavaMain.class, args);
}
@Override
protected void run(String[] args) throws Exception {
Configuration actionConf = loadActionConf();
setYarnTag(actionConf);
setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
setApplicationTags(actionConf, SPARK_YARN_TAGS);
LauncherMain.killChildYarnJobs(actionConf);
Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
Method mainMethod = klass.getMethod("main", String[].class);
mainMethod.invoke(null, (Object) args);
}
}
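The core of JavaMain (and of LauncherAM.runActionMain) is a reflective call to a static main(String[]). A self-contained sketch follows; UserApp is a hypothetical stand-in for the class named by oozie.action.java.main, and the static-modifier check is an extra safety step added for illustration, not something the excerpt above does.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class ReflectiveMainSketch {
    // Hypothetical stand-in for the user's class configured via oozie.action.java.main.
    static class UserApp {
        static String lastArg;

        public static void main(String[] args) {
            lastArg = args.length > 0 ? args[0] : null;
        }
    }

    // Looks up public static main(String[]) by class name and invokes it.
    static void invokeMain(String className, String[] args) throws Exception {
        Class<?> klass = Class.forName(className);
        Method mainMethod = klass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException(className + ".main is not static");
        }
        // The cast to Object stops varargs from spreading the array into separate arguments.
        mainMethod.invoke(null, (Object) args);
    }
}
```

The (Object) cast is the same trick used in JavaMain above: without it, invoke(Object, Object...) would treat each String as a separate argument and the call would fail with an argument-count mismatch.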
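0x7 When the Yarn Job Finishes
7.1 The Task-Checking Mechanism
As mentioned earlier, ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to advance the workflow. Oozie checks whether a task has finished in two ways:
- Callback: when a task (a computation) is started, Oozie gives it a callback URL; when the task finishes, it invokes the callback to notify Oozie.
- Polling: if the callback fails for whatever reason, Oozie can still find out by polling the task's status.
Oozie uses both mechanisms to control tasks.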
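The two mechanisms complement each other: whichever one sees completion first wins, and polling only has to ask the cluster when no callback has arrived. The sketch below is a hypothetical model of that idea (in Oozie the polling side is driven by a periodic checker service, not modelled here); the resource-manager query is a plain function so the example runs standalone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model: callback first, polling as the fallback.
class CompletionTrackerSketch {
    // actionId -> final status, filled by whichever mechanism sees completion first.
    private final Map<String, String> finished = new ConcurrentHashMap<>();

    // Callback path: the launcher reports the final status itself.
    void onCallback(String actionId, String status) {
        finished.putIfAbsent(actionId, status);
    }

    // Polling path: only query the cluster if no callback has arrived yet.
    // Returns the known final status, or null if the action is still running.
    String poll(String actionId, Function<String, String> queryResourceManager) {
        String known = finished.get(actionId);
        if (known != null) {
            return known;
        }
        String status = queryResourceManager.apply(actionId);
        if (status != null) {
            finished.putIfAbsent(actionId, status);
        }
        return finished.get(actionId);
    }
}
```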
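7.2 The Callback Mechanism
After the user program finishes, LauncherAM makes the following call to notify Oozie. This is where the "callback" mechanism comes in.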
LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
cn.notifyURL(actionResult);
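notifyURL ultimately issues an HTTP request to the callback URL that submitLauncher put into the launcher configuration (step 3.1 above). The sketch below assumes the configured URL carries a status placeholder that is replaced with the final status before the request is sent; the `$jobStatus` token, the host, port and path in the example are assumptions for illustration, and the request is only built, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class CallbackUrlSketch {
    // Assumed placeholder inside the configured callback URL.
    static final String STATUS_TOKEN = "$jobStatus";

    // Substitutes the launcher's final status (e.g. SUCCEEDED) into the configured URL.
    static String resolve(String configuredUrl, String finalStatus) {
        return configuredUrl.replace(STATUS_TOKEN, finalStatus);
    }

    // Builds (but does not send) the GET request a notifier would issue.
    static HttpRequest buildRequest(String configuredUrl, String finalStatus) {
        return HttpRequest.newBuilder(URI.create(resolve(configuredUrl, finalStatus)))
                .GET()
                .build();
    }
}
```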
Oozie's CallbackServlet responds to this call. We can see that DagEngine.processCallback is where Oozie handles the completion of the program.
public class CallbackServlet extends JsonRestServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String queryString = request.getQueryString();
CallbackService callbackService = Services.get().get(CallbackService.class);
String actionId = callbackService.getActionId(queryString);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
}
}
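CallbackService.getActionId and getExternalStatus pull the action id and the status out of the request's query string. A stand-in parser follows, assuming the two values travel as ordinary URL-encoded key=value pairs; the parameter names used in the example (id, status) are assumptions, not confirmed Oozie names.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class CallbackQuerySketch {
    // Parses "k1=v1&k2=v2" into a map, URL-decoding keys and values.
    static Map<String, String> parse(String queryString) {
        Map<String, String> params = new HashMap<>();
        if (queryString == null || queryString.isEmpty()) {
            return params;
        }
        for (String pair : queryString.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq >= 0 ? pair.substring(0, eq) : pair;
            String value = eq >= 0 ? pair.substring(eq + 1) : "";
            params.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
                       URLDecoder.decode(value, StandardCharsets.UTF_8));
        }
        return params;
    }
}
```

Decoding matters because Oozie action ids contain characters such as "@" that are commonly percent-encoded in URLs.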
DagEngine.processCallback does its work mainly through CompletedActionXCommand. This command is put into the queue of CallableQueueService, so next we need to introduce CallableQueueService.
public void processCallback(String actionId, String externalStatus, Properties actionData)
throws DagEngineException {
XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus,
actionData, HIGH_PRIORITY);
if (!Services.get().get(CallableQueueService.class).queue(command)) {
LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
}
}
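The `if (!...queue(command))` check works because queueing never blocks: a full queue simply refuses the command. A minimal sketch of that non-blocking, bounded behaviour using the JDK's ArrayBlockingQueue.offer follows; the class name and the dropped counter are hypothetical, and the SAFEMODE case mentioned in the log message is not modelled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueSketch {
    private final BlockingQueue<Runnable> queue;
    int dropped = 0; // how many commands were refused because the queue was full

    BoundedQueueSketch(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false instead of blocking when the queue is full; the caller
    // decides what to do (processCallback just logs a warning and moves on).
    boolean queue(Runnable command) {
        boolean accepted = queue.offer(command);
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }
}
```

Refusing rather than blocking keeps the servlet thread that delivered the callback from hanging; a dropped callback is not fatal, because polling will eventually pick up the finished action.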
7.3 Asynchronous Execution
7.3.1 CallableQueueService
Oozie uses CallableQueueService to execute operations asynchronously:
public class CallableQueueService implements Service, Instrumentable {
private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
private Set<String> interruptTypes;
private int interruptMapMaxSize;
private int maxCallableConcurrency;
private int queueAwaitTerminationTimeoutSeconds;
private int queueSize;
private PriorityDelayQueue<CallableWrapper<?>> queue;
private ThreadPoolExecutor executor;
private Instrumentation instrumentation;
private boolean newImpl = false;
private AsyncXCommandExecutor asyncXCommandExecutor;
public void init(Services services) {
queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
MAX_CALLABLE_WAITTIME_MS,
TimeUnit.MILLISECONDS,
queueSize) {
@Override
protected boolean eligibleToPoll(QueueElement<?> element) {
if (element != null) {
CallableWrapper wrapper = (CallableWrapper) element;
if (element.getElement() != null) {
return callableReachMaxConcurrency(wrapper.getElement());
}
}
return false;
}
};
}
}
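Features:
- A task added to the execution queue may be eligible to run immediately, or may only fire at some future time.
- The thread pool picks tasks to run based on their scheduled execution time and their priority.
- The size of the thread pool's task queue is configurable; once the queue reaches its maximum size, no more tasks are accepted.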
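7.3.2 PriorityDelayQueue
The queue the thread pool takes tasks from is Oozie's custom PriorityDelayQueue:
Features:
Elements leave the queue according to their delay and their execution priority.
Implementation strategy:
PriorityDelayQueue keeps one delay queue (a DelayQueue) for each priority level.
Because it uses the JDK's built-in DelayQueue, once a task's delay has elapsed, poll() is guaranteed to return it; if poll() returns null, no task in that queue meets the time condition yet.
How the queues of multiple priorities are arranged:
Each time a task is taken from PriorityDelayQueue, the highest-priority queue is polled first. If it has no eligible task, the next-lower priority queue is polled, and so on down the priority levels.
Starvation: if the high-priority queue always has eligible tasks, eligible tasks in the lower-priority queues could starve. To prevent this, before each selection the lower-priority queues are traversed; if a task has long been eligible to leave the queue and has waited longer than the configured maximum, its priority is raised by one and it is moved into the next-higher priority queue.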
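The strategy above can be sketched compactly: one JDK DelayQueue per priority level, polling from the highest level down, and an anti-starvation pass that lifts long-overdue items by one level. This is a simplified model, not Oozie's code: the injected clock (so the behaviour is testable without sleeping), maxWaitMs and all class names are assumptions, and Oozie's real queue additionally handles locking and size limits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Simplified model: one JDK DelayQueue per priority level plus anti-starvation promotion.
class PriorityDelayQueueSketch<E> {
    final class Item implements Delayed {
        final E element;
        final long readyAtMs; // earliest time (per the injected clock) the item may leave

        Item(E element, long readyAtMs) {
            this.element = element;
            this.readyAtMs = readyAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtMs - clock.getAsLong(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final List<DelayQueue<Item>> queues = new ArrayList<>(); // index 0 = lowest priority
    private final LongSupplier clock; // injected so the sketch is deterministic
    private final long maxWaitMs;     // how long an eligible item may wait before promotion

    PriorityDelayQueueSketch(int priorities, long maxWaitMs, LongSupplier clock) {
        for (int i = 0; i < priorities; i++) {
            queues.add(new DelayQueue<>());
        }
        this.maxWaitMs = maxWaitMs;
        this.clock = clock;
    }

    void add(E element, int priority, long delayMs) {
        queues.get(priority).add(new Item(element, clock.getAsLong() + delayMs));
    }

    // Moves items that have been eligible for longer than maxWaitMs up one level.
    // Walking from the second-highest level down means an item moves at most one level per call.
    private void antiStarvation() {
        long now = clock.getAsLong();
        for (int p = queues.size() - 2; p >= 0; p--) {
            List<Item> overdue = new ArrayList<>();
            for (Item item : queues.get(p)) {
                if (now - item.readyAtMs > maxWaitMs) {
                    overdue.add(item);
                }
            }
            for (Item item : overdue) {
                queues.get(p).remove(item);
                queues.get(p + 1).add(item);
            }
        }
    }

    // Returns an expired element from the highest priority level that has one, or null.
    E poll() {
        antiStarvation();
        for (int p = queues.size() - 1; p >= 0; p--) {
            Item item = queues.get(p).poll(); // null if nothing in this level has expired
            if (item != null) {
                return item.element;
            }
        }
        return null;
    }
}
```

Ordering items by remaining delay means a promoted item, which has been waiting longest, sits at the head of its new level and is served next.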
7.3.3 PollablePriorityDelayQueue
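Features:
When selecting a task, it first checks whether a time-eligible task also satisfies the concurrency and other limits, and only then removes it from the queue; PriorityDelayQueue, by contrast, removes the task first and puts it back if the limits are not satisfied.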
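The difference is "check, then remove" versus "remove, then maybe put back". The sketch below models only that part: delays are left out, commands are represented by their type names (the names in the example are illustrative), and eligibility means fewer than a maximum number of commands of that type are running, in the spirit of the callableReachMaxConcurrency check shown in CallableQueueService above. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified model of "check eligibility before removing"; delays are left out.
class PollableQueueSketch {
    private final List<List<String>> levels = new ArrayList<>(); // index 0 = lowest priority
    private final Map<String, Integer> active = new HashMap<>();  // running commands per type
    private final int maxConcurrencyPerType;

    PollableQueueSketch(int priorities, int maxConcurrencyPerType) {
        for (int i = 0; i < priorities; i++) {
            levels.add(new ArrayList<>());
        }
        this.maxConcurrencyPerType = maxConcurrencyPerType;
    }

    void add(String commandType, int priority) {
        levels.get(priority).add(commandType);
    }

    // Eligible while fewer than maxConcurrencyPerType commands of this type are running.
    private boolean eligible(String type) {
        return active.getOrDefault(type, 0) < maxConcurrencyPerType;
    }

    // Scans from the highest level; ineligible elements are skipped, never removed.
    String poll() {
        for (int p = levels.size() - 1; p >= 0; p--) {
            Iterator<String> it = levels.get(p).iterator();
            while (it.hasNext()) {
                String type = it.next();
                if (eligible(type)) {
                    it.remove();
                    active.merge(type, 1, Integer::sum);
                    return type;
                }
            }
        }
        return null;
    }

    // Called when a command finishes, freeing a concurrency slot for its type.
    void finished(String type) {
        active.merge(type, -1, Integer::sum);
    }
}
```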
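Task types:
Tasks are executed asynchronously by the thread pool, so there is no ordering between them; in some business scenarios, however, the units of execution must run sequentially. Oozie therefore provides two task types, CompositeCallable and the ordinary XCallable. The former is a collection of XCallables and guarantees that the XCallables in it are executed in order.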
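The trick behind a composite callable is that the whole group is submitted as one task, so a multi-threaded pool has nothing to reorder. A plain-JDK sketch of that idea follows; CompositeCallableSketch is a hypothetical analogue built on java.util.concurrent.Callable, not Oozie's CompositeCallable or XCallable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogue of a composite callable: one task that runs its parts in order.
class CompositeCallableSketch implements Callable<List<Object>> {
    private final List<Callable<?>> parts;

    CompositeCallableSketch(List<Callable<?>> parts) {
        this.parts = new ArrayList<>(parts);
    }

    @Override
    public List<Object> call() throws Exception {
        List<Object> results = new ArrayList<>();
        for (Callable<?> part : parts) {
            results.add(part.call()); // strictly sequential, on one worker thread
        }
        return results;
    }

    // Submits the composite as a single task, even though the pool has several threads.
    static List<Object> runOnPool(List<Callable<?>> parts) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Future<List<Object>> future = pool.submit(new CompositeCallableSketch(parts));
            return future.get();
        } finally {
            pool.shutdown();
        }
    }
}
```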
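7.4 Moving On to the Next Operation
CompletedActionXCommand is executed when the Workflow command finishes, and only once. When the program has finished, it adds an ActionCheckXCommand to the asynchronous queue.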
public class CompletedActionXCommand extends WorkflowXCommand<Void> {
@Override
protected Void execute() throws CommandException {
if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
.....
} else { // RUNNING
ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
// this is done because oozie notifications (of sub-wfs) is send
// every status change, not only on completion.
if (executor.isCompleted(externalStatus)) {
queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
}
}
return null;
}
}
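ActionCheckXCommand is then invoked asynchronously. Its main responsibilities are:
- If retries are involved, set up the retry flag accordingly
- Call executor.check(context, wfAction); to check the action's status
- Update the action information in the database
- Since the action has finished, use ActionEndXCommand to perform the end processing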
public class ActionCheckXCommand extends ActionXCommand<Void> {
@Override
protected Void execute() throws CommandException {
ActionExecutorContext context = null;
boolean execSynchronous = false;
try {
boolean isRetry = false; // set the retry flag if the action has already been retried
if (wfAction.getRetries() > 0) {
isRetry = true;
}
boolean isUserRetry = false;
context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
executor.check(context, wfAction); // check the action's status
if (wfAction.isExecutionComplete()) {
if (!context.isExecuted()) {
failJob(context);
generateEvent = true;
} else {
wfAction.setPending();
execSynchronous = true;
}
}
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
updateList.add(new UpdateEntry<WorkflowJobQuery> (WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
wfJob));
}
finally {
// update the action information in the database
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
if (generateEvent && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
if (execSynchronous) {
// use ActionEndXCommand to perform the end processing
new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
}
}
return null;
}
}
This ends up in JavaActionExecutor.check, which:
- Builds a yarnClient from the configuration: yarnClient = createYarnClient(context, jobConf);
- Gets the application report: ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
- Gets the action data: Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
- Sets various execution information
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
boolean fallback = false;
YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
Configuration jobConf = createBaseHadoopConf(context, actionXml);
FileSystem actionFs = context.getAppFileSystem();
yarnClient = createYarnClient(context, jobConf); // build the YarnClient from the configuration
FinalApplicationStatus appStatus = null;
try {
final String effectiveApplicationId = findYarnApplicationId(context, action);
final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // get the application report
final YarnApplicationState appState = appReport.getYarnApplicationState();
if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
|| appState == YarnApplicationState.KILLED) {
appStatus = appReport.getFinalApplicationStatus();
}
}
catch (ApplicationNotFoundException ex) {
// the RM no longer knows the application: fall back to the action data on HDFS
fallback = true;
}
if (appStatus != null || fallback) {
Path actionDir = context.getActionDir();
// load sequence file into object
Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); // read the action data
if (fallback) {
String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
if (finalStatus != null) {
appStatus = FinalApplicationStatus.valueOf(finalStatus);
} else {
context.setExecutionData(FAILED, null);
}
}
String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched
if (externalID != null) {
context.setExternalChildIDs(externalID);
}
// Multiple child IDs - Pig or Hive action
String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
if (externalIDs != null) {
context.setExternalChildIDs(externalIDs);
}
// set the various execution information
context.setExecutionData(appStatus.toString(), null);
if (appStatus == FinalApplicationStatus.SUCCEEDED) {
if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
.get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
}
else {
context.setExecutionData(SUCCEEDED, null);
}
if (LauncherHelper.hasStatsData(actionData)) {
context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
}
getActionData(actionFs, action, context);
}
else {
......
context.setExecutionData(FAILED_KILLED, null);
}
}
}
finally {
if (yarnClient != null) {
IOUtils.closeQuietly(yarnClient);
}
}
}
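The status logic in check() boils down to a small decision: only FINISHED, FAILED or KILLED applications carry a meaningful final status, and on the fallback path the status recorded in the launcher's action data is used instead. A self-contained sketch follows; the enums are local copies of the YARN value names so it compiles without Hadoop, a null appState models the fallback case, and the "final.status" key is a hypothetical stand-in for LauncherAM.ACTION_DATA_FINAL_STATUS.

```java
import java.util.Map;

class CheckDecisionSketch {
    // Local copies of the YARN enum value names used by check().
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }
    enum FinalStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

    // Returns the final status if the action is done, or null if it is still running.
    // appState == null models the fallback case where the RM could not report on the app.
    static FinalStatus decide(AppState appState, FinalStatus reported, Map<String, String> actionData) {
        if (appState == null) {
            String recorded = actionData.get("final.status"); // hypothetical key name
            return recorded != null ? FinalStatus.valueOf(recorded) : FinalStatus.FAILED;
        }
        boolean terminal = appState == AppState.FINISHED
                || appState == AppState.FAILED
                || appState == AppState.KILLED;
        return terminal ? reported : null; // a running app has no final status yet
    }
}
```

Returning null for a still-running application corresponds to check() leaving the action untouched, so a later poll or callback will look at it again.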
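ActionEndXCommand performs the end processing and the transition:
- Calls the executor to complete the end operation: executor.end(context, wfAction);
- Updates the job information in the database: BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete
- Uses SignalXCommand to transition to the next Action and execute it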
public class ActionEndXCommand extends ActionXCommand<Void> {
@Override
protected Void execute() throws CommandException {
Configuration conf = wfJob.getWorkflowInstance().getConf();
int maxRetries = 0;
long retryInterval = 0;
if (!(executor instanceof ControlNodeActionExecutor)) {
maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
}
executor.setMaxRetries(maxRetries);
executor.setRetryInterval(retryInterval);
boolean isRetry = false;
if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
|| wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
isRetry = true;
}
boolean isUserRetry = false;
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
try {
executor.end(context, wfAction); // let the executor perform the end operation
if (!context.isEnded()) {
failJob(context);
} else {
wfAction.setRetries(0);
wfAction.setEndTime(new Date());
boolean shouldHandleUserRetry = false;
Status slaStatus = null;
switch (wfAction.getStatus()) {
case OK:
slaStatus = Status.SUCCEEDED;
break;
......
}
if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus,
SlaAppType.WORKFLOW_ACTION);
if(slaEvent != null) {
insertList.add(slaEvent);
}
}
}
WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
DagELFunctions.setActionInfo(wfInstance, wfAction);
wfJob.setWorkflowInstance(wfInstance);
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
wfJob.setLastModifiedTime(new Date());
updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
}
finally {
try {
// update the job information in the database
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
}
catch (JPAExecutorException e) {
throw new CommandException(e);
}
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(wfAction, wfJob.getUser());
}
new SignalXCommand(jobId, actionId).call(); // transition to the next Action and execute it
}
return null;
}
}
0xFF References
[Original] Uncle's Experience Sharing (6): How to view the logs of the tasks Oozie submits to Yarn
