Everything starts with the example program:
Example program
WordCount.java, the example program shipped with Hadoop 2.7
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  // Extends the generic class Mapper
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    // A Hadoop IntWritable instance "one", holding the constant 1
    private final static IntWritable one = new IntWritable(1);
    // A Hadoop Text instance "word", reused for every output key
    private Text word = new Text();

    // Implements the map function
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      // Java's string tokenizer; the default delimiters are space, tab ('\t'),
      // newline ('\n'), carriage return ('\r') and form feed ('\f')
      StringTokenizer itr = new StringTokenizer(value.toString());
      // hasMoreTokens() reports whether any tokens are left
      while (itr.hasMoreTokens()) {
        /*
          nextToken(): returns the string from the current position up to the next delimiter
          word.set(): converts the Java String into the Hadoop Text type
        */
        word.set(itr.nextToken());
        // Emit the (word, 1) pair through the task's Context
        context.write(word, one);
      }
    }
  }

  // Extends the generic class Reducer
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    // Reusable IntWritable holding the result
    private IntWritable result = new IntWritable();

    // Implements reduce
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // Iterate over the values and add up the occurrences of the word
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Convert the Java int sum into the Hadoop type
      result.set(sum);
      // Emit the result (the reducer's output normally ends up on HDFS)
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Create the Configuration
    Configuration conf = new Configuration();
    /*
      GenericOptionsParser is the basic Hadoop class for parsing command-line arguments.
      getRemainingArgs() returns the arguments left over once the generic options
      have been consumed (here, a list of paths).
    */
    /*
      Its implementation:
      public String[] getRemainingArgs() {
        return (commandLine == null) ? new String[]{} : commandLine.getArgs();
      }
    */
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // With fewer than two paths, print the usage: at least one input path and one output path are required
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // Create the job
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    /*
      Set the combiner class.
      Many people find the combiner confusing: here it is the reducer logic
      run locally on each map's output before the shuffle.
    */
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // Type of the reduce output key: Text
    job.setOutputKeyClass(Text.class);
    // Type of the reduce output value
    job.setOutputValueClass(IntWritable.class);
    // Add the input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // Set the output path
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Submit the job and wait for it to finish
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
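To see what this job computes without a cluster, here is a minimal sketch in plain JDK Java that imitates the map, group-by-key and reduce steps in memory. It uses no Hadoop classes; the class name MapReduceFlowSketch and its methods are made up for illustration and say nothing about how the framework is implemented.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

class MapReduceFlowSketch {
    // "map": one input line becomes zero or more (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            out.add(new SimpleEntry<>(itr.nextToken(), 1));
        }
        return out;
    }

    // "shuffle and sort": collect all values that share a key, keys in sorted order.
    static TreeMap<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // "reduce": sum the values of one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three steps over all input lines.
    static TreeMap<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        group(intermediate).forEach((word, ones) -> result.put(word, reduce(ones)));
        return result;
    }

    public static void main(String[] args) {
        // The empty line maps to zero pairs.
        System.out.println(wordCount(Arrays.asList("a b a", "", "b c"))); // {a=2, b=2, c=1}
    }
}
```

In the real job the three steps run in different processes on different machines; the sketch only shows the shape of the data at each step.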
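1. Mapper
Maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the job's InputFormat. Mapper implementations can access the job's JobConf via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use the Closeable#close() method for de-initialization.
The framework then calls map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit of that task.
All intermediate values associated with a given output key are subsequently grouped by the framework and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf#setOutputKeyComparatorClass(Class).
The grouped Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.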
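As an illustration of partitioning, the sketch below uses the same arithmetic that, as far as I know, Hadoop's default HashPartitioner applies: clear the sign bit of the key's hash code, then take the remainder by the number of reduce tasks. The class and method names are hypothetical and no Hadoop classes are involved.

```java
class HashPartitionSketch {
    // Masking with Integer.MAX_VALUE keeps the result non-negative
    // even when hashCode() is negative.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"hadoop", "yarn", "hdfs", "hadoop"};
        for (String key : keys) {
            // Equal keys always land on the same reducer.
            System.out.println(key + " -> reducer " + partitionFor(key, 3));
        }
    }
}
```

A custom partitioner replaces only this function; the guarantee that all records with the same key reach the same Reducer must still hold.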
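Users can optionally specify a Combiner, via JobConf#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps cut down the amount of data transferred from the Mapper to the Reducer.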
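The effect of a Combiner can be shown with a small plain-JDK sketch: each map task's (word, 1) pairs are summed locally, so fewer records cross the network. The class name, the sample words and the two "map tasks" are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CombinerSketch {
    // Collapses one map task's output keys into (word, localCount) records.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> local = new TreeMap<>();
        for (String key : mapOutputKeys) {
            local.merge(key, 1, Integer::sum);
        }
        return local;
    }

    public static void main(String[] args) {
        List<String> mapTask1 = Arrays.asList("the", "cat", "the", "the");
        List<String> mapTask2 = Arrays.asList("the", "dog");
        int before = mapTask1.size() + mapTask2.size();
        int after = combine(mapTask1).size() + combine(mapTask2).size();
        System.out.println("records shuffled without combiner: " + before); // 6
        System.out.println("records shuffled with combiner: " + after);     // 4
    }
}
```

This only works when the aggregation is associative and commutative, as summing is, which is why WordCount can register IntSumReducer as both combiner and reducer.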
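The intermediate, grouped outputs are stored in SequenceFiles. Applications can specify whether and how the intermediate outputs are compressed, and which CompressionCodec to use, via the JobConf.
If the job has no reducers, the Mapper output is written directly to the FileSystem without being grouped by key.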
Example (old org.apache.hadoop.mapred API):
public class MyMapper<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Mapper<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String mapTaskId;
  private String inputFile;
  private int noRecords = 0;

  public void configure(JobConf job) {
    mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
    inputFile = job.get(JobContext.MAP_INPUT_FILE);
  }

  public void map(K key, V val,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // Process the <key, value> pair (assume this takes a while)
    // ...
    // ...

    // Let the framework know that we are alive, and kicking!
    // reporter.progress();

    // Process some more
    // ...
    // ...

    // Increment the no. of <key, value> pairs processed
    ++noRecords;

    // Increment counters
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

    // Every 100 records update application-level status
    if ((noRecords%100) == 0) {
      reporter.setStatus(mapTaskId + " processed " + noRecords +
                         " from input-file: " + inputFile);
    }

    // Output the result
    output.collect(key, val);
  }
}
Applications may also write a custom MapRunnable to exert greater control over map processing, e.g. multi-threaded Mappers.
Or, with the new org.apache.hadoop.mapreduce API:
public class TokenCounterMapper
    extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
Applications may override the run(org.apache.hadoop.mapreduce.Mapper.Context) method to exert greater control over map processing, e.g. multi-threaded Mappers.
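The Mapper method (old API):
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
This method maps a single input key/value pair into intermediate key/value pairs.
Output pairs need not be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to OutputCollector#collect(Object,Object).
Applications can use the Reporter to report progress or simply to indicate that they are alive. This is crucial when the application takes a significant amount of time to process a single key/value pair, because the framework might otherwise assume the task has timed out and kill it. The other way to avoid this is to set mapreduce.task.timeout to a high enough value (or to 0 for no timeout at all).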
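The timeout rule just described can be written down as a one-line predicate. This is a sketch of the rule only, not the framework's code; the class and method names are hypothetical, and the 600000 ms figure is, to my knowledge, the default value of mapreduce.task.timeout.

```java
class TaskTimeoutSketch {
    // A task is considered timed out when it has not reported progress for longer
    // than timeoutMillis. A timeout of 0 (or less) disables the check.
    static boolean isTimedOut(long lastProgressMillis, long nowMillis, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            return false;
        }
        return nowMillis - lastProgressMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long timeout = 600_000L; // assumed default: 10 minutes
        System.out.println(isTimedOut(0L, 700_000L, timeout)); // true: silent for too long
        System.out.println(isTimedOut(0L, 700_000L, 0L));      // false: timeout disabled
    }
}
```

Calling reporter.progress() regularly corresponds to moving lastProgressMillis forward, which keeps the predicate false without raising the timeout.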
Mapper class hierarchy:
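2. Reducer
Reduces a set of intermediate values that share a key to a smaller set of values.
The number of Reducers for the job is set via JobConf#setNumReduceTasks(int). Reducer implementations can access the job's JobConf via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use the Closeable#close() method for de-initialization.
A Reducer has 3 primary phases:
Phase 1: Shuffle. The Reducer's input is the grouped output of the Mappers. In this phase each Reducer fetches, via HTTP, the relevant partition of the output of every Mapper.
Phase 2: Sort. In this phase the framework groups the Reducer's inputs by key (since different Mappers may have produced the same key). The shuffle and sort phases occur simultaneously: outputs are merged while they are being fetched.
Secondary sort: if the rules for deciding which keys are equivalent when grouping intermediate values differ from the rules for grouping keys before reduction, one may specify a Comparator via JobConf#setOutputValueGroupingComparator(Class).
Since JobConf#setOutputKeyComparatorClass(Class) controls how intermediate keys are sorted, the two can be used together to simulate a secondary sort on values.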
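Example: suppose you want to find duplicate web pages and tag them all with the URL of the "best" known copy. You would set up the job like this:
Map input key: url
Map input value: document
Map output key: document checksum, url pagerank
Map output value: url
Partitioner: by checksum
Output key comparator: by checksum, then by decreasing pagerank
Output value grouping comparator: by checksum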
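The job set-up above can be imitated in memory to show why it works: sort by (checksum, pagerank descending), group by checksum only, and the first URL of each group is the "best" one. This is a plain-JDK sketch under that reading; the Rec type, the sample URLs and the pagerank values are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {
    static final class Rec {
        final String checksum;
        final double pagerank;
        final String url;

        Rec(String checksum, double pagerank, String url) {
            this.checksum = checksum;
            this.pagerank = pagerank;
            this.url = url;
        }
    }

    // Returns checksum -> URL with the highest pagerank among its duplicates.
    static Map<String, String> bestUrlPerChecksum(List<Rec> recs) {
        List<Rec> sorted = new ArrayList<>(recs);
        // Plays the role of the output key comparator: checksum, then pagerank descending.
        sorted.sort(Comparator.comparing((Rec r) -> r.checksum)
                .thenComparing(Comparator.comparingDouble((Rec r) -> r.pagerank).reversed()));
        // Plays the role of the value grouping comparator: group by checksum only,
        // so the first record seen in each group carries the best URL.
        Map<String, String> best = new LinkedHashMap<>();
        for (Rec r : sorted) {
            best.putIfAbsent(r.checksum, r.url);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Rec> recs = Arrays.asList(
                new Rec("c1", 0.2, "http://a.example/page"),
                new Rec("c1", 0.9, "http://b.example/page"),
                new Rec("c2", 0.5, "http://c.example/other"));
        System.out.println(bestUrlPerChecksum(recs));
    }
}
```

In the real job the reducer never has to buffer and sort the values itself; the framework's sort delivers them already ordered within each group.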
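Phase 3: Reduce
In this phase the reduce(Object, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs.
The output of the reduce task is typically written to the FileSystem via OutputCollector#collect(Object, Object).
The output of the Reducer is not re-sorted.
Example: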
public class MyReducer<K extends WritableComparable, V extends Writable>
    extends MapReduceBase implements Reducer<K, V, K, V> {

  static enum MyCounters { NUM_RECORDS }

  private String reduceTaskId;
  private int noKeys = 0;

  public void configure(JobConf job) {
    reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
  }

  public void reduce(K key, Iterator<V> values,
                     OutputCollector<K, V> output,
                     Reporter reporter)
      throws IOException {

    // Process
    int noValues = 0;
    while (values.hasNext()) {
      V value = values.next();

      // Increment the no. of values for this key
      ++noValues;

      // Process the <key, value> pair (assume this takes a while)
      // ...
      // ...

      // Let the framework know that we are alive, and kicking!
      if ((noValues%10) == 0) {
        reporter.progress();
      }

      // Process some more
      // ...
      // ...

      // Output the <key, value>
      output.collect(key, value);
    }

    // Increment the no. of <key, list of values> pairs processed
    ++noKeys;

    // Increment counters
    reporter.incrCounter(MyCounters.NUM_RECORDS, 1);

    // Every 100 keys update application-level status
    if ((noKeys%100) == 0) {
      reporter.setStatus(reduceTaskId + " processed " + noKeys);
    }
  }
}
The figure below is taken from: http://x-rip.iteye.com/blog/1541914
3. Job
3.1 The key line of the example program above: job.waitForCompletion(true)
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
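The non-verbose branch of waitForCompletion is a plain polling loop. The sketch below isolates that pattern in JDK-only Java, with a BooleanSupplier standing in for isComplete(); the names are hypothetical. Unlike the listing above, which swallows InterruptedException, this sketch restores the interrupt flag and stops waiting, which is a deliberate deviation.

```java
import java.util.function.BooleanSupplier;

class CompletionPollSketch {
    // Sleeps pollIntervalMillis between checks until isComplete reports true.
    // Returns how many times it had to sleep.
    static int waitUntilComplete(BooleanSupplier isComplete, long pollIntervalMillis) {
        int polls = 0;
        while (!isComplete.getAsBoolean()) {
            polls++;
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // Restore the flag and give up waiting instead of ignoring the interrupt.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        int[] checks = {0};
        // A fake job that reports completion on the fourth check.
        int polls = waitUntilComplete(() -> ++checks[0] > 3, 10L);
        System.out.println("slept " + polls + " times before completion");
    }
}
```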
3.2 The submission process
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
The connection step:
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
Here, ugi is defined in JobContextImpl.java:
/**
* The UserGroupInformation object that has a reference to the current user
*/
protected UserGroupInformation ugi;
The Cluster class provides a way to access the map/reduce cluster:
public static enum JobTrackerStatus {INITIALIZING, RUNNING};

private ClientProtocolProvider clientProtocolProvider;
private ClientProtocol client;
private UserGroupInformation ugi;
private Configuration conf;
private FileSystem fs = null;
private Path sysDir = null;
private Path stagingAreaDir = null;
private Path jobHistoryDir = null;
4. JobSubmitter
/** * Internal method for submitting jobs to the system. * * <p>The job submission process involves: * <ol> * <li> * Checking the input and output specifications of the job. * </li> * <li> * Computing the {@link InputSplit}s for the job. * </li> * <li> * Setup the requisite accounting information for the * {@link DistributedCache} of the job, if necessary. * </li> * <li> * Copying the job's jar and configuration to the map-reduce system * directory on the distributed file-system. * </li> * <li> * Submitting the job to the <code>JobTracker</code> and optionally * monitoring it's status. * </li> * </ol></p> * @param job the configuration to submit * @param cluster the handle to the Cluster * @throws ClassNotFoundException * @throws InterruptedException * @throws IOException */ JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs checkSpecs(job); Configuration conf = job.getConfiguration(); addMRFrameworkToDistributedCache(conf); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { submitHostAddress = ip.getHostAddress(); submitHostName = ip.getHostName(); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName()); conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir 
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { int keyLen = CryptoUtils.isShuffleEncrypted(conf) ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) : SHUFFLE_KEY_LENGTH; keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(keyLen); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey = keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" // to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(conf); if (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { // Add HDFS tracking ids ArrayList<String> trackingIds = new ArrayList<String>(); for (Token<? 
extends TokenIdentifier> t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); } // Set reservation info if it exists ReservationId reservationId = job.getReservationId(); if (reservationId != null) { conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); } // Write job file to submit dir writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials()); status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } }
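As the Javadoc above says, submitting a job involves the following steps:
1. Check the input and output specifications of the job.
2. Compute the InputSplits for the job.
3. If necessary, set up the accounting information required for the job's DistributedCache.
4. Copy the job's jar and configuration to the map-reduce system directory on the distributed file system.
5. Submit the job to the JobTracker and, optionally, monitor its status.
If the current JobClient (Hadoop 0.22) is running on YARN, the job is submitted through YARNRunner.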
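How the Hadoop YARN framework works
Main steps
- Job submission
- Job initialization
- Resource request and task assignment
- Task execution
Detailed steps
Before a job runs, the Resource Manager and the Node Managers are already up, so in the figure above the Resource Manager and Node Manager processes do not need to be started.
- 1. The client process submits the MapReduce job from the client side via runJob (in practice waitForCompletion is normally used). In YARN a job is generally called an Application.
- 2. The client requests an application ID from the Resource Manager; it uniquely identifies this job.
- 3. The client copies the job's files (typically the job jar itself and the third-party jars it depends on) to HDFS. In other words, MapReduce on YARN shares the program jars through HDFS so that the Task processes can read them.
- 4. The client submits the application to the ResourceManager via runJob.
- 5.a/5.b. On receiving the submission request, the Resource Manager forwards it to its scheduling component (the Scheduler). The Scheduler allocates a Container, the Resource Manager launches the Application Master process in that Container, and the Node Manager manages the Application Master process.
- 6. The Application Master initializes the job (application). Initialization includes creating bookkeeping objects that track the job's execution, including the progress and completion reports sent by the tasks. (Every computing framework that integrates with YARN has to supply its own ApplicationMaster; Spark and Storm, for example, each provide one in order to run on YARN.)
- 7. The Application Master determines the number of Mapper tasks from the input splits of the data location specified in the job code (the data usually lives on HDFS). As the JobSubmitter code above shows, the splits themselves are computed on the client by writeSplits at submission time, and the Application Master reads that split information. When deciding which node each Mapper task goes to, Hadoop takes data locality into account, preferring node-local, then rack-local, and finally off-rack placement. The number of Reduce tasks is determined as well; it is set explicitly in the program code via job.setNumReduceTasks.
- 8. The following points detail how the Application Master requests resources from the Resource Manager:
- 8.1 Based on the number of Mapper tasks (from the input splits) and the number of Reducer tasks, the Application Master requests compute resources from the Resource Manager. Compute resources mainly mean memory and CPU. In YARN the unit of compute resource is the Container, which bundles a certain amount of memory and a number of CPU cores.
- 8.2 The Application Master requests resources by sending heartbeats to the Resource Manager. Each request also carries information such as the task's data locality, so that the Resource Manager can allocate resources that satisfy data locality as far as possible.
- 8.3 The request also carries the amount of memory needed. By default Map tasks and Reduce tasks are each allocated 1 GB of memory; this can be changed with the parameters mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.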
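Step 7 ties the number of Mapper tasks to the number of input splits. The sketch below reproduces, for a single splittable non-empty file, the arithmetic that FileInputFormat uses as far as I know: split size is max(minSize, min(maxSize, blockSize)), and a final chunk up to 10% larger than the split size is not split again. The class and method names are hypothetical and the 128 MB block size is an assumed value.

```java
class SplitCountSketch {
    // Slop factor: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits (and therefore map tasks) for one non-empty, splittable file.
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the remainder becomes the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(countSplits(300 * mb, splitSize)); // 3
        System.out.println(countSplits(140 * mb, splitSize)); // 1 (140/128 is below the slop)
    }
}
```

With the default 1 GB per task from step 8.3, a 300 MB input under these assumptions would lead the Application Master to ask for three map containers.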
5. YARNRunner
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
This calls YarnClient's submitApplication() method, which is implemented as follows:
6. YarnClientImpl
@Override public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSet<YarnApplicationState> waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet<YarnApplicationState> failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { try { ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. 
if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }
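The core of the loop above is a small state check: keep polling while the application is in a "waiting" state, fail if it reached a failed state, otherwise treat the submission as done. The sketch below isolates that logic with a local enum and an iterator of reported states standing in for getApplicationReport(); all names are hypothetical, and the timeout is reduced to "the reports ran out".

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;

class SubmitPollSketch {
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    static final EnumSet<AppState> WAITING =
            EnumSet.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED);
    static final EnumSet<AppState> FAILED_TO_SUBMIT =
            EnumSet.of(AppState.FAILED, AppState.KILLED);

    // Consumes state reports until one is no longer a waiting state.
    static AppState awaitSubmission(Iterator<AppState> reports) {
        while (reports.hasNext()) {
            AppState state = reports.next();
            if (!WAITING.contains(state)) {
                if (FAILED_TO_SUBMIT.contains(state)) {
                    throw new IllegalStateException("Failed to submit, state: " + state);
                }
                return state; // submitted successfully
            }
        }
        throw new IllegalStateException("Timed out while waiting for submission");
    }

    public static void main(String[] args) {
        Iterator<AppState> reports = Arrays.asList(
                AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED, AppState.ACCEPTED).iterator();
        System.out.println(awaitSubmission(reports)); // ACCEPTED
    }
}
```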
7. ClientRMService
ClientRMService is the client-facing interface of the resource manager. This module handles all RPC calls from clients to the resource manager.
@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.", ie); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, ie.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw RPCUtil.getRemoteException(ie); } // Check whether app has already been put into rmContext, // If it is, simply return the response if (rmContext.getRMApps().get(applicationId) != null) { LOG.info("This is an earlier submitted application: " + applicationId); return SubmitApplicationResponse.newInstance(); } if (submissionContext.getQueue() == null) { submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); } if (submissionContext.getApplicationName() == null) { submissionContext.setApplicationName( YarnConfiguration.DEFAULT_APPLICATION_NAME); } if (submissionContext.getApplicationType() == null) { submissionContext .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE); } else { if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) { submissionContext.setApplicationType(submissionContext .getApplicationType().substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH)); } } try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user); LOG.info("Application with id " + 
applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); } catch (YarnException e) { LOG.info("Exception in submitting application with id " + applicationId.getId(), e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw e; } SubmitApplicationResponse response = recordFactory .newRecordInstance(SubmitApplicationResponse.class); return response; }
It then calls RMAppManager to submit the application directly, as shown next.
8. RMAppManager
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user) throws YarnException {
  ApplicationId applicationId = submissionContext.getApplicationId();

  RMAppImpl application =
      createAndPopulateNewRMApp(submissionContext, submitTime, user);
  ApplicationId appId = submissionContext.getApplicationId();

  if (UserGroupInformation.isSecurityEnabled()) {
    try {
      this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
          parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete(),
          application.getUser());
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials.", e);
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // scheduler about the existence of the application
      assert application.getState() == RMAppState.NEW;
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  } else {
    // Dispatcher is not yet started at this time, so these START events
    // enqueued should be guaranteed to be first processed when dispatcher
    // gets started.
    this.rmContext.getDispatcher().getEventHandler()
      .handle(new RMAppEvent(applicationId, RMAppEventType.START));
  }
}
9. Adding the application asynchronously: DelegationTokenRenewer
/**
 * Asynchronously add application tokens for renewal.
 * @param applicationId added application
 * @param ts tokens
 * @param shouldCancelAtEnd true if tokens should be canceled when the app is
 * done else false.
 * @param user user
 */
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
    boolean shouldCancelAtEnd, String user) {
  processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
    applicationId, ts, shouldCancelAtEnd, user));
}
This in turn calls:
private void processDelegationTokenRenewerEvent(
    DelegationTokenRenewerEvent evt) {
  serviceStateLock.readLock().lock();
  try {
    if (isServiceStarted) {
      renewerService.execute(new DelegationTokenRenewerRunnable(evt));
    } else {
      pendingEventQueue.add(evt);
    }
  } finally {
    serviceStateLock.readLock().unlock();
  }
}
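As the code above shows, while holding the read lock the event is either handed to the thread pool (if the service has already started) or placed on the pending event queue.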
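The dispatch-or-queue pattern can be tried out in isolation with JDK classes only. In the sketch below, submit() mirrors the read-lock branch shown in the source, while start(), which flips the flag under the write lock and then drains the queue, is my assumption about how queued events get processed later; it is not taken from the listing. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class DeferredEventSketch {
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private final Executor executor;
    private boolean started = false;

    DeferredEventSketch(Executor executor) {
        this.executor = executor;
    }

    // Many submitters may hold the read lock at once; none can race with start().
    void submit(String event) {
        stateLock.readLock().lock();
        try {
            if (started) {
                executor.execute(() -> processed.add(event));
            } else {
                pending.add(event);
            }
        } finally {
            stateLock.readLock().unlock();
        }
    }

    // Assumed counterpart: mark the service started, then replay queued events.
    void start() {
        stateLock.writeLock().lock();
        try {
            started = true;
        } finally {
            stateLock.writeLock().unlock();
        }
        String event;
        while ((event = pending.poll()) != null) {
            submit(event);
        }
    }

    List<String> processed() {
        return processed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        // Runnable::run executes inline, which keeps the demo deterministic.
        DeferredEventSketch sketch = new DeferredEventSketch(Runnable::run);
        sketch.submit("app-1");
        System.out.println("queued before start: " + sketch.pendingCount());
        sketch.start();
        sketch.submit("app-2");
        System.out.println("processed: " + sketch.processed());
    }
}
```

Passing a real thread pool instead of Runnable::run gives the asynchronous behaviour; the read/write lock only protects the started flag, not the processing itself.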
The event is then handled on a thread from the pool:
@Override public void run() { if (evt instanceof DelegationTokenRenewerAppSubmitEvent) { DelegationTokenRenewerAppSubmitEvent appSubmitEvt = (DelegationTokenRenewerAppSubmitEvent) evt; handleDTRenewerAppSubmitEvent(appSubmitEvt); } else if (evt.getType().equals( DelegationTokenRenewerEventType.FINISH_APPLICATION)) { DelegationTokenRenewer.this.handleAppFinishEvent(evt); } }
@SuppressWarnings("unchecked") private void handleDTRenewerAppSubmitEvent( DelegationTokenRenewerAppSubmitEvent event) { /* * For applications submitted with delegation tokens we are not submitting * the application to scheduler from RMAppManager. Instead we are doing * it from here. The primary goal is to make token renewal as a part of * application submission asynchronous so that client thread is not * blocked during app submission. */ try { // Setup tokens for renewal DelegationTokenRenewer.this.handleAppSubmitEvent(event); rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START)); } catch (Throwable t) { LOG.warn( "Unable to add the application to the delegation token renewer.", t); // Sending APP_REJECTED is fine, since we assume that the // RMApp is in NEW state and thus we havne't yet informed the // Scheduler about the existence of the application rmContext.getDispatcher().getEventHandler().handle( new RMAppRejectedEvent(event.getApplicationId(), t.getMessage())); } } }
private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) throws IOException, InterruptedException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); boolean shouldCancelAtEnd = evt.shouldCancelAtEnd(); if (ts == null) { return; // nothing to add } if (LOG.isDebugEnabled()) { LOG.debug("Registering tokens for renewal for:" + " appId = " + applicationId); } Collection<Token<?>> tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); // find tokens for renewal, but don't add timers until we know // all renewable tokens are valid // At RM restart it is safe to assume that all the previously added tokens // are valid appTokens.put(applicationId, Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>())); Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>(); boolean hasHdfsToken = false; for (Token<?> token : tokens) { if (token.isManaged()) { if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { LOG.info(applicationId + " found existing hdfs token " + token); hasHdfsToken = true; } DelegationTokenToRenew dttr = allTokens.get(token); if (dttr == null) { dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token, getConfig(), now, shouldCancelAtEnd, evt.getUser()); try { renewToken(dttr); } catch (IOException ioe) { throw new IOException("Failed to renew token: " + dttr.token, ioe); } } tokenList.add(dttr); } } if (!tokenList.isEmpty()) { // Renewing token and adding it to timer calls are separated purposefully // If user provides incorrect token then it should not be added for // renewal. 
for (DelegationTokenToRenew dtr : tokenList) { DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr); if (currentDtr != null) { // another job beat us currentDtr.referringAppIds.add(applicationId); appTokens.get(applicationId).add(currentDtr); } else { appTokens.get(applicationId).add(dtr); setTimerForTokenRenewal(dtr); } } } if (!hasHdfsToken) { requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(), shouldCancelAtEnd); } }
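The "another job beat us" branch above relies on putIfAbsent: whichever application registers a token first owns the renewal timer, and later applications are only added to the token's list of referring applications. Here is a minimal JDK-only sketch of that idea, with strings standing in for tokens and application IDs; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SharedTokenSketch {
    // token -> IDs of the applications that refer to it
    private final ConcurrentMap<String, Set<String>> appsByToken = new ConcurrentHashMap<>();

    // Returns true if this call registered the token first
    // (the caller would then schedule the renewal timer).
    boolean register(String appId, String token) {
        Set<String> fresh = ConcurrentHashMap.newKeySet();
        fresh.add(appId);
        Set<String> existing = appsByToken.putIfAbsent(token, fresh);
        if (existing != null) {
            // Another application registered the token earlier; just add a reference.
            existing.add(appId);
            return false;
        }
        return true;
    }

    Set<String> appsFor(String token) {
        return appsByToken.get(token);
    }

    public static void main(String[] args) {
        SharedTokenSketch tokens = new SharedTokenSketch();
        System.out.println(tokens.register("app-1", "hdfs-token")); // true
        System.out.println(tokens.register("app-2", "hdfs-token")); // false
        System.out.println(tokens.appsFor("hdfs-token").size());    // 2
    }
}
```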
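RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
Put simply, YARN involves 3 communication protocols:
ApplicationClientProtocol: the client talks to the RM through this protocol; from here on it is called the CR protocol
ApplicationMasterProtocol: the AM talks to the RM through this protocol; from here on it is called the AR protocol
ContainerManagementProtocol: the AM talks to the NM through this protocol; from here on it is called the AN protocol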
---------------------------------------------------------------------------------------------------------------------
Typically, a client submits a program to the RM like this:
step1: create a client for the CR protocol
rmClient=(ApplicationClientProtocol)rpc.getProxy(ApplicationClientProtocol.class,rmAddress,conf);
step2: the client obtains a unique application ID from the RM through CR protocol#getNewApplication. Simplified code:
//GetNewApplicationResponse carries two pieces of information: the ApplicationId and the maximum amount of resources that can be requested
//Records.newRecord(...) is a static method that creates the objects needed for RPC through the serialization framework (YARN uses Protocol Buffers by default; look up Google Protocol Buffers if you have not met it before)
GetNewApplicationRequest request=Records.newRecord(GetNewApplicationRequest.class);
Continuing with the code (all of it simplified, please bear with me):
GetNewApplicationResponse newApp =rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();
step3: the client submits the AM to the RM through CR protocol#submitApplication. Simplified code:
// The client packs everything needed to launch the AM into an ApplicationSubmissionContext
ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
... //set the application name, priority, queue name and so on
context.setApplicationName(appName);
//build the launch context for the AM
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
... //set the AM-related variables
amContainer.setLocalResources(localResources);//the local resources needed to launch the AM
amContainer.setEnvironment(env);
context.setAMContainerSpec(amContainer);
context.setApplicationId(appId);
SubmitApplicationRequest submitRequest = Records.newRecord(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(context);
rmClient.submitApplication(submitRequest);//submit the application to the RM
--------------------------------------------------------------------------------------------------------------------------------------------------
Typically, the AM registers itself with the RM, requests resources, and asks the NMs to start Containers like this:
AM-RM flow:
step1: create a client for the AR protocol
ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol)rpc.getProxy(ApplicationMasterProtocol.class,rmAddress,conf);
step2: the AM registers itself with the RM
//recordFactory.newRecordInstance(...) here does the same job as Records.newRecord(...) above, which delegates to it
RegisterApplicationMasterRequest request =recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setHost(host);
request.setRpcPort(port);
request.setTrackingUrl(appTrackingUrl);
RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request);//registration done
step3: the AM requests resources from the RM
A simplified piece of code follows (if you are interested, please read the source yourself):
synchronized(this){
askList =new ArrayList<ResourceRequest>(ask);
releaseList = new ArrayList<ContainerId>(release);
allocateRequest = BuilderUtils.newAllocateRequest(....);//build an allocateRequest object
}
//request resources from the RM and, at the same time, pick up newly allocated resources (CPU, memory, etc.)
allocateResponse = rmClient.allocate(allocateRequest ) ;
//decide what to do next (resource assignment) based on the RM's response
.....
step4: the AM tells the RM that the application has finished, and exits
//build the request object
FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class );
request.setFinalApplicationStatus(appStatus);
..//set the diagnostics
..//set the trackingUrl
//tell the RM that we are exiting
rmClient.finishApplicationMaster(request);
--------------------------------------------------------------------------------------------------------------------------------------------
AM-NM flow:
step1: build a client for the AN protocol and start the Container
String cmIpPortStr = container.getNodeId().getHost()+":"+container.getNodeId().getPort();
InetSocketAddress cmAddress=NetUtils.createSocketAddr(cmIpPortStr);
anClient = (ContainerManagementProtocol)rpc.getProxy(ContainerManagementProtocol.class,cmAddress,conf);
ContainerLaunchContext ctx=Records.newRecord(ContainerLaunchContext.class);
... //set the ctx variables
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
request.setContainerLaunchContext(ctx);
request.setContainer(container);
anClient.startContainer(request);
step2: to keep track of each Container's state in real time, the AM can ask the NodeManager for it through AN protocol#getContainerStatus
step3: once a Container has finished running, the AM can release it through AN protocol#stopContainer
===============================================================================================
References:
[1] http://www.aboutyun.com/thread-14277-1-1.html
[2] http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/
[3] http://www.bigdatas.cn/thread-59001-1-1.html
[4] http://bit1129.iteye.com/blog/2186238
[5] http://x-rip.iteye.com/blog/1541914