MapReduce過程源碼分析


MapReduce過程源碼分析


Mapper

  首先mapper完成映射,將word映射成(word,1)的形式。

  MapReduce進程,Map階段也叫MapTask,在MapTask中會通過run()方法來調用我們用戶重寫的mapper() 方法,

  分布式的運算程序往往需要分成至少兩個階段:Map階段和Reduce階段。

  第一個階段,即Map階段的maptask並發實例,完全並行獨立運行,互不相干,如Map將要處理的多個文件的每個文件分成3份,分別放在集群中的各個數據節點,Map階段中由maptask進程來處理已經存進來的文件,一行一行地去讀數據,按空格切分行內單詞,切分完畢之后,將單詞統計出來以hashmap存儲,其中以單詞為key,以1作為單詞的value。等到分配給自己的數據片全部讀完之后,將這個hashmap按照首個字母的范圍分成2個hashmap(分區排序),兩個hashmap分別為:HashMap(a-p)和HashMap(q-z)。

  第二個階段的reduce task並發實例互不相干,但是他們的數據依賴於上一個階段的所有maptask的並發實例的輸出。
reduce task 分別統計a-p開頭的單詞和q-z開頭的單詞,然后輸出結果到文件。

  注意:MapReduce編程模型只能包含一個map階段和一個reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行執行。

  那么maptask如何進行任務分配?
  reducetask如何進行任務分配?
  maptask和reducetask之間如何銜接?
  如果maptask運行失敗,如何處理?
  maptask如果都要自己負責輸出數據的分區,很麻煩
  MrAPPMaster負責整個程序的過程調度及狀態的協調。

  三個進程分別對應三個類:
  三個進程:
    1)MrAppMaster:負責整個程序的過程調度及狀態協調
    2)MapTask:負責map階段的整個數據處理流程
    3)ReduceTask:負責reduce階段的整個數據處理流程
  分別對應的類:
    1)Driver階段
    整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象

    2)Mapper階段
    (1)用戶自定義的Mapper要繼承自己的父類
    (2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)
    (3)Mapper中的業務邏輯寫在map()方法中
    (4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
    (5)map()方法(maptask進程)對每一個<K,V>調用一次
    3)Reducer階段
    (1)用戶自定義的Reducer要繼承自己的父類
    (2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
    (3)Reducer的業務邏輯寫在reduce()方法中
    (4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法

數據切片與MapTask並行度的決定機制

  1.一個Job的Map階段並行度由客戶端在提交Job時的切片數決定
  2.每一個Split切片分配一個MapTask並行實例處理
  3.默認情況下,切片大小=BlockSize
  4.切片時不靠路數據集整體,而是諸葛針對每一個文件單獨切片。

job的提交過程分析

1 提交任務---->2 檢查狀態(if (state == JobState.DEFINE) {submit();})---->3.0 submit():3.1 確保job狀態(ensureState(JobState.DEFINE))---->3.2 使用新的API(setUseNewAPI())---->3.3 連接集群(connect())---->3.4 根據get到的集群獲取任務提交器(final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()))---->3.5 submitter提交任務(return submitter.submitJobInternal(Job.this, cluster))----->3.5.1檢查輸出路徑是否設置以及輸出路徑是否存在(checkSpecs(jobs))---->3.5.2注冊JobId(JobID jobId = submitClient.getNewJobID();)---->向目錄拷貝一個文件,該文件就是我們之前(setJarByClass(xxx.class))設置好的jar包---->客戶端就是通過該方法調用InputFormat來給我們的輸入文件進行切片---->切片之后,執行conf.setInt(MRJobConfig.NUM_MAPS, maps);將切片信息寫入到目錄(submitJobDir)中---->把job的配置信息conf也提交到submitDir---->任務正式提交。

首先我們調用:
boolean b = job.waitForCompletion(true);

該方法的主體:

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

然后調用submit()方法:

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

在submit()方法中,通過ensureState(JobState.DEFINE)方法再次確認job的狀態是否為DEFINE,如果是,就設置使用新的API(hadoop2.x版本升級了很多新的API,而老的版本調用MapReduce程序的時候,在這里自動轉成新的API),然后調用connect()方法建立和yarn集群的連接。
connect()方法的內容如下:

 private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

connect()方法中,首先判斷cluster,如果集群為null,那么就返回一個新的集群,return new Cluster(getConfiguration());,如果任務的配置是本地模式就是一個LocalMaster,如果是yarn集群就是YarnMaster。

連接到cluster之后,然后就可以提交我們要執行的任務了,這時執行
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

通過返回的集群的客戶端和協議來獲得一個submitter,然后執行語句
return submitter.submitJobInternal(Job.this, cluster);
利用submitter來真正的提交我們的job任務,submitJobInternal(Job.this, cluster)這個方法是真正提交job任務的方法,該方法的內容見文章最后。

然后通過checkSpecs(jobs)方法檢查輸出路徑是否設置以及輸出路徑是否存在,然后執行語句:
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);根據我們設置的cluster獲取我們的stagingDir,即存放MapReduce過程中產生數據的臨時文件夾。
然后執行JobID jobId = submitClient.getNewJobID();對我們的job進行注冊,進而得到JobId,通過job.setJobID(jobId);設置我們注冊好的jobId,然后執行copyAndConfigureFiles(job, submitJobDir);向目錄拷貝一個文件,該文件就是我們之前(setJarByClass(xxx.class))設置好的jar包,然后執行int maps = writeSplits(job, submitJobDir);客戶端就是通過該方法調用InputFormat來給我們的輸入文件進行切片,切片之后,執行conf.setInt(MRJobConfig.NUM_MAPS, maps);將切片信息寫入到目錄(submitJobDir)中。所謂切片信息就是標注了哪台機器處理哪個切片數據,相當於一個索引信息,有了這個索引信息,MapReduce的APPMaster在執行任務的時候就可以知道啟動幾個maptask並且知道每個機器處理哪一個部分的數據,所以這個信息也是要提交到HDFS的臨時目錄里面。
然后執行writeConf(conf, submitJobFile);把我們的job的配置信息conf也提交到submitDir,執行完之后,臨時目錄中生成job.xml(存放job的配置信息,包括集群的各種手動配置及默認配置信息)。

其實到此為止,這一系列的工作都是在為集群的工作做准備,集群中創建一個臨時目錄,它是可以供集群中所有的數據節點進行訪問的,首先在集群的臨時目錄中存放了jar包,然后放置了切片信息,最后又放置了配置文件,這樣maptask可以到臨時文件夾中讀取存放的這三個信息,進而執行他們各自的任務。

然后程序第240行真正進行job的提交,然后任務開始運行。

至此程序執行完畢。

總結

任務的提交過程:首先是檢查任務的狀態,檢查輸出目錄,都沒問題之后,然后開始連接集群,因為任務都是交由集群中的其他人來執行,所以其他人需要得知這個任務的必要信息,因此job提交的時候有必要將這些任務的必要信息提交到大家都可以訪問的臨時目錄(在HDFS上),這些必要的信息包括:jar包、切片信息以及配置信息(job.xml)。

附:submitJobInternal(Job.this, cluster)方法原代碼:

/**
   * Internal method for submitting jobs to the system.
   * 
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the 
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system. 
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring it's status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM