Hadoop 2.7 Mapper/Reducer Source Code Analysis


Everything starts with the example program:

Example program

WordCount.java, the example program shipped with Hadoop 2.7:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    // Extends the generic Mapper class
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    // Define a Hadoop IntWritable instance named one, initialized to 1
    private final static IntWritable one = new IntWritable(1);
    // Define a Hadoop Text instance named word
    private Text word = new Text();
    // Implement the map function
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
        // Java's string tokenizer; the default delimiters are space, tab ('\t'), newline ('\n') and carriage return ('\r')
      StringTokenizer itr = new StringTokenizer(value.toString());
      // Loop while there are more tokens left.
      while (itr.hasMoreTokens()) {
       /*
        nextToken(): returns the substring from the current position up to the next delimiter
        word.set(): converts the Java String into the Hadoop Text type
        */
        word.set(itr.nextToken());
        // Emit the pair through the framework-provided context
        context.write(word, one);
      }
    }
  }
  
  // Extends the generic Reducer class
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {

    // IntWritable instance that will hold the result
    private IntWritable result = new IntWritable();
    // Implement the reduce function
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      // Iterate over the values and sum up the word count
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Convert the Java int sum into the Hadoop IntWritable result
      result.set(sum);
      // Write the result out (ultimately to HDFS)
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Instantiate the Configuration
    Configuration conf = new Configuration();
    /*
      GenericOptionsParser is the basic Hadoop class for parsing command-line arguments.
      getRemainingArgs() returns the arguments left over after the generic options (here, an array of paths).
      */
    /*
      Its implementation:
      public String[] getRemainingArgs() {
        return (commandLine == null) ? new String[]{} : commandLine.getArgs();
      }*/
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // If fewer than two paths are given, print the usage: at least one input path and one output path are required
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    // Instantiate the Job
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    /*
      Specify the Combiner class.
      The Combiner runs a local, map-side "mini reduce" on the map output before it is shuffled to the reducers, which is often misunderstood.
      */
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    // The reduce output key type is Text
    job.setOutputKeyClass(Text.class);
    // The reduce output value type is IntWritable
    job.setOutputValueClass(IntWritable.class);
    // Add the input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // Set the output path
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    // Submit the job and wait for completion
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

1. Mapper

  Maps input key/value pairs to a set of intermediate key/value pairs.

  Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or more output pairs.

  The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the job's InputFormat. Mapper implementations can access the job's JobConf via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use Closeable#close() to de-initialize themselves.

  The framework then calls map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task.

  All intermediate values associated with a given output key are subsequently grouped by the framework and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via JobConf#setOutputKeyComparatorClass(Class).

  The grouped Mapper outputs are partitioned per Reducer. Users can control which keys (and hence which records) go to which Reducer by implementing a custom Partitioner, as sketched below.
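
To make the partitioning hook concrete, here is a minimal sketch of a custom Partitioner (a hypothetical class, written against the new org.apache.hadoop.mapreduce API used by the WordCount example above) that sends all words starting with the same character to the same reducer. It would be registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical example: partition by the first character of the key, so
    // that all words starting with the same letter go to the same reducer.
    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        // Mask the sign bit so the partition index is always non-negative.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }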

  Users can optionally specify a Combiner, via JobConf#setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps reduce the amount of data transferred from the Mapper to the Reducer.

  The intermediate, grouped outputs are stored in SequenceFiles. Applications can specify whether and how the intermediate outputs should be compressed, and which CompressionCodec to use, via the JobConf.

  If the job has no reducers, the Mapper's output is written directly to the FileSystem, without grouping by keys; a map-only job is requested as shown below.
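
As a quick illustration of that last point, a map-only job is requested simply by setting the number of reduce tasks to zero in the driver (a one-line sketch, assuming the job object from the WordCount example above):

    // Map-only job: with zero reduce tasks the map output is written directly
    // by the output format, without sorting or grouping by key.
    job.setNumReduceTasks(0);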

Example (old mapred API):

  

     public class MyMapper<K extends WritableComparable, V extends Writable> 
      extends MapReduceBase implements Mapper<K, V, K, V> {

        static enum MyCounters { NUM_RECORDS }

        private String mapTaskId;
        private String inputFile;
        private int noRecords = 0;

        public void configure(JobConf job) {
          mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
          inputFile = job.get(JobContext.MAP_INPUT_FILE);
        }

        public void map(K key, V val,
                        OutputCollector<K, V> output, Reporter reporter)
        throws IOException {
          // Process the <key, value> pair (assume this takes a while)
          // ...
          // ...

          // Let the framework know that we are alive, and kicking!
          // reporter.progress();

          // Process some more
          // ...
          // ...

          // Increment the no. of <key, value> pairs processed
          ++noRecords;

          // Increment counters
          reporter.incrCounter(NUM_RECORDS, 1);

          // Every 100 records update application-level status
          if ((noRecords%100) == 0) {
            reporter.setStatus(mapTaskId + " processed " + noRecords +
                " from input-file: " + inputFile);
          }

          // Output the result
          output.collect(key, val);
        }
      }

Applications may also supply a custom MapRunnable to exert greater control over map processing, e.g. multi-threaded Mappers.

Or, with the new API:

 public class TokenCounterMapper 
     extends Mapper<Object, Text, Text, IntWritable> {

   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context)
       throws IOException, InterruptedException {
     StringTokenizer itr = new StringTokenizer(value.toString());
     while (itr.hasMoreTokens()) {
       word.set(itr.nextToken());
       context.write(word, one);
     }
   }
 }

Applications may override the run(org.apache.hadoop.mapreduce.Mapper.Context) method to exert even greater control over map processing, e.g. multi-threaded Mappers; a sketch follows.
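
For reference, a minimal sketch of such an override is shown below. The body mirrors the default run() implementation, with only a status update added; LoggingMapper is a hypothetical name, and a real class would also override map() (for example, like the TokenizerMapper above). Hadoop's own MultithreadedMapper in org.apache.hadoop.mapreduce.lib.map is the standard example of this technique.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LoggingMapper extends Mapper<Object, Text, Text, IntWritable> {
      @Override
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          long records = 0;
          // The default run() is exactly this loop over the input split;
          // overriding it is the hook for batching records, multi-threading, etc.
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
            records++;
          }
          context.setStatus("processed " + records + " records");
        } finally {
          cleanup(context);
        }
      }
    }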

The Mapper interface's method:

  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
  throws IOException;

This method maps a single input key/value pair into an intermediate key/value pair.

Output pairs need not be of the same types as the input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected via calls to OutputCollector#collect(Object, Object).

  Applications can use the Reporter to report progress, or simply to indicate that they are alive. This matters when an application takes a significant amount of time to process an individual key/value pair, because otherwise the framework may assume the task has timed out and kill it. Another way to avoid this is to set mapreduce.task.timeout to a high enough value (or to 0, meaning the task never times out).
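
Two small, hedged fragments to illustrate both options (they assume the context and conf objects from the WordCount example above):

    // Inside a long-running map() or reduce() body: tell the framework the task is alive.
    context.progress();

    // In the driver, before the job is submitted: raise or disable the task timeout.
    // The default is 600000 ms (10 minutes); 0 means the task never times out.
    conf.setLong("mapreduce.task.timeout", 0);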

The Mapper class hierarchy:

2. Reducer

  Reduces a set of intermediate values that share a key to a smaller set of values.

 The number of Reducers for a job is set by the user via JobConf#setNumReduceTasks(int). Reducer implementations can access the job's JobConf via JobConfigurable#configure(JobConf) and initialize themselves. Similarly, they can use Closeable#close() to de-initialize themselves.

  A Reducer has three primary phases:

Phase 1: Shuffle. The Reducer's input is the grouped output of the Mappers. In this phase each Reducer fetches the relevant partition of the output of all the Mappers via HTTP.

Phase 2: Sort. In this phase the framework groups the Reducer's inputs by key (since different Mappers may have produced the same key). The shuffle and sort phases occur simultaneously: while map outputs are being fetched, they are merged.

  Secondary sort: if the rules for treating intermediate keys as equivalent differ from the rules for grouping keys before reduction, a Comparator can be specified via JobConf#setOutputValueGroupingComparator(Class).

Since JobConf#setOutputKeyComparatorClass(Class) controls how intermediate keys are sorted and grouped, the two can be combined to simulate a secondary sort on values, e.g. for a value join; a sketch of a grouping comparator follows.
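
A minimal sketch of such a grouping comparator is shown below (a hypothetical class that assumes the keys are Text values whose first tab-separated field is the natural grouping key). With the old API it would be registered via JobConf#setOutputValueGroupingComparator(Class); with the new API the equivalent is Job#setGroupingComparatorClass(Class).

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Hypothetical grouping comparator: two keys are considered equal for the
    // purposes of a single reduce() call when their first tab-separated field
    // matches, even though the full keys (and hence the sort order) may differ.
    public class FirstFieldGroupingComparator extends WritableComparator {

      protected FirstFieldGroupingComparator() {
        super(Text.class, true); // true = create key instances for deserialization
      }

      @Override
      public int compare(WritableComparable a, WritableComparable b) {
        String fieldA = a.toString().split("\t", 2)[0];
        String fieldB = b.toString().split("\t", 2)[0];
        return fieldA.compareTo(fieldB);
      }
    }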

Example: say you want to find duplicate web pages and tag them all with the URL of the "best" known example. The job would be set up as follows:

  Map input key: url

  Map input value: document

  Map output key: document checksum, url pagerank

  Map output value: url

  Partitioner: by checksum

  OutputKeyComparator: by checksum and then decreasing pagerank

  OutputValueGroupingComparator: by checksum

Phase 3: Reduce

  In this phase the reduce(Object, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs.

  The output of the reduce task is typically written to the FileSystem via OutputCollector#collect(Object, Object).

The output of the Reducer is not re-sorted.

Example:

     public class MyReducer<K extends WritableComparable, V extends Writable> 
      extends MapReduceBase implements Reducer<K, V, K, V> {
      
        static enum MyCounters { NUM_RECORDS }
         
        private String reduceTaskId;
        private int noKeys = 0;
        
        public void configure(JobConf job) {
          reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
        }
        
        public void reduce(K key, Iterator<V> values,
                           OutputCollector<K, V> output, 
                           Reporter reporter)
        throws IOException {
        
          // Process
          int noValues = 0;
          while (values.hasNext()) {
            V value = values.next();
            
            // Increment the no. of values for this key
            ++noValues;
            
            // Process the <key, value> pair (assume this takes a while)
            // ...
            // ...
            
            // Let the framework know that we are alive, and kicking!
            if ((noValues%10) == 0) {
              reporter.progress();
            }
          
            // Process some more
            // ...
            // ...
            
            // Output the <key, value> 
            output.collect(key, value);
          }
          
          // Increment the no. of <key, list of values> pairs processed
          ++noKeys;
          
          // Increment counters
          reporter.incrCounter(NUM_RECORDS, 1);
          
          // Every 100 keys update application-level status
          if ((noKeys%100) == 0) {
            reporter.setStatus(reduceTaskId + " processed " + noKeys);
          }
        }
      }

 The figure below is taken from: http://x-rip.iteye.com/blog/1541914

3. Job

  3.1 The key line in the example program above: job.waitForCompletion(true)

 /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
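
The poll interval used in the else branch above comes from client-side configuration; as far as I can tell it is the mapreduce.client.completion.pollinterval property (in milliseconds, default 5000). A client that wants to poll less aggressively could therefore set, for example (assuming the conf object from the WordCount driver above):

    // Hedged example: poll the job status every 30 seconds instead of every 5.
    conf.setInt("mapreduce.client.completion.pollinterval", 30000);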

  3.2 The submission process

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

  The connect() call:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

Here,

ugi is defined in JobContextImpl.java:

/**
* The UserGroupInformation object that has a reference to the current user
*/
protected UserGroupInformation ugi;

The Cluster class provides an interface for accessing a map/reduce cluster; its main fields:

public static enum JobTrackerStatus {INITIALIZING, RUNNING};
  
  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;

  4. JobSubmitter

/**
   * Internal method for submitting jobs to the system.
   * 
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the 
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system. 
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring it's status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
         
          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

As described above, job submission involves the following steps:

1. Checking the input and output specifications of the job

2. Computing the InputSplits for the job

3. Setting up the requisite accounting information for the job's DistributedCache, if necessary

4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file system

5. Submitting the job to the JobTracker and optionally monitoring its status

If the current JobClient (Hadoop 0.22+) is running on YARN, the job submission is handled by YARNRunner.
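
Which runner is picked is driven by configuration: when Cluster is constructed it iterates over the available ClientProtocolProvider implementations, and (to the best of my understanding) the YarnClientProtocolProvider only hands back a YARNRunner when mapreduce.framework.name is set to "yarn", while "local" selects the LocalJobRunner. A minimal sketch, normally expressed in mapred-site.xml rather than in code:

    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn"); // "local" would select the LocalJobRunner instead
    Job job = Job.getInstance(conf, "word count");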

How the Hadoop YARN framework works

Main steps

  • Job submission
  • Job initialization
  • Resource request and task assignment
  • Task execution

Detailed steps

    Before a job runs, the Resource Manager and the Node Managers have already been started, so in the figure above the Resource Manager and Node Manager processes do not need to be launched.

 

  • 1. The client process submits the MapReduce job via runJob (in practice usually waitForCompletion). In YARN a job is generally called an application.
  • 2. The client asks the Resource Manager for an application ID, which uniquely identifies this job.
  • 3. The client saves the job-related files (typically the job's own jar and any third-party jars it depends on) to HDFS. In other words, YARN-based MapReduce shares the program's jars through HDFS so that the task processes can read them.
  • 4. The client submits the application to the Resource Manager via runJob.
  • 5.a/5.b. After receiving the submission request from the client, the Resource Manager forwards it to the scheduling component (Scheduler). The Scheduler allocates a Container, the Resource Manager starts the Application Master process in that Container, and the Application Master process is then managed by a Node Manager.
  • 6. The Application Master initializes the job (application). Initialization includes creating listener objects to monitor the job's execution, including the task progress and completion reported by the tasks. (Every computing framework that wants to integrate with the YARN resource-scheduling framework has to provide its own ApplicationMaster; Spark and Storm, for example, each provide one in order to run on YARN.)
  • 7. The Application Master splits the input data according to the data location specified in the job code (usually HDFS) to determine the number of Mapper tasks. When deciding which compute node each Mapper task is sent to, Hadoop takes data locality into account: node-local first, then rack-local, and finally off-rack. It also determines the number of Reduce tasks, which is specified explicitly in the program code via job.setNumReduceTasks.
  • 8. The following points describe how the Application Master requests resources from the Resource Manager:
  • 8.1 Based on the number of Mapper tasks determined by the input splits and the number of Reducer tasks, the Application Master requests compute resources from the Resource Manager. Compute resources mainly means memory and CPU; in Hadoop YARN the unit of computation is the Container, i.e. resources are allocated as Containers, and a Container holds a certain amount of memory and a number of CPU cores.
  • 8.2 The Application Master requests resources by sending heartbeat messages to the Resource Manager. The requests also carry the tasks' data-locality information, so that when the Resource Manager allocates resources, the resources assigned to each task satisfy data locality as far as possible.
  • 8.3 The resource requests also carry the amount of memory needed. By default, map tasks and reduce tasks are each given 1 GB of memory; this can be changed with the mapreduce.map.memory.mb and mapreduce.reduce.memory.mb parameters (a configuration sketch follows this list).
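
A small configuration sketch for step 8.3 (assuming the conf object from the WordCount driver above; the values are illustrative only, and the defaults are 1024 MB each):

    // Per-task container sizes requested from the Resource Manager.
    conf.setInt("mapreduce.map.memory.mb", 2048);
    conf.setInt("mapreduce.reduce.memory.mb", 4096);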

  5. YARNRunner

 @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

 This calls YarnClient's submitApplication() method, whose implementation is as follows:

  6. YarnClientImpl

@Override
  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);        
    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          LOG.error("Interrupted while waiting for application "
              + applicationId
              + " to be successfully submitted.");
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }

    return applicationId;
  }

 

  7. ClientRMService

ClientRMService is the client-facing interface of the ResourceManager. This component handles all RPC calls from clients to the ResourceManager.

 @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.

    String user = null;
    try {
      // Safety
      user = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException ie) {
      LOG.warn("Unable to get the current user.", ie);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          ie.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw RPCUtil.getRemoteException(ie);
    }

    // Check whether app has already been put into rmContext,
    // If it is, simply return the response
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

    if (submissionContext.getQueue() == null) {
      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    if (submissionContext.getApplicationName() == null) {
      submissionContext.setApplicationName(
          YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }
    if (submissionContext.getApplicationType() == null) {
      submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
    } else {
      if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
        submissionContext.setApplicationType(submissionContext
          .getApplicationType().substring(0,
            YarnConfiguration.APPLICATION_TYPE_LENGTH));
      }
    }

    try {
      // call RMAppManager to submit application directly
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), user);

      LOG.info("Application with id " + applicationId.getId() + 
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (YarnException e) {
      LOG.info("Exception in submitting application with id " +
          applicationId.getId(), e);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw e;
    }

    SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
  }

It then calls RMAppManager to submit the application directly, as shown in the next section.


  8. RMAppManager

 @SuppressWarnings("unchecked")
  protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();

    if (UserGroupInformation.isSecurityEnabled()) {
      try {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
            parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser());
      } catch (Exception e) {
        LOG.warn("Unable to parse credentials.", e);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // scheduler about the existence of the application
        assert application.getState() == RMAppState.NEW;
        this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
        throw RPCUtil.getRemoteException(e);
      }
    } else {
      // Dispatcher is not yet started at this time, so these START events
      // enqueued should be guaranteed to be first processed when dispatcher
      // gets started.
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
  }

  9. Adding the application asynchronously: DelegationTokenRenewer

  /**
   * Asynchronously add application tokens for renewal.
   * @param applicationId added application
   * @param ts tokens
   * @param shouldCancelAtEnd true if tokens should be canceled when the app is
   * done else false. 
   * @param user user
   */
  public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
      boolean shouldCancelAtEnd, String user) {
    processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
        applicationId, ts, shouldCancelAtEnd, user));
  }

  which in turn calls:

  private void processDelegationTokenRenewerEvent(
      DelegationTokenRenewerEvent evt) {
    serviceStateLock.readLock().lock();
    try {
      if (isServiceStarted) {
        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
      } else {
        pendingEventQueue.add(evt);
      }
    } finally {
      serviceStateLock.readLock().unlock();
    }
  }

As the code above shows, a read lock on the service state decides what happens to the event: if the service has already started, the event is handed to the renewer thread pool; otherwise it is placed in the pending event queue.

A new thread (DelegationTokenRenewerRunnable) then runs:

 @Override
    public void run() {
      if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
        DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
            (DelegationTokenRenewerAppSubmitEvent) evt;
        handleDTRenewerAppSubmitEvent(appSubmitEvt);
      } else if (evt.getType().equals(
          DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
        DelegationTokenRenewer.this.handleAppFinishEvent(evt);
      }
    }

 

 @SuppressWarnings("unchecked")
    private void handleDTRenewerAppSubmitEvent(
        DelegationTokenRenewerAppSubmitEvent event) {
      /*
       * For applications submitted with delegation tokens we are not submitting
       * the application to scheduler from RMAppManager. Instead we are doing
       * it from here. The primary goal is to make token renewal as a part of
       * application submission asynchronous so that client thread is not
       * blocked during app submission.
       */
      try {
        // Setup tokens for renewal
        DelegationTokenRenewer.this.handleAppSubmitEvent(event);
        rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(event.getApplicationId(), RMAppEventType.START));
      } catch (Throwable t) {
        LOG.warn(
            "Unable to add the application to the delegation token renewer.",
            t);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // Scheduler about the existence of the application
        rmContext.getDispatcher().getEventHandler().handle(
            new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
      }
    }
  }

 

private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
      throws IOException, InterruptedException {
    ApplicationId applicationId = evt.getApplicationId();
    Credentials ts = evt.getCredentials();
    boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
    if (ts == null) {
      return; // nothing to add
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Registering tokens for renewal for:" +
          " appId = " + applicationId);
    }

    Collection<Token<?>> tokens = ts.getAllTokens();
    long now = System.currentTimeMillis();

    // find tokens for renewal, but don't add timers until we know
    // all renewable tokens are valid
    // At RM restart it is safe to assume that all the previously added tokens
    // are valid
    appTokens.put(applicationId,
      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
    Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
    boolean hasHdfsToken = false;
    for (Token<?> token : tokens) {
      if (token.isManaged()) {
        if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
          LOG.info(applicationId + " found existing hdfs token " + token);
          hasHdfsToken = true;
        }

        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
          dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
              getConfig(), now, shouldCancelAtEnd, evt.getUser());
          try {
            renewToken(dttr);
          } catch (IOException ioe) {
            throw new IOException("Failed to renew token: " + dttr.token, ioe);
          }
        }
        tokenList.add(dttr);
      }
    }

    if (!tokenList.isEmpty()) {
      // Renewing token and adding it to timer calls are separated purposefully
      // If user provides incorrect token then it should not be added for
      // renewal.
      for (DelegationTokenToRenew dtr : tokenList) {
        DelegationTokenToRenew currentDtr =
            allTokens.putIfAbsent(dtr.token, dtr);
        if (currentDtr != null) {
          // another job beat us
          currentDtr.referringAppIds.add(applicationId);
          appTokens.get(applicationId).add(currentDtr);
        } else {
          appTokens.get(applicationId).add(dtr);
          setTimerForTokenRenewal(dtr);
        }
      }
    }

    if (!hasHdfsToken) {
      requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
        shouldCancelAtEnd);
    }
  }

RM: ResourceManager
AM: ApplicationMaster
NM: NodeManager
In short, YARN involves three communication protocols:
ApplicationClientProtocol: the client talks to the RM through this protocol (abbreviated below as the CR protocol)
ApplicationMasterProtocol: the AM talks to the RM through this protocol (the AR protocol)
ContainerManagementProtocol: the AM talks to the NM through this protocol (the AN protocol)
---------------------------------------------------------------------------------------------------------------------
Typically, a client submits an application to the RM as follows:
Step 1: create a CR-protocol client
rmClient = (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class, rmAddress, conf);

Step 2: the client obtains a unique application ID from the RM via the CR protocol's getNewApplication. Simplified code:
// GetNewApplicationRequest carries two pieces of information: the ApplicationId and the maximum resources that may be requested
// Records.newRecord(...) is a static helper that uses the serialization framework (YARN defaults to Google Protocol Buffers) to create the objects needed for the RPC calls
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);

Continuing (all code here is simplified):
GetNewApplicationResponse newApp = rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();

Step 3: the client submits the AM to the RM via the CR protocol's submitApplication. Simplified code:
// The client packs everything needed to launch the AM into an ApplicationSubmissionContext
ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);

// ... set the application name, priority, queue name, and so on
context.setApplicationName(appName);
// Build an AM launch-context object
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// ... set AM-related variables
amContainer.setLocalResources(localResources); // local resources needed to launch the AM
amContainer.setEnvironment(env);
context.setAMContainerSpec(amContainer);
context.setApplicationId(appId);
SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(context);
rmClient.submitApplication(request); // submit the application to the RM
--------------------------------------------------------------------------------------------------------------------------------------------------
Typically, the AM registers itself with the RM, requests resources, and asks the NM to launch Containers as follows:
AM-RM flow:
Step 1: create an AR-protocol client
ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
Step 2: the AM registers itself with the RM
// recordFactory.newRecordInstance(...) plays the same role as Records.newRecord(...) above; both are static factory calls
RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);

request.setHost(host);
request.setRpcPort(port);
request.setTrackingUrl(appTrackingUrl);
RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request); // complete the registration
Step 3: the AM requests resources from the RM
Simplified code (for the full picture, please read the source):
synchronized (this) {
  askList = new ArrayList<ResourceRequest>(ask);
  releaseList = new ArrayList<ContainerId>(release);
  allocateRequest = BuilderUtils.newAllocateRequest(....); // build an AllocateRequest object

  // Request resources from the RM and, in the same call, pick up newly allocated resources (CPU, memory, etc.)
  allocateResponse = rmClient.allocate(allocateRequest);
  // Act on the RM's response (assign the allocated resources)
  .....
}
Step 4: the AM tells the RM that the application has finished, and exits
// Build the request object
FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
request.setFinalApplicationStatus(appStatus);
// ... set the diagnostics
// ... set the tracking URL
// Notify the RM that the AM is exiting
rmClient.finishApplicationMaster(request);
--------------------------------------------------------------------------------------------------------------------------------------------
AM-NM flow:
Step 1: build an AN-protocol client and start the Container
String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
anClient = (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class, cmAddress, conf);
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
// ... populate ctx
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
request.setContainerLaunchContext(ctx);
request.setContainer(container);
anClient.startContainer(request);
Step 2: to track the state of each Container in real time, the AM can ask the NodeManager for the Container's status via the AN protocol's getContainerStatus
Step 3: once a Container has finished, the AM can release it via the AN protocol's stopContainer
===============================================================================================


References:

[1] http://www.aboutyun.com/thread-14277-1-1.html

[2] http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/

[3] http://www.bigdatas.cn/thread-59001-1-1.html

[4] http://bit1129.iteye.com/blog/2186238

[5] http://x-rip.iteye.com/blog/1541914

