MapReduce剖析筆記之五:Map與Reduce任務分配過程


在上一節分析了TaskTracker和JobTracker之間通過周期的心跳消息獲取任務分配結果的過程。中間留了一個問題,就是任務到底是怎么分配的。任務的分配自然是由JobTracker做出來的,具體來說,存在一個抽象類:TaskScheduler,主要負責分配任務,繼承該類的有幾個類:

CapacityTaskScheduler、FairScheduler、JobQueueTaskScheduler(LimitTasksPerJobTaskScheduler又繼承於該類)。

從名字大致可以看出,CapacityTaskScheduler應該是根據容量進行分配,FairScheduler實現相對公平的一些分配,JobQueueTaskScheduler似乎是按照隊列分配,難道是根據隊列的任務按照順序來分配?那豈不是沒有優先級、搶占等等的概念了,微觀來看,多線程並發執行的時候,線程還有個搶占機制呢,Map或Reduce任務最終實際上就是個JAVA進程,只是一個分布式的,其分配策略應該與操作系統中的線程分配有類似的思想,不過沒研究過操作系統里線程是咋分配的,所以這里也無法對比,這里只是提一句,任務分配作為一個大到社會資源,小到CPU資源,都有類似的分配過程,理解其核心策略才是最重要的,Hadoop里也不可能突破人類認知范圍,出現十全十美的分配策略,只是一種適應某種場景的分配策略。

MapReduce里默認是使用JobQueueTaskScheduler分配任務的,下文進行分析。

 

任務分配過程在JobQueueTaskScheduler的assignTasks方法中。

這個方法的輸入參數是TaskTracker taskTracker,也就是對某個TaskTracker進行分配。

首先來看它第一步做什么。

    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

首先,獲得該TaskTracker的狀態,這個狀態是通過心跳響應從TaskTracker傳給JobTracker的,這個也好理解,只有TaskTracker知道自己處於什么狀態,通過心跳把自身的狀態上報。

之后,獲得集群狀態。ClusterStatus是一個描述集群狀態的類,用哪些參數來描述集群的狀態呢?下面是他的主要變量:

public class ClusterStatus implements Writable {

  private int numActiveTrackers;
  private Collection<String> activeTrackers = new ArrayList<String>();
  private Collection<String> blacklistedTrackers = new ArrayList<String>();
  private Collection<String> graylistedTrackers = new ArrayList<String>();
  private int numBlacklistedTrackers;
  private int numGraylistedTrackers;
  private int numExcludedNodes;
  private long ttExpiryInterval;
  private int map_tasks;
  private int reduce_tasks;
  private int max_map_tasks;
  private int max_reduce_tasks;
  private JobTracker.State state;

可以看出,主要有當前活動的TaskTracker(numActiveTrackers),處於黑名單和灰名單的TaskTracker,這個概念在上一節分析時提過,主要是指一些TaskTracker在應該心跳的時候可能沒出現心跳,可能原因是機器出現一些故障了,性能降低了等等,總之,在JobTracker這個總管看來,凡是沒有在該出現的時間出現的小弟,就被認為是不信任的小弟,於是加入黑名單。

另外,還有當前Map,Reduce的任務數量,以及集群最大的Map,Reduce任務數量。看起來也比較簡單。

之后,獲取當前的作業隊列。

    //
    // Get map + reduce counts for the current tracker.
    //
    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();

之后,獲取TaskTracker的最大容量,即Map最大數量、Reduce最大數量、以及正在運行着的Map、Reduce數量。

接着,計算當前隊列里所有需要分配的任務數量(與TaskTracker無關,理論上,所有任務都有可能分配到這個TaskTracker)。

    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

根據這個總數,除以集群的全部容量(就是全部TaskTracker的容量之和),可以得到目前Map和Reduce的負載因子,這實際上是評估目前集群的負載壓力:

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

整個集群的負載壓力跟對某個TaskTracker分配任務有什么關系呢?試想一下,如果我們要給某個TaskTracker分配任務,總不能把很多任務都分給他把,應該要使他的負載壓力跟整個集群的負載情況大致差不多,這樣才算做到了負載均衡把?所以先得到集群的負載情況,可以為后面TaskTracker的負載情況進行一個對比。接着往下看:

    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;

這句話是計算要分配任務這個TaskTracker的當前容量,獲得目前可以分配的Map數量。可以想見,一個TaskTracker的容量實際上事先是指定好的,是靜態的,比如可以執行多少個Map,多少個Reduce,但當前的容量,實際上是根據集群負載情況計算得到的,就是前面說的那個原則,雖然某個機器的最大容量很大,但因為當前負載可能比較低,所以也不能拼命給他分配任務,否則后面如果有大量任務來了(集群整體負載上去了)這個機器就分配不了多少任務,但有可能上面有不少數據,這些數據要計算,豈不是要搬到其他機器上去算?IO消耗必然帶來計算耗時加大。

好了,在得到TaskTracker中可以分配的Map數量后,是不是就可以從Job隊列里取出Map隨意進行分配了呢?還不是,還需要進行一個檢測,Hadoop認為需要為一些任務預留一些資源,哪些任務需要預留資源呢?從源代碼的解釋來看,主要是:failures and speculative executions,直觀上看,其中文含義是失敗的任務和預測執行,或者投機任務。失敗的任務好理解,就是一個任務在某個機器上執行失敗了,應該重新調度執行,而不應該不管了;預測執行到底是什么意思呢?要理解這句話,需要理解Hadoop里面的一個優化方式。我們知道,一個作業會分為很多Map任務並發執行,當執行完了Map后才開始Reduce任務的執行,因此,一個作業的執行時間取決於Map的時間和Reduce的時間之和。對於Map或Reduce階段,因為有很多任務同時執行,這個階段的執行時間就取決於執行最慢的那個Map或Reduce任務,為了加速執行,對於很慢的那些Map或Reduce任務,Hadoop會啟動多份,執行相同的代碼,那么Hadoop怎么知道哪個任務執行最慢呢?自然是靠心跳的時候TaskTracker把各個任務的進度匯報上來,當大部分都執行完了,就剩幾個進度很慢的任務,就可能會啟動推測執行,實際上個人覺得叫冗余執行更好理解。在Hadoop里一個在執行的任務稱為Task Attemt,Task就像一個類,而Task Attemt就如同一個實例化的對象,是任務的一次實現。這種實現可以有多個,因為啟動了多個,那么,最先完的那個就代表這個任務執行完了,其它還沒執行完的就停了就行了。是一種用空間換時間的策略,如果不保留一點資源,當出現任務很慢的時候,想換都沒得換了。失敗的任務也類似,一個失敗的任務,需要調度到其它機器上重新執行一遍,可能就成功了。所以集群內的機器需要保留一小部分資源,防止出現這些情況時需要利用資源。

這種預留資源稱為Padding,不能把一台集群內的資源全部都分完了,如果出現這種狀況,我稱之為到達了預警狀態,此時就得慢慢分才行,一次心跳只給一個機器分配一個任務就返回,否則可以分配多個任務返回。那么, 到底要預留多少資源呢?從代碼來看,最多預留當前正在執行任務的1%的資源,也就是由下面的變量控制:

    padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 0.01f);

比如,整個集群正在執行1000個Map任務,此時最大容量是多少算到達了預警狀態呢?Hadoop里認為跟要分配任務的機器有關,比如目前的機器如果最大容量是4個Map任務,而1000x1%=10個,兩者取其小值,即4,如果當前容量是1004,即到達了預警狀態,此次心跳就只能為這個機器分配1個Map任務。而如果這台機器最大容量是20,大於10,則如果當前容量是1010,則到達了預警狀態。為什么這種預警狀態的判斷跟具體分配任務的機器有關呢?1%當然是一個上限,說白了預留的資源也不能太多了。最多預留10個,最少預留多少呢?跟機器容量有關,極端一點,假如每台機器最多就能執行1個Map任務,那么最少預留1個。這個似乎也可以理解,如果機器性能都很好,那就多預留點,如果機器性能都不咋地,那就少預留點,但是也不能不預留。判斷是否到達預警狀態的方法如下,其中maxTaskTrackerSlots就是指當前需要分配任務的機器的最大容量:

  private boolean exceededPadding(boolean isMapTask, 
                                  ClusterStatus clusterStatus, 
                                  int maxTaskTrackerSlots) { 
    int numTaskTrackers = clusterStatus.getTaskTrackers();
    int totalTasks = 
      (isMapTask) ? clusterStatus.getMapTasks() : 
        clusterStatus.getReduceTasks();
    int totalTaskCapacity = 
      isMapTask ? clusterStatus.getMaxMapTasks() : 
                  clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    boolean exceededPadding = false;
    synchronized (jobQueue) {
      int totalNeededTasks = 0;
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        //
        // Beyond the highest-priority task, reserve a little 
        // room for failures and speculative executions; don't 
        // schedule tasks to the hilt.
        //
        totalNeededTasks += 
          isMapTask ? job.desiredMaps() : job.desiredReduces();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = 
            Math.min(maxTaskTrackerSlots,
                     (int) (totalNeededTasks * padFraction));
        }
        if (totalTasks + padding >= totalTaskCapacity) {
          exceededPadding = true;
          break;
        }
      }
    }

    return exceededPadding;
  }

接下來,JobQueueTaskScheduler會對該機器上所有剩余容量(也就是還能執行多少個Map或Reduce)進行逐一分配,調用的方法是:

          // Try to schedule a Map task with locality between node-local 
          // and rack-local
          t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());

即從隊列里循環取出一個JobInProgress對象,執行該方法。當前心跳的TaskTracker是某個服務器上的一個JAVA進程,這台服務器稱為S1,一個Job可能涉及到很多待處理的數據,這些數據可能位於S1的磁盤上,也可能在其他服務器上,而其他服務器,可能與S1位於同一機架內,也可能位於其他機架。這個方法的目的就是獲取一個Map任務,這個Map任務要處理的數據要么在S1的磁盤上,要么在與S1位於同一機架內的服務器的磁盤上。因為把這樣的任務調度到S1上,數據就不需要跨機架傳輸了,甚至就在本地。這種原則是Hadoop里面最為核心的一個原則,稱為數據本地性。

obtainNewNodeOrRackLocalMapTask調用了該JobInProgress對象的另一個方法obtainNewMapTaskCommon。
在前面分析Job初始化的時候,我們知道Job初始化過程中根據Split的個數創建了一個map對象數組:

    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, 
                                   splits[i], 
                                   jobtracker, conf, this, i, numSlotsPerMap);
    }

可以想見,分配過程應該就是從作業的這個Map數組中取出Map任務返回。不過取哪些Map呢?難道又是按照隊列來取?因為不同的Map對應於不同的Split,而不同的Split位於不同的機器,考慮到上面說的本地化策略,則不會那么簡單,需要考慮數據本地性。

obtainNewMapTaskCommon方法里調用了另一個方法findNewMapTask,這個方法是Map任務分配中最核心的方法,理解了這個方法,也就理解了Map任務分配過程。其聲明為:

  private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                          final int clusterSize,
                                          final int numUniqueHosts,
                                          final int maxCacheLevel,
                                          final double avgProgress)

其中,tts也就是跟JobTracker保持心跳的那個TaskTracker的狀態,ClusterSize就是當前集群中所有TaskTracker的數量,numUniqueHosts是指集群中機器的數量,從這里可以看出,機器數量和TaskTracker數量還不一樣,一個機器也可以有多個TaskTracker,因為TaskTracker本質上就是一個JAVA進程。maxCacheLevel表示本地化策略,這個變量十分關鍵,控制了findNewMapTask方法中應該執行哪些代碼。該變量是前面由方法obtainNewNodeOrRackLocalMapTask傳過來的,具體代碼為:

  public synchronized Task obtainNewNodeOrRackLocalMapTask(
      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
  throws IOException {
    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
  }

也就是maxLevel,該變量是JobInProgress類中的一個固定變量。默認值是:

    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;

DEFAULT_HOST_LEVEL的值等於2,即兩個層次,Hadoop將網絡拓撲看做一顆樹,主機作為一層,其上層是機架,兩層的意思就是在主機本身和機架內部尋找任務。比如如果這個值是anyCacheLevel:

    this.anyCacheLevel = this.maxLevel+1;

那就意味着,對於某個要分配任務的節點來說,某個任務的處理數據只要是在節點本地的,或者跟這個節點在一個機架內的(還是在一個交換機下面),甚至是隔着一個交換機的(第三層),都可以分配給這個節點,這就失去了本地化的意義了,所以obtainNewNodeOrRackLocalMapTask這個方法顧名思義就是限定在主機和機架內,而通過maxLevel參數設置為2,即可限定查找范圍。

另外,avgProgress是一個0-1之間的數,表示Map任務的進度,這個參數后面會用來評估哪些Map任務執行進度較慢,以啟動推測執行,后面分析中會看到。avgProgress屬於JobStatus這個類,指的是一個作業中所有Map任務的平均進度。

下面我們仔細分析findNewMapTask這個方法的具體流程。

1,執行shouldRunOnTaskTracker方法,該方法判斷是否應該在這個TaskTracker上分配任務,其代碼為:

  private boolean shouldRunOnTaskTracker(String taskTracker) {
    //
    // Check if too many tasks of this job have failed on this
    // tasktracker prior to assigning it a new one.
    //
    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
        taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
      if (LOG.isDebugEnabled()) {
        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
                  + "' for assigning a new task");
      }
      return false;
    }
    return true;
  }

簡單點說,如果該TaskTracker上執行失敗的任務太多的話,就不適合繼續分配任務了。那么, 到底什么叫失敗任務過多呢?即:

taskTrackerFailedTasks >= maxTaskFailuresPerTracker,表明失敗任務過多了。

maxTaskFailuresPerTracker是這么來的:

  public int getMaxTaskFailuresPerTracker() {
    return getInt("mapred.max.tracker.failures", 4); 
  }

也就是由默認4個以上任務失敗了,就不適合分配任務。當然,這里還有一個條件:

flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT))

flakyTaskTrackers是超過4個失敗任務的TaskTracker的數量,如果這個數量少於整個集群大小的25%:

  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;

那么就不合適分配,去其它TaskTracker上分配。當然,如果每台機器都失敗了很多任務,那么可能就不是機器的問題,可能是在某些時候出現了一些有問題的任務,不應該把問題歸結為機器,機器仍然可以分配任務。

 

2,檢查TaskTracker是否有足夠資源來執行任務:

    // Check to ensure this TaskTracker has enough resources to 
    // run tasks from this job
    long outSize = resourceEstimator.getEstimatedMapOutputSize();
    long availSpace = tts.getResourceStatus().getAvailableSpace();
    if(availSpace < outSize) {
      LOG.warn("No room for map task. Node " + tts.getHost() + 
               " has " + availSpace + 
               " bytes free; but we expect map to take " + outSize);

      return -1; //see if a different TIP might work better. 
    }

getEstimatedMapOutputSize方法對Map任務的輸出需要占的空間進行估計,這里的空間是指磁盤大小,如果一個機器磁盤空間都不夠,而Map任務執行后的中間結果不存入HDFS,而是存入本地文件系統(MapReduce的設計原則之一),就會導致Map任務失敗。不過這個任務還沒執行,怎么知道要輸出多少中間數據呢?Hadoop是根據歷史經驗來判斷的,以前的Map任務輸出了多少內容,在TaskStatus這個類里面會記錄,而ResourceEstimator這個類就是進行空間估計的,里面有一個方法:

  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
      TaskInProgress tip) {

    //-1 indicates error, which we don't average in.
    if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
      completedMapsUpdates++;

      completedMapsInputSize+=(tip.getMapInputSize()+1);
      completedMapsOutputSize+=ts.getOutputSize();

      if(LOG.isDebugEnabled()) {
        LOG.debug("completedMapsUpdates:"+completedMapsUpdates+"  "+
                  "completedMapsInputSize:"+completedMapsInputSize+"  " +
                  "completedMapsOutputSize:"+completedMapsOutputSize);
      }
    }
  }

可以看出,這個類對以前任務執行的結果進行統計,就得到了平均一個Map任務到底會占用多少本地空間。而估計新Map任務所占空間的方法也就依賴於此,就不細講了。

另外,TaskTracker會把自己的剩余空間通過心跳傳遞過來,兩個比一下,如果不夠,自然就不合適分配任務。

 進行了一些預先檢測后,要開始調度任務了,任務的調度順序是怎樣的呢?是不是從JobInProgress隊列里面逐一取出Job,然后再從該Job里面的TaskInProgress隊列里逐一取出Map任務,分給這台機器呢?並不是,Hadoop把隊列里的任務分為四類進行調度,首先對執行失敗的任務進行調度,如果一個任務執行失敗了,按理來說應該馬上再次執行,因為這類任務時間比較緊迫,而且Hadoop的JobTracker在調度這類任務的時候並不區分數據本地性,也就是不管這類任務要處理的數據是不是在跟自己心跳的這台機器(或者是不是一個機架內等等),總之都調度給他。換句話說,Map階段實際上也是有可能存在數據從一台機器拷貝到另一台機器的可能,並不是嚴格的數據本地性。

不過有一個問題,如果一個任務在機器A上已經執行失敗了,再調度給它很可能還會失敗,所以調度的都是其它機器執行失敗的任務。

失敗的任務優先級最高。之后是調度那些還沒運行的任務,也就是一般的正在等待中的任務,之后是推測任務,也就是那些執行得很慢的任務,需要冗余執行。在調度還沒運行的任務、以及正在運行的但是很慢的任務(正在運行的速度也不錯的任務當然就不需要調度了)的過程中,會遵從先本地、再機架、再機架外、最后無位置信息的任務查找順序,這就是數據本地性的實現方式,按照優先級逐步查找任務。簡單點說,心跳節點知道IP地址了,一個任務與Split緊密對應,而Split的位置信息可以由HDFS提供出來,兩者一比較就知道這個任務運行在心跳節點上是否划算了。

 

3,先找失敗的任務,分配給這個節點。

 其代碼為:

    // 0) Schedule the task with the most failures, unless failure was on this
    //    machine
    tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running list
      scheduleMap(tip);
      LOG.info("Choosing a failed task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }

可見,利用了findTaskFromList這個方法,從failedMaps也就是失敗的Map任務隊列里取出任務。failedMaps是一個SortedSet<TaskInProgress> 對象,存在一個自定義的排序准則,就是任務失敗的次數:this.failedMaps = new TreeSet<TaskInProgress>(failComparator),failComparator的定義如下:

  // keep failedMaps, nonRunningReduces ordered by failure count to bias
  // scheduling toward failing tasks
  private static final Comparator<TaskInProgress> failComparator =
    new Comparator<TaskInProgress>() {
      @Override
      public int compare(TaskInProgress t1, TaskInProgress t2) {
        if (t1 == null) return -1;
        if (t2 == null) return 1;
        
        int failures = t2.numTaskFailures() - t1.numTaskFailures();
        return (failures == 0) ? (t1.getTIPId().getId() - t2.getTIPId().getId())
            : failures;
      }
    };

也就是說,失敗次數最多的那些Map任務被先調度。

后面我們可以看到,后面的幾類任務,都是使用了findTaskFromList這個方法,所以對這個方法進行分析。這個方法輸入參數的說明為:

  /**
   * Find a non-running task in the passed list of TIPs
   * @param tips a collection of TIPs
   * @param ttStatus the status of tracker that has requested a task to run
   * @param numUniqueHosts number of unique hosts that run trask trackers
   * @param removeFailedTip whether to remove the failed tips
   */

即從一個任務集合里面找出一個任務。整個過程就是循環地對集合里面的任務進行判斷。其代碼為:

    Iterator<TaskInProgress> iter = tips.iterator();
    while (iter.hasNext()) {
      TaskInProgress tip = iter.next();

      // Select a tip if
      //   1. runnable   : still needs to be run and is not completed
      //   2. ~running   : no other node is running it
      //   3. earlier attempt failed : has not failed on this host
      //                               and has failed on all the other hosts
      // A TIP is removed from the list if 
      // (1) this tip is scheduled
      // (2) if the passed list is a level 0 (host) cache
      // (3) when the TIP is non-schedulable (running, killed, complete)
      if (tip.isRunnable() && !tip.isRunning()) {
        // check if the tip has failed on this host
        if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
             tip.getNumberOfFailedMachines() >= numUniqueHosts) {
          // check if the tip has failed on all the nodes
          iter.remove();
          return tip;
        } else if (removeFailedTip) { 
          // the case where we want to remove a failed tip from the host cache
          // point#3 in the TIP removal logic above
          iter.remove();
        }
      } else {
        // see point#3 in the comment above for TIP removal logic
        iter.remove();
      }
    }
    return null;

可見,如果一個任務可以運行,並且沒有在運行,那么再判斷兩個條件,如果沒有在這個機器上失敗過,那么表明可以調度;或者已經在所有機器上都失敗了,那也可以調度。因為所有機器上都失敗了,應該不是機器的問題了,可能是任務本身存在什么問題,不管怎樣,既然失敗了,還是繼續調度,如果多執行幾次都失敗了,那當然就沒法了。可能就停止了,向用戶報錯。

 

4,因為findNewMapTask這個方法返回的是某個Job的Map任務的編號,所以如果沒有找到失敗的任務,就接着調度那些未運行的普通任務。從前面傳進來的參數來看,只查找那些數據位於本地或者同一機架內的任務,調度到心跳TaskTracker執行。

在這類任務的調度里,考慮了數據本地性,我們對數據本地性再進行一些解釋,這個概念在Hadoop是如此重要,值得大書特書。Hadoop里的數據本地性反應的是要執行任務的機器與這個任務處理的數據所在機器之間的距離,這兩台機器如果是一台機器當然最好,就省得跨機器搬移數據了,這稱為local;如果兩台機器不是一台,不過位於一個機架,那么就稱為Rack-Local,熟悉數據中心機房的同學都知道,目前機房的一般方案是有很多機架,每個機架放着很多1U、2U(不知道1U是多高的建議百度,這是基本概念)的服務器,一個機架可以放多達幾十台服務器,一般有一個交換機,稱為Top-Of-Rack,交換機都放在一個機架上面,這個機架的所有服務器都用以太網線或者光纜往上走線連接到這個交換機(比如24口等等),也就是說,在一個機架內部兩台服務器要通信,需要經過這個交換機一次,如果兩個機架的兩台服務器要通信,則至少需要經過兩個交換機,交換機之間的走線在機架上面。所以,除了Rack-Local,稍微遠點的稱為Off-Switch,如果繼續細分,還可以分為兩台服務器是否在一個機房,還是跨越了機房等等。因此,Hadoop把這種呈現樹狀的結構稱為網絡拓撲(NetworkTopology),網絡拓撲中的服務器或者機架稱為節點(Node),需要注意的是Hadoop所說的Node並不一定是一台服務器,可以是一個機架,甚至是一個數據中心,都抽象為一個節點。節點存在一個名稱,比如"/default-rack"是一個節點的名字,可以表示為一個機架,機架下面的服務器Server1可以表示為"/default-rack/Server1",而這個服務器的父節點即為這個機架。這樣的好處是如果知道了服務器擁有哪些本地任務,那么通過計算就得到機架擁有哪些任務,其實就是機架下面這些服務器所有任務的並集。

說到這里,可能會有一個疑問,Hadoop怎么知道哪些服務器在同一個機架下面呢?實際上這是需要用戶按照一個格式寫個配置腳本,Hadoop通過解析這個腳本知道的,於是知道了服務器之間的距離,比如XML里,一個機架標簽里面把服務器的IP地址全部填進去即可。

有了上面的背景知識,需要着重理解Hadoop里面的關鍵對象。在JobTracker中有幾個比較重要的對象,首先一個是: 

// NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;

這個變量記錄了該節點對應的任務。比如對於一個服務器而言,其對應的Map任務就是那些處理數據位於該服務器的任務,也就是本地任務;舉例而言,假如一個作業需要對一個大文件進行處理,該大文件分為5份:F1,F2,F3,F4,F5,分別存儲於3台服務器:S1、S2、S3,對應存儲關系(依賴於HDFS告知MapReduce)為:

S1:F1、F2、F3、F4

S2:F2、F3、F4、F5

S3:F3、F4、F5、F1

那么,S1這個節點對應的任務也就是處理F1、F2、F3、F4的這四個任務;S2這個節點對應的任務也就是處理F2、F3、F4、F5的這四個任務。

Job在初始化的時候,會根據Splits(即=5)分別創建5個任務,即5個TaskInProgress對象,然后會執行一個createCache方法,將這些任務分別與節點對應起來:

  private Map<Node, List<TaskInProgress>> createCache(
                                 TaskSplitMetaInfo[] splits, int maxLevel)
                                 throws UnknownHostException {
    Map<Node, List<TaskInProgress>> cache = 
      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);

其中maxLevel也就是記錄到哪個級別,默認是    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL,即為2,就是我們前面提到的,本機和本機架兩個層次。也就是記錄了Local和Rack-Local這兩個級別。對應於上面的例子,第一級就是記錄S1等與各個任務的關系,第二級會記錄S1、S2、S3這些服務器的所有父節點(也就是機架)對應的任務,顯然,因為一個機架會有很多服務器,所以一個機架會對應很多任務。比如S1、S2,S3屬於兩個機架C1、C2,那么,會有以下對應關系:

C1:對應於S1和S2任務的並集;

C2:對應於S3的任務。

最終,nonRunningMapCache這個Map就記錄了以下內容:

S1、S2、S3對應的任務,以及C1、C2對應的任務。

為什么要這么記錄下來呢?理論來說,在任務分配的時候,逐一取出Job,然后逐一取出Job里面事先創建好的TaskInProgress,通過查詢其Split可以獲知該任務位於哪個機器,然后再根據網絡拓撲計算心跳機器與該機器之間的距離,不過這樣做豈不是耗時較長,本來,任務分配就是當心跳發生時JobTracker執行的,速度當然越快越好,所以Hadoop的策略是當Job初始化時,就創建好任務,並且將這些任務與機器的對應關系緩存起來,所以命名為nonRunningMapCache,即還未運行的Map任務的緩存信息。當某個機器與JobTracker心跳時,因為這個映射表的索引就是Node,如果要查找本地任務,則可以以TaskTracker對應的Node為索引,就快速獲得了本地任務;而如果要查詢機架內任務,則獲得其父節點直接查詢即可,進而快速完成滿足數據本地性的任務分配。
所以,在我們分析任務分配的時候,一定要有以下基本認識:

1)一個JobTracker有一個JobInProgress隊列,所有Job在初始化的時候,都會創建TaskInProgress對象數組(作業是隊列,存在到達順序;任務是數組,無先后順序),每個JobInProgress會有以下對象數組:

  TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];

其中,cleanup和setup用於任務執行的開始和結束時候的初始化和清理等工作。

2)雖然上面的任務數組已經記錄了該作業的所有任務,但是還存在一些數據結構對這些任務進行了另外形式的記錄,首先是目前還未運行的Map任務的映射表,以節點Node為索引,將所有Map任務打亂了重新記錄,以提升后面任務分配的速度,如果已經分配了,這些任務就會從這個表里面移除掉;以及正在運行的Map映射表;以及非本地的Map,這些Map任務的運行比較倒霉,並不是以數據本地化進行運行的,其原因有可能是那台服務器已經沒有計算資源了,或者運行失敗了被調度到其它機器了等等;另外還會記錄失敗的任務。這些所有的數據結構幾乎都是為了更好地對任務進行合理、高效分配而創建的。仔細想想,假如我們要來寫這一段任務分配代碼,我們自然而然也會加入一些額外的數據結構來記錄這些任務。

  // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;
  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;
  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;
  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;
  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;
  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;
  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;

有了上面的認識,我們來看分配一個普通任務的過程。繼續分析findNewMapTask方法。

首先獲得心跳機器對應的節點,該節點顯然是一個服務器節點(不是機架節點):

    Node node = jobtracker.getNode(tts.getHost());

另外,注意tts表示TaskTrackerStatus,是心跳的那台TaskTracker主動發過來的。

獲得了該節點后,就以該Node為索引,去剛才分析過的那個映射表nonRunningMapCache里面查找該節點對應的任務,這些任務是該節點的本地任務,所以應該說是調度的首選,如果不存在這類任務,那么就取出該節點的父節點對應的任務列表,也就是說,如果有TaskTracker對應的本地任務,那就最好;如果沒有,那就找出那些與TaskTracker位於同一機架內的任務,也行,其代碼為:

      for (level = 0;level < maxLevelToSchedule; ++level) {
        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findTaskFromList(cacheForLevel, tts, 
              numUniqueHosts,level == 0);
          if (tip != null) {
            // Add to running cache
            scheduleMap(tip);

            // remove the cache if its empty
            if (cacheForLevel.size() == 0) {
              nonRunningMapCache.remove(key);
            }

            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }

其中,level從0開始到maxLevelToSchedule表示按照本地、機架的順序依次取出任務來進行分配。因為findNewMapTask每次返回一個任務號,所以如果存在本地任務,自然優先返回這類任務。

上面的maxLevelToSchedule是方法參數maxCacheLevel和maxLevel的較小者。從代碼來看,默認都是2。也就是會獲取Level=0,1兩種情況下的任務。也就是本地或同一個機架內的任務。

當獲得了一個任務后,會把它從nonRunningMapCache中移除掉,這個代碼在findTaskFromList這個方法里,前面已經看過。

獲得了任務,就執行scheduleMap這個方法,這個方法顧名思義就是調度任務的意思,實際上,所謂的調度任務,就是把這個任務加入到正在運行任務的那個數據結構中,沒有特別的東西,完全只是記錄改變了。前面看到,除了nonRunningMapCache這個映射表,還有runningMapCachenonLocalRunningMaps兩個數據結構,分別記錄當前有位置信息的正在運行的Map任務,以及無位置信息的正在運行的Map任務。

任務調度的部分代碼為(存在位置信息時):

    for(String host: splitLocations) { Node node = jobtracker.getNode(host); for (int j = 0; j < maxLevel; ++j) { Set<TaskInProgress> hostMaps = runningMapCache.get(node); if (hostMaps == null) { // create a cache if needed hostMaps = new LinkedHashSet<TaskInProgress>(); runningMapCache.put(node, hostMaps); } hostMaps.add(tip); node = node.getParent(); } }

可見, 是對於所有Split的主機集合,獲得該主機的節點,之后從runningMapCache中去找找有沒有(比如已經處於運行狀態了,有的任務可能調度多次,運行多個),如果沒有就加進去。

另外,從上面的代碼key = key.getParent()可以看出,如果沒有本地性的任務,就按照距離由近及遠尋找其他任務。這樣,調度本地化(但不算完全數據本地性)的任務就結束了。

另外,從下面最后一行比較關鍵的代碼可以看出,尋找本地任務和機架內節點任務的工作就結束了,程序不會往下執行:

    if (node != null) {
      Node key = node;
      int level = 0;
      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
      // called to schedule any task (local, rack-local, off-switch or
      // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
      // findNewMapTask is (i.e. -1) if findNewMapTask is to only schedule
      // off-switch/speculative tasks
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
。。。。。。。。。。。
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }

因為level從0到達maxLevelToSchedule的循環后,level的值等於maxLevelToSchedule,而因為傳入的參數maxCacheLevel=maxLevel,maxLevel=2,也就是兩者相同,level最后的值就等於maxCacheLevel,所以如果在本地和機架內沒有找到任務,就返回-1,也就是沒有找到。至於后面的代碼,下面的分析可以看到也會得到執行,只不過是賦了新的參數,又重新調用了這個方法,執行后面的代碼。

 

5,findNewMapTask這個核心方法暫時告一段落,實際上我們分析了前半部分,前半部分的功能是在本地和機架內尋找任務。該方法會返回一個任務號(return tip.getIdWithinJob(); 沒有則返回-1,見上面的代碼)。我們假設已經在本地或機架內找到了任務(如果沒找到下面再分析),之后回到JobInProgress的方法obtainNewMapTaskCommon中。

接下來的代碼為:

    Task result = maps[target].getTaskToRun(tts.getTrackerName());
    if (result != null) { addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true); // DO NOT reset for off-switch! if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) { resetSchedulingOpportunities(); } } return result;

也就是創建要運行的任務。getTaskToRun這個方法會創建一個TaskAttemptID,並創建任務對象:

    Task t = null; if (isMapTask()) { if(LOG.isDebugEnabled()) { LOG.debug("attempt " + numTaskFailures + " sending skippedRecords " + failedRanges.getIndicesCount()); } t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(), numSlotsNeeded); } else { t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded); }

這里需要注意不同的任務類。TaskInProgress是Job在初始化過程中就創建好了的,而MapTask是在分配任務時創建的,是可以序列化並最后會傳到TaskTracker端的,而TaskInProgress只是JobTracker維護的。基本上可以這么理解,TaskInProgress用於任務在隊列中等待的階段;MapTask,ReduceTask則記錄一個要執行的任務的主要參數,比如Job文件位置,任務ID,Split的位置信息等等。

接下來是JobInProgress對任務的各類狀態、參數進行記錄,在方法addRunningTaskToTIP中,比如任務ID啊,任務名啊,任務的本地特性,有下面一些類型:

enum Locality { NODE_LOCAL, GROUP_LOCAL, RACK_LOCAL, OFF_SWITCH }

從上面可以看到,Hadoop這一版本還不支持數據中心之間的調度策略,不支持跨機房調度,它將跨越了一個機架的那些機器認為都一樣。另外,任務是第一次執行還是推測執行(也就是太慢了被重新調度執行)有下面的枚舉變量:

enum Avataar { VIRGIN, SPECULATIVE }

Avataar中文是阿凡達,天神下凡的意思,主要有兩種類型,碼農的英文比較詭異,VIRGIN用來表示這是任務的第一次執行,SPECULATIVE表示二次調度執行,這是哪個碼農取的名字?

此時,將創建的Task返回至JobQueueTaskScheduler的assignTasks方法。

 

6,上面我們假設在本地或機架內找到了任務,如果沒有找到任務呢?那么,上面的getTaskToRun等等代碼不會得到執行,而會直接返回null:

    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel, 
                                status.mapProgress());
    if (target == -1) {
      return null;
    }

此時,直接返回到JobQueueTaskScheduler的assignTasks方法。可以看出,如果經過obtainNewNodeOrRackLocalMapTask的執行返回null,也就是在本地和機架內沒有找到任務,則會往下執行另一個方法:obtainNewNonLocalMapTask

 

          t =  job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            。。。。。。。。。。。
          }          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());

 

我們前面提過,任務有幾種調度順序,再來小結一下,首先是調度那些失敗的任務,因為這些任務比較緊急;其次是未運行的任務以及運行得很慢的任務,在尋找未運行任務的過程中,首先查找本地任務,之后查找機架內的任務。obtainNewNodeOrRackLocalMapTask限定了只找兩層節點,本地或父節點,父節點即機架內)這兩種任務的查找我們已經分析過了,如果沒找到就返回null,此時obtainNewNonLocalMapTask的執行實際上就是去尋找后面兩種任務:執行較慢的任務,以及沒有位置信息的任務。

這部分代碼的不好理解之處就在於obtainNewNonLocalMapTask和obtainNewNodeOrRackLocalMapTask一樣,都是調用了obtainNewMapTaskCommon這個方法,唯一的區別就是參數不同:

  public synchronized Task obtainNewNodeOrRackLocalMapTask( TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) throws IOException { return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel); } public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts, int clusterSize, int numUniqueHosts) throws IOException { return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, NON_LOCAL_CACHE_LEVEL); }

可以看出,上一個傳的是maxLevel,默認是2,在本地和機架內查找;后一個傳的參數是NON_LOCAL_CACHE_LEVEL,默認是-1,也就是不關心位置信息了。其實這個參數輸入后,前面我們分析過的從本地開始逐層往上尋找任務的代碼就不會得到執行,只會執行無位置信息和推測執行的部分代碼。具體可見:

 

    if (node != null) {
      Node key = node;
      int level = 0;
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
。。。。。。。。。。。。。。。。。
      }
      
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }

 

因為此時輸入的參數maxCacheLevel=-1,所以maxLevelToSchedule=-1,因此就不會執行這個查找本地和機架內任務的循環體代碼。而是直接跳出。執行接下來的代碼。

 

 

7,如果在本地和機架內找不到任務,則需要擴大范圍去找,應該按照什么順序去找呢?這就是我們需要分析的findNewMapTask這個方法的接下來的代碼。首先獲得所有最高級別的節點,也就是說,直接獲取最高層的那些節點(不管有幾層了,最高層總是含有更多任務):

    // collection of node at max level in the cache structure
    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();

這些節點比如就是機架等等層次較高的節點。這些節點可能是心跳TaskTracker所對應主機的父節點,或父節點的父節點等等。對於這些父節點,逐一去緩存中查找。不過因為在上面的查找過程(obtainNewNodeOrRackLocalMapTask)中,已經找了該節點對應的任務以及該節點所在機架對應的任務,所以需要跳過這些已經查找過的節點,去其它父節點中去找。查找方式和前面一樣。這種找的策略Hadoop稱為breadth-wise across parents at max level,是一種類似廣度優先遍歷的思想,比如其節點的叔叔節點、伯父節點(如果只有兩層)、爺爺節點(如果有三層)去找。此時的尋找仍然是在緩存nonRunningMapCache中尋找的。

 

8、如果利用廣度優先遍歷的方法在nonRunningMapCache中都找不到呢?看起來更高層的節點都找了,會是什么原因?從代碼來看主要原因是Split沒有帶着位置信息。按理來說Split是有位置信息的,但如果底層不是運行在HDFS上,或者其他一些原因(待考慮),也就無法得到Split的位置信息,此時nonRunningMapCache甚至可能會為空集。前面在分析createCache的時候暫時忽略了一點,就是如果作業上傳了Split的位置信息,任務會加入到映射表nonRunningMapCache中,該映射表的索引是Node,值是任務列表;而如果沒有Split的位置信息呢?會加入到另外一個任務列表中,也就是nonLocalMaps,表示缺少位置信息條件下的那些還未運行的Map任務的列表,自然不是一個映射表了,因為無法獲知Map任務要處理的Split到底在哪台機器上,也就是沒有Node節點信息,只是一個List對象:

    this.nonLocalMaps = new LinkedList<TaskInProgress>();

在createCache方法中,其代碼為:

    for (int i = 0; i < splits.length; i++) {
      String[] splitLocations = splits[i].getLocations();
      if (splitLocations == null || splitLocations.length == 0) {
        nonLocalMaps.add(maps[i]);
        continue;
      }

即對於所有Split,如果缺乏位置信息,就加入到這個列表中。因此,在分配任務時,按照先nonRunningMapCachenonLocalMaps的順序分配,其代碼與上面一樣:

    // 3. Search non-local tips for a new task
    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running list
      scheduleMap(tip);

      LOG.info("Choosing a non-local task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }

 

9,終於完成了最復雜,也是最普通的未運行任務分配過程的分析了,如果上面的任務都沒找到,那么就進入下一類任務的分配,也就是推測執行的任務,前面已經提到,如果一個任務在運行,但是進度很慢,Hadoop會考慮將其多次調度,這樣的話,可能可以降低最慢那個任務的影響。

與前面通用的方法findTaskFromList類似,查找推測任務的方法是findSpeculativeTask。如果明白了前面查找普通任務的順序,這里也就簡單了,和前面一樣,只是要到正在運行的任務緩存中查找,這種緩存分為兩類,即存在位置信息的和不存在位置信息的。首先在runningMapCache中尋找(前面是nonRunningMapCache),如果找到這種任務,就返回;如果沒找到,就在nonLocalRunningMaps(前面是nonLocalMaps,我認為這個名字應該改為nonLocalnonRunningMaps,否則容易引起歧義)中尋找。而在runningMapCache中尋找的過程中,也是有兩個階段,首先是從TaskTracker對應的節點開始找,然后擴展到機架等等,如果沒有,就廣度優先,在父節點的兄弟節點、爺爺節點等去找。具體代碼流程與尋找普通任務幾乎一樣。因此,唯一有較大區別的就是findSpeculativeTask這個方法。

比較一下這兩個方法,可以看出,findSpeculativeTask多了avgProgress這個參數,表示當前這個Job的所有Map任務的平均執行進度,這個參數我們前面提過。currentTime表示當前時間。

  private synchronized TaskInProgress findTaskFromList(
      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
      int numUniqueHosts, boolean removeFailedTip)

  protected synchronized TaskInProgress findSpeculativeTask(
      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
      double avgProgress, long currentTime, boolean shouldRemove) 

還是依次獲取list中的任務,如果正在運行,則不在考慮之列:

      // should never be true! (since we delete completed/failed tasks)
      if (!tip.isRunning() || !tip.isRunnable()) {
        iter.remove();
        continue;
      }

否則檢測TaskInProgress這個任務是否需要推測執行,采用的方法是TaskInProgress這個類中的hasSpeculativeTask方法:

  /**
   * Return whether the TIP has a speculative task to run.  We
   * only launch a speculative task if the current TIP is really
   * far behind, and has been behind for a non-trivial amount of 
   * time.
   */
  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
    //
    // REMIND - mjc - these constants should be examined
    // in more depth eventually...
    //

    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
        (averageProgress - progress >= SPECULATIVE_GAP) &&
        (currentTime - startTime >= SPECULATIVE_LAG) 
        && completes == 0 && !isOnlyCommitPending()) {
      return true;
    }
    return false;
  }

其代碼比較簡單,主要是在幾個條件下判斷是否應該推測執行。首先看skipping,該標志記錄了該任務是否不應該進行推測執行,其代碼為:

  /**
   * Get whether to start skipping mode. 
   */
  private boolean startSkipping() {
    if(maxSkipRecords>0 && 
        numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
      return true;
    }
    return false;
  }

  public static int getAttemptsToStartSkipping(Configuration conf) {
    return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
  }

可以看出,如果失敗的次數已經大於默認的2次,就應該跳過這個任務,因為可能是任務本身有問題,繼續推測執行也沒用。

另外一個條件是activeTasks.size() <= MAX_TASK_EXECS,其中MAX_TASK_EXECS=1,也就是activeTasks.size()<=1。

activeTasks記錄了當前這個任務(類似一個類)正在執行(類似實例化對象)的數量,其定義為:

  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();

是一個映射表,索引是TaskAttemptID,值是TaskTracker的ID。

前面已經分析過,一個任務可能有多個執行實例。每個實例用TaskAttemptID表示,一個例子是(從其它博客拷過來的):

”attempt_201108091551_0001_m_000000_0表示2011年8月9日15時51分啟動的JobTracker中第0001號作業的第000000號map task的第0號task attempt“

從前面可以看出,如果activeTasks.size()>1,也就是說,已經有多個執行實例了,那就別再調度了,這個很容易理解。

另外一個條件是averageProgress - progress >= SPECULATIVE_GAP,SPECULATIVE_GAP等於0.2,averageProgress是整個Job的所有Map任務的平均執行進度,是作為參數傳到這個方法中的,progress則表示某個任務的進度。一個任務的進度是怎么算出來的呢?是TaskInProgress類中的recomputeProgress這個方法計算出來的,從代碼來看,主要過程為(省略了一些其他代碼):

  void recomputeProgress() {
    if (isComplete()) {
      this.progress = 1;
    } else if (failed) {
      this.progress = 0;
    } else {
      double bestProgress = 0;
      String bestState = "";
      Counters bestCounters = new Counters();
      for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
        TaskAttemptID taskid = it.next();
        TaskStatus status = taskStatuses.get(taskid);
        if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
          bestProgress = 1;
          break;
        } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
          //for COMMIT_PENDING, we take the last state that we recorded
          //when the task was RUNNING
          bestProgress = this.progress;
        } else if (status.getRunState() == TaskStatus.State.RUNNING) {
          if (status.getProgress() >= bestProgress) {
            bestProgress = status.getProgress();
          }
        }
      }
      this.progress = bestProgress;
    }
  }

實際上也就是說,如果完成了,進度就是1;如果失敗了,進度就是0;否則,對於該任務的所有TaskAttemptID,通過其status.getProgress()得到TaskAttemptID的進度,status是一個TaskStatus對象,描述了一個TaskAttempt的狀態,Hadoop里面命名不是特別規范,這里如果用TaskAttemptStatus我覺得更好理解,要我來寫,我會分別寫TaskAttemptStatus和TaskStatus兩個類,前一個記錄每次執行的狀態,后一個記錄Map任務總體的狀態(實際上就來自於前一個狀態的融合)。總之,簡而言之,這些TaskAttempt的進度的最大值就是當前這個任務的進度。

如果Job的所有Map任務的平均進度超過某個Map任務進度的20%,那么這個落后的Map任務就應該調度(當然還不一定,還取決於其它條件)進行推測執行。

再來看另一個條件:currentTime - startTime >= SPECULATIVE_LAG,這個條件含義很明顯,就是一個任務在啟動時可能人家是慢一點,但說不定是有一些其它原因,比如服務器IO不行剛開始都在讀數據,比別人慢一些,但說不定CPU頻率高而在處理時后發制人呢,不過如果超過了SPECULATIVE_LAG=60000,也就是1分鍾了還是很慢,那就有問題了。

completes == 0這個條件也很簡單,就是沒有TaskAttempt執行完了。這種條件似乎與前面的條件存在冗余,前面已經說了,進度就是與平均進度拉開了至少80%,也就是說,最快的那個TaskAttempt也沒完,這里再加一個這種判斷是什么意思,如果有一個完成了,豈不是進度已經為1?可能為了保險起見把,或者這兩個標志在計算因為理論上肯定有一定延時(哪怕很短),比如獲知某個TaskAttempt執行完了,准備更新進度之前,這中間另一個線程要尋找推測執行的任務,發現進度並沒達到1,如果不加完成標志,可能會引起冗余調度。個人猜想。

最后一個條件是!isOnlyCommitPending(),直觀含義是沒有處於准備提交的狀態。其代碼為:

  public boolean isOnlyCommitPending() {
    for (TaskStatus t : taskStatuses.values()) {
      if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
        return true;
      }
    }
    return false;
  }

一個TaskAttempt的執行階段和狀態有下面一些:

  // what state is the task in?
  public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}

從狀態來看,主要有運行中、成功、失敗、未分配、被殺掉、准備提交、清理失敗、被殺死但還未清理等等。准備提交應該就是指某一個在執行的任務其實已經執行完了,也就是等着最后的提交,這類任務就不要再重復執行了。

只要滿足了上面的條件,這些任務就會被提取出來。

 

10,終於,主要的任務尋找過程已經分析完畢。接下來按照前面obtainNewNodeOrRackLocalMapTask一樣,如果尋找到無位置信息的任務或者推測執行的任務,則構造MapTask對象返回。回到JobQueueTaskScheduler的assignTasks方法,如果找到了這類任務,那么不再循環找其它任務了,一次心跳只能返回一個這類任務,另外,在尋找本機和機架內任務的時候,如果滿足了需要預留資源的條件,也只能一次心跳至多返回一個任務,我們再來看看其主要流程:

    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }
          Task t = null;          
          // Try to schedule a Map task with locality between node-local 
          // and rack-local
          t =   job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            if (exceededMapPadding) {
              break scheduleMaps;
            }           
            // Try all jobs again for the next Map task 
            break;
          }
          // Try to schedule a node-local or rack-local Map task
          t =  job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;
            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }

可以看出,有兩種情況會跳出break scheduleMaps;也就是不再執行任務分配工作,否則,會對該心跳TaskTracker的所有目前可用的availableMapSlots進行循環分配,現在可以看出,所謂任務分配,就是對於所有心跳TaskTracker的所有可用資源,逐一按照某種順序尋找任務。

 

11,接下來是Reduce任務的分配,相對Map任務而言要簡單一些。首先也是計算目前可用的Reduce Slot數目(就是可以分配幾個Reduce):

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity = 
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
               trackerReduceCapacity);
    final int availableReduceSlots = 
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);

從availableReduceSlots來看,每次頂多分配一個Reduce任務給心跳TaskTracker。

同樣,執行exceededReducePadding方法檢測一下是否應該預留資源。調用的都是exceededPadding這個方法,與Map任務時相同。

之后,順序取出JobInProgress隊列里面的作業,執行obtainNewReduceTask方法。這個方法就沒有Level之類的拓撲的概念,因為Map需要考慮數據本地性原則,而Reduce本身就需要利用Shuffle去遠端機器取數據,所以不考慮網絡拓撲。

在obtainNewReduceTask方法里,首先也是估計Reduce需要的磁盤空間資源,只是此時是輸入空間的評估,Map是輸出的評估:

    long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2;
    if (((estimatedReduceInputSize) > 
      reduce_input_limit) && (reduce_input_limit > 0L)) {
      // make sure jobtracker lock is held
      LOG.info("Exceeded limit for reduce input size: Estimated:" + 
          estimatedReduceInputSize + " Limit: " + 
          reduce_input_limit + " Failing Job " + jobId);
      status.setFailureInfo("Job exceeded Reduce Input limit " 
          + " Limit:  " + reduce_input_limit + 
          " Estimated: " + estimatedReduceInputSize);
      jobtracker.failJob(this);
      return null;
    }

之后,判斷目前是否可以調度Reduce任務,是否可以調度Reduce任務主要是看Map任務執行到了什么進度,因為Reduce需要等到所有Map任務執行完畢后才開始,所以,Map任務還沒執行到一定程度的時候,調度過去也沒用,只會消耗資源,判斷方法是:

  public synchronized boolean scheduleReduces() {
    return finishedMapTasks + failedMapTIPs >= completedMapsForReduceSlowstart;
  }

方法看起來很簡單,就是目前完成的Map任務與失敗的Map任務數量需要大於某個門限,這個門限值默認為:

    // Calculate the minimum number of maps to be complete before 
    // we should start scheduling reduces
    completedMapsForReduceSlowstart = 
      (int)Math.ceil( (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *  numMapTasks));

DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART默認為0.05,也就是大致完成了5%的Map任務之后,可以開始調度Reduce任務了,這個數字也是個參考值,一般而言,有5%的Map任務已經結束了或者失敗了(至少已經執行過了),剩下的可能很快也就快完畢了,此時調度Reduce過去似乎也差不多,但具體數值可能需要調優,不知道有沒有什么理論依據。

如果可以開始調度,則執行以下代碼,和Map類似:

    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
                                    status.reduceProgress());
    if (target == -1) {
      return null;
    }
    
    Task result = reduces[target].getTaskToRun(tts.getTrackerName());
    if (result != null) {
      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
    }

看來,findNewReduceTask也是調度Reduce任務的核心方法。

有了前面的基礎,這時候分析Reduce任務就簡單不少了。

首先是從未運行的Reduce任務集合中尋找任務

    // 1. check for a never-executed reduce tip
    // reducers don't have a cache and so pass -1 to explicitly call that out
    tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
    if (tip != null) {
      scheduleReduce(tip);
      return tip.getIdWithinJob();
    }

因為Reduce不考慮數據本地性了,所以這些未運行任務都放在一個集合中:

  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;

如果沒有找到,則到運行的Reduce任務集合中尋找那些需要推測執行的任務:

    // 2. check for a reduce tip to be speculated
    if (hasSpeculativeReduces) {
      tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
                                jobtracker.getClock().getTime(), false);
      if (tip != null) {
        scheduleReduce(tip);
        return tip.getIdWithinJob();
      }
    }

基本就是這兩個階段。里面涉及到兩個方法:findTaskFromList、findSpeculativeTask。這兩個方法在Map任務調度時已經分析過,兩者共用,因此不用贅述了。

到此為止,assignTasks方法已經分析完畢,該方法將獲得的MapTask和ReduceTask任務打包成一個數組List<Task>返回heartbeat方法:

        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }

可見,如果已分配了任務,則對於每一個任務,將其加入到ExpireLaunchingTasks對象中,該對象維護了一個HashMap對象:

    /**
     * This is a map of the tasks that have been assigned to task trackers,
     * but that have not yet been seen in a status report.
     * map: task-id -> time-assigned 
     */
    private Map<TaskAttemptID, Long> launchingTasks =
      new LinkedHashMap<TaskAttemptID, Long>();

並且,ExpireLaunchingTasks本身是一個線程類,主要用於在后台記錄一個已經啟動的任務是否超期了等等。

最后,創建LaunchTaskAction對象,將其丟入ArrayList<TaskTrackerAction>數組中,准備返回心跳時返回。

注意,LaunchTaskAction是一個可序列化對象,Task也是一個可序列化對象,最后來看看Task里面有些什么內容,可見,主要有Job配置文件、用戶名、任務ID等等。

  ////////////////////////////////////////////
  // Fields
  ////////////////////////////////////////////

  private String jobFile;                         // job configuration file
  private String user;                            // user running the job
  private TaskAttemptID taskId;                   // unique, includes job id
  private int partition;                          // id within job
  TaskStatus taskStatus;                          // current status of the task
  protected JobStatus.State jobRunStateForCleanup;
  protected boolean jobCleanup = false;
  protected boolean jobSetup = false;
  protected boolean taskCleanup = false;
  
  //skip ranges based on failed ranges from previous attempts
  private SortedRanges skipRanges = new SortedRanges();
  private boolean skipping = false;
  private boolean writeSkipRecs = true;
  
  //currently processing record start index
  private volatile long currentRecStartIndex; 
  private Iterator<Long> currentRecIndexIterator = 
    skipRanges.skipRangeIterator();
  
  private ResourceCalculatorPlugin resourceCalculator = null;
  private long initCpuCumulativeTime = 0;

  protected JobConf conf;
  protected MapOutputFile mapOutputFile = new MapOutputFile();
  protected LocalDirAllocator lDirAlloc;
  private final static int MAX_RETRIES = 10;
  protected JobContext jobContext;
  protected TaskAttemptContext taskContext;
  protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
  protected org.apache.hadoop.mapreduce.OutputCommitter committer;
  protected final Counters.Counter spilledRecordsCounter;
  private int numSlotsRequired;
  private String pidFile = "";
  protected TaskUmbilicalProtocol umbilical;
  protected SecretKey tokenSecret;
  protected JvmContext jvmContext;

 

12,最后我們總結一下Map任務分配的簡單過程,因為這是任務調度中最復雜的部分:

(1)最外層的循環是心跳TaskTracker的所有可用資源,Hadoop用Slot描述一個TaskTracker的資源,比如2個Map Slot表示可以同時執行2個Map任務。對於每一個Slot,去尋找一個任務調度給它;

(2)內層循環是jobQueueJobInProgressListener這個Job隊列里面的所有Job,對於每個Job,分兩步:第一步,去查找該Job中那些失敗的任務(按照失敗次數越多越優先的原則),之后去查找與心跳TaskTracker位於同一主機或者同一機架的未運行的任務。第二步,去查找那些非本地的未運行的任務,以及正在處於運行中但是很慢的任務,在查找運行較慢的任務時,也遵循先本地,再機架,再全局,最后無位置信息的順序。

 

本節分析了JobQueueTaskScheduler的任務調度策略,這也是Hadoop默認的調度策略,另外兩種調度策略:CapacityTaskScheduler、FairScheduler。CapacityTaskScheduler是雅虎提出的,支持多個Job隊列,這樣的話可以避免單一隊列必須先進先得到調度的問題;FairScheduler是Facebook提出的,目標是使得各個作業相對公平地獲得集群資源,避免FIFO引起某些作業獨占資源的問題,總的來說,這兩種調度策略都可以防止單一作業占集群資源過多的問題,增加了一些優先級等等概念。具體的代碼留作以后分析。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM