1. Overview
This section describes how Hadoop submits a user-written MapReduce (MR) program to the cluster in the form of a Job. Four Java source files are primarily involved:
- In the package org.apache.hadoop.mapreduce under hadoop-mapreduce-client-core: Job.java, JobSubmitter.java
- In the package org.apache.hadoop.mapred under hadoop-mapreduce-client-jobclient: YARNRunner.java, ResourceMgrDelegate.java
2. Code Analysis and Execution Flow
1) The client writes and runs a driver program like the one below (the map and reduce implementations are omitted here):
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
// Input/output paths are set through FileInputFormat/FileOutputFormat in the new API
FileInputFormat.addInputPath(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
2) The submitted client program calls the waitForCompletion() method of Job
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
If the Job is still in the DEFINE state (i.e., fully set up but not yet submitted), submit() is called immediately. Then, depending on the verbose flag, either monitorAndPrintJob() is called to track and print the progress of the Job and its Tasks, or the method enters its own loop, polling at a fixed interval to check whether the submitted Job has completed. Once the Job is complete, the loop exits and isSuccessful() is called to return the final status.
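The polling interval above is read from the client configuration via Job.getCompletionPollInterval(). A minimal sketch of tuning it, assuming the key name and its 5000 ms default from the Hadoop 2.x Job class:

Configuration conf = new Configuration();
// How often (in milliseconds) waitForCompletion() re-checks isComplete()
// when verbose output is disabled; defaults to 5000 in Hadoop 2.x.
conf.setInt("mapreduce.client.completion.pollinterval", 1000);
Job job = Job.getInstance(conf);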
3) waitForCompletion() calls submit(); stepping into the submit() method
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
submit() first calls connect() to obtain the required client protocol (ClientProtocol) and connection information, which end up in a Cluster object. It then gets a JobSubmitter and calls its submitJobInternal() method, stores the returned JobStatus, sets the Job's state to RUNNING, and returns.
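connect() itself is not shown above. Roughly, as a simplified sketch of the Hadoop 2.x source, it lazily creates the Cluster object under the submitting user's UGI; the Cluster constructor then picks a ClientProtocol provider (YARN or local) based on the configuration:

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    // Create the Cluster only once, as the submitting user.
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run()
             throws IOException, InterruptedException, ClassNotFoundException {
        return new Cluster(getConfiguration());
      }
    });
  }
}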
4) Into the submitJobInternal() method of the JobSubmitter class
/**
 * Internal method for submitting jobs to the system.
 */
JobStatus submitJobInternal(Job job, Cluster cluster) 
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs 
  checkSpecs(job);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                   job.getConfiguration());
  //configure the command line options correctly on the submitting dfs
  Configuration conf = job.getConfiguration();
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set("hadoop.http.filter.initializers", 
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
submitJobInternal() mainly performs the following steps:
- Check the Job's input and output specifications and other parameters, read the configuration and the submitting host's address, obtain a new JobID, and determine the staging (submit) directory that MRAppMaster will later work from, setting the necessary job properties along the way
- Copy the required jar file and configuration files to the staging directory on HDFS so that the nodes running the job can access them
- Compute the input splits and obtain their number, which determines the number of map tasks (see the sketch after this list)
- Call submitJob() of the YARNRunner class to submit the Job, passing the required parameters (such as the JobID and the submit directory)
- Wait for submitJob() to return the Job's status; if no status is returned, the staging directory is deleted in the finally block
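The split calculation in writeSplits() ultimately delegates to the job's InputFormat. The following is a minimal, self-contained sketch of the same idea; it is an illustration rather than the JobSubmitter code itself, and the input path "in" and the use of TextInputFormat are assumptions:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitCountSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // Hypothetical input path, used only for this illustration.
    FileInputFormat.addInputPath(job, new Path("in"));

    // The InputFormat decides how the input is cut into logical splits;
    // for FileInputFormat the default split size is roughly one HDFS block.
    List<InputSplit> splits = new TextInputFormat().getSplits(job);

    // JobSubmitter stores this count as the number of map tasks
    // ("mapreduce.job.maps"): one map task per split.
    job.getConfiguration().setInt("mapreduce.job.maps", splits.size());
    System.out.println("number of splits: " + splits.size());
  }
}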
5) The submitJob() method of the YARNRunner class
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {

  /* check if we have a hsproxy, if not, no need */
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (hsProxy != null) {
    // JobClient will set this flag if getDelegationToken is called, if so, get
    // the delegation tokens for the HistoryServer also.
    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, 
        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
      Token hsDT = getDelegationTokenFromHS(hsProxy, new Text( 
              conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
      ts.addToken(hsDT.getService(), hsDT);
    }
  }

  // Upload only in security mode: TODO
  Path applicationTokensFile =
      new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
  try {
    ts.writeTokenStorageFile(applicationTokensFile, conf);
  } catch (IOException e) {
    throw new YarnException(e);
  }

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

  ApplicationReport appMaster = resMgrDelegate
      .getApplicationReport(applicationId);
  String diagnostics =
      (appMaster == null ?
          "application report is null" : appMaster.getDiagnostics());
  if (appMaster == null
      || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
      || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
    throw new IOException("Failed to run job : " + diagnostics);
  }
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
- Set the necessary configuration and build the Application submission context, which includes the resources required by MRAppMaster and the command used to launch MRAppMaster, among other things (a schematic example follows this list)
- Then call ResourceMgrDelegate's submitApplication() with this context to submit the Job to the ResourceManager; the call returns the already-generated ApplicationId (the ApplicationId was in fact created back when the JobID was obtained)
- Finally, query and return the Job's current status, and the method exits
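To make the shape of the submission context concrete, here is a schematic sketch built with the public YARN records API (Hadoop 2.2+ signatures). The command line, queue, and memory values are placeholders for illustration, not what createApplicationSubmissionContext() literally generates:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

public class AmContextSketch {
  public static ApplicationSubmissionContext buildContext() {
    // The launch context carries the command that starts the MR ApplicationMaster
    // (org.apache.hadoop.mapreduce.v2.app.MRAppMaster), plus local resources
    // (job.jar, job.xml, job.split, ...) and environment variables.
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java org.apache.hadoop.mapreduce.v2.app.MRAppMaster"
        + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));   // schematic command only

    // Memory requested for the AM container; 1536 MB mirrors the default of
    // yarn.app.mapreduce.am.resource.mb.
    Resource amResource = Records.newRecord(Resource.class);
    amResource.setMemory(1536);

    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    appContext.setApplicationName("myjob");   // placeholder name
    appContext.setQueue("default");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(amResource);
    return appContext;
  }
}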
6) The submitApplication() method of the ResourceMgrDelegate class
public ApplicationId submitApplication(
    ApplicationSubmissionContext appContext) 
throws IOException {
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request = 
      recordFactory.newRecordInstance(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  applicationsManager.submitApplication(request);
  LOG.info("Submitted application " + applicationId + " to ResourceManager" +
      " at " + rmAddress);
  return applicationId;
}
This method is straightforward:
- Set the ApplicationId on the Application submission context
- Wrap the submission context in a SubmitApplicationRequest
- Finally, use Hadoop RPC to invoke submitApplication() of the ClientRMService class on the ResourceManager side, sending the request that carries the prepared Application submission context to the ResourceManager.
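For comparison, in later Hadoop 2.x releases ResourceMgrDelegate wraps the public YarnClient API, which performs the same ApplicationClientProtocol RPC to ClientRMService.submitApplication(). A minimal sketch of that client-side path (the application name is a placeholder, and a real submission would also set the AM container spec and resource from the previous sketch):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnSubmitSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // The ResourceManager hands out the ApplicationId up front; the client
    // fills in the submission context and sends it back.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("myjob");          // placeholder
    // appContext.setAMContainerSpec(...); appContext.setResource(...);

    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
    yarnClient.stop();
  }
}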