Hadoop 三大調度器源碼分析及編寫自己的調度器


如要轉載,請注上作者和出處。  由於能力有限,如有錯誤,請大家指正。

須知: 我們下載的是hadoop-2.7.3-src 源碼。 這個版本默認調度器是Capacity調度器。 在2.0.2-alpha版本的時候,有人匯報了一個fifo調度器的bug,社區把默認調度器從原來的fifo切換成capacity了。  參考   

  在Hadoop中,調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器,然后在配置文件中指定相應的調度器,這樣,當Hadoop集群啟動時,便會加載該調度器。當前Hadoop自帶了幾種調度器,分別是FIFO(默認調度器),Capacity Scheduler和FairScheduler,通常境況下,這些調度器很難滿足公司復雜的應用需求,因而往往需要開發自己的調度器。本文介紹了Hadoop調度器的基本編寫方法,  參考1    

  Hadoop1 調度框架:Hadoop的調度器是在JobTracker中加載和調用的,用戶可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler屬性中指定調度器。本節分析了Hadoop調度器的調度框架,實際上分析了兩個重要類:TaskScheduler和JobTracker的關系。

  Hadoop2 調度框架:Hadoop的調度器是在ResourceManager中加載和調用的,用戶可以在配置文件yarn-site.xml中的yarn.resourcemanager.scheduler.class屬性中指定調度器,默認是 org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler ; 還可以配置Fifo調度器,org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler ; 還可以配置Fair調度器, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler 。 本節分析了Hadoop調度器的調度框架, 類比Hadoop1 , 三個調度器的共同擴展類是 AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode> , 它的功能類似Hadoop1的TaskScheduler ; 如果用戶要編寫自己的調度器,需要繼承抽象類AbstractYarnScheduler

  MapReduce在Hadoop2中稱為MR2或YARN,將JobTracker中的資源管理及任務生命周期管理(包括定時觸發及監控),拆分成兩個獨立的服務,用於管理全部資源的ResourceManager以及管理每個應用的ApplicationMaster,ResourceManager用於管理向應用程序分配計算資源,每個ApplicationMaster用於管理應用程序、調度以及協調。一個應用程序可以是經典的MapReduce架構中的一個單獨的任務,也可以是這些任務的一個DAG(有向無環圖)任務。ResourceManager及每台機上的NodeManager服務,用於管理那台機的用戶進程,形成計算架構。每個應用程序的ApplicationMaster實際上是一個框架具體庫,並負責從ResourceManager中協調資源及與NodeManager(s)協作執行並監控任務。  參考2

  針對Hadoop 1.0中的MapReduce在擴展性和多框架支持等方面的不足,它將JobTracker中的資源管理和作業控制功能分開,分別由組件ResourceManager和ApplicationMaster實現,其中,ResourceManager負責所有應用程序的資源分配,而ApplicationMaster僅負責管理一個應用程序,進而誕生了全新的通用資源管理框架YARN。基於YARN,用戶可以運行各種類型的應用程序(不再像1.0那樣僅局限於MapReduce一類應用),從離線計算的MapReduce到在線計算(流式處理)的Storm等。Hadoop 2.0對應Hadoop版本為Apache Hadoop 0.23.x、2.x和CDH4。

架構圖:

 

其中ResourceManager包含兩個主要的組件:定時調用器(Scheduler)以及應用管理器(ApplicationManager)。

定時調用器(Scheduler): 定時調度器負責向應用程序分配置資源,它不做監控以及應用程序的狀 態跟蹤,並且它不保證會重啟由於應用程序本身或硬件出錯而執行失敗 的應用程序。

應用管理器(ApplicationManager): 應用程序管理器負責接收新任務,協調並提供在ApplicationMaster容 器失敗時的重啟功能。

節點管理器(NodeManager): NodeManager是ResourceManager在每台機器的上代理,負責容器的管 理,並監控他們的資源使用情況(cpu,內存,磁盤及網絡等),以及向 ResourceManager/Scheduler提供這些資源使用報告。

應用總管(ApplicationMaster): 每個應用程序的ApplicationMaster負責從Scheduler申請資源,以及 跟蹤這些資源的使用情況以及任務進度的監控。

1  調度器

我們先想分析調度器,首先要分析它的父類,以及父類的父類和實現接口,如 AbstractService, YarnScheduler, ResourceScheduler 以及 AbstractYarnScheduler, 如下所示:

AbstractService.java 在 hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java 

package org.apache.hadoop.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;

import com.google.common.annotations.VisibleForTesting;

/**
 * This is the base implementation class for services.
 */
//這是服務的基本實現類。
@Public
@Evolving
public abstract class AbstractService implements Service {

  private static final Log LOG = LogFactory.getLog(AbstractService.class);

  /**
   * Service name.
   */
  //服務名稱
  private final String name;

  /** service state */
  //服務狀態
  private final ServiceStateModel stateModel;

  /**
   * Service start time. Will be zero until the service is started.
   */
  //服務開始時間。在服務開始之前為0。
  private long startTime;

  /**
   * The configuration. Will be null until the service is initialized.
   */
  //配置。在服務初始化之前為null。
  private volatile Configuration config;

  /**
   * List of state change listeners; it is final to ensure
   * that it will never be null.
   */
  //狀態更改偵聽器列表;最終確保它不為null。
  private final ServiceOperations.ServiceListeners listeners
    = new ServiceOperations.ServiceListeners();
  /**
   * Static listeners to all events across all services
   */
  //所有服務的所有事件的靜態監聽器
  private static ServiceOperations.ServiceListeners globalListeners
    = new ServiceOperations.ServiceListeners();

  /**
   * The cause of any failure -will be null.
   * if a service did not stop due to a failure.
   */
  //任何失敗的原因 - 是因為null。 如果服務沒有因為故障停止。
  private Exception failureCause;

  /**
   * the state in which the service was when it failed.
   * Only valid when the service is stopped due to a failure
   */
  //服務失敗時的狀態。僅當服務由於失敗而停止時才有效。
  private STATE failureState = null;

  /**
   * object used to co-ordinate {@link #waitForServiceToStop(long)}
   * across threads.
   */
  //對象用於協調 {@link #waitForServiceToStop(long)} 跨線程。
  private final AtomicBoolean terminationNotification =
    new AtomicBoolean(false);

  /**
   * History of lifecycle transitions
   */
  //生命周期轉換的歷史
  private final List<LifecycleEvent> lifecycleHistory
    = new ArrayList<LifecycleEvent>(5);

  /**
   * Map of blocking dependencies
   */
  //阻止依賴關系的映射
  private final Map<String,String> blockerMap = new HashMap<String, String>();

  private final Object stateChangeLock = new Object();
 
  /**
   * Construct the service.
   * @param name service name
   */
  //構造服務
  public AbstractService(String name) {
    this.name = name;
    stateModel = new ServiceStateModel(name);
  }

  /*
   * 獲取當前的服務狀態。
   * 返回:服務的狀態
   */
  @Override
  public final STATE getServiceState() {
    return stateModel.getState();
  }

  /*
   * 獲取服務失敗時引發的第一個異常。 如果為空,則不記錄任何異常
   * 返回:在轉換到停止狀態期間日志記錄的故障
   */
  @Override
  public final synchronized Throwable getFailureCause() {
    return failureCause;
  }

  /*
   * 獲取發生在{@link #getFailureCause()}中失敗的狀態。
   * 返回:狀態,如果沒有失敗,則為null
   */
  @Override
  public synchronized STATE getFailureState() {
    return failureState;
  }

  /**
   * Set the configuration for this service.
   * This method is called during {@link #init(Configuration)}
   * and should only be needed if for some reason a service implementation
   * needs to override that initial setting -for example replacing
   * it with a new subclass of {@link Configuration}
   * @param conf new configuration.
   */
  /*
   * 設置此服務的配置。當{@link #init(Configuration)}時該方法會被調用並且
   * 只有在某些原因出現,服務實現需要覆蓋該初始設置的情況下才需要這樣做 - 例如
   * 用{@link Configuration}的新子類替換它。
   */
  protected void setConfig(Configuration conf) {
    this.config = conf;
  }

  /**
   * {@inheritDoc}
   * This invokes {@link #serviceInit}
   * @param conf the configuration of the service. This must not be null
   * @throws ServiceStateException if the configuration was null,
   * the state change not permitted, or something else went wrong
   */
  //這將調用{@link #serviceInit}
  //子類的serviceInit會初始化所需服務,會創建相應的服務類然后加入服務列表
  @Override
  public void init(Configuration conf) {
    //服務配置是否為空
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    //服務是否已經初始化
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          //服務初始化,會進入子類的同名函數
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            //if the service ended up here during init,
            //notify the listeners
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   * @throws ServiceStateException if the current service state does not permit
   * this action
   */
  //開始服務
  @Override
  public void start() {
    if (isInState(STATE.STARTED)) {
      return;
    }
    //enter the started state
    synchronized (stateChangeLock) {
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
          startTime = System.currentTimeMillis();
          serviceStart();
          if (isInState(STATE.STARTED)) {
            //if the service started (and isn't now in a later state), notify
            if (LOG.isDebugEnabled()) {
              LOG.debug("Service " + getName() + " is started");
            }
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  //停止服務
  @Override
  public void stop() {
    if (isInState(STATE.STOPPED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.STOPPED) != STATE.STOPPED) {
        try {
          serviceStop();
        } catch (Exception e) {
          //stop-time exceptions are logged if they are the first one,
          noteFailure(e);
          throw ServiceStateException.convert(e);
        } finally {
          //report that the service has terminated
          terminationNotification.set(true);
          synchronized (terminationNotification) {
            terminationNotification.notifyAll();
          }
          //notify anything listening for events
          notifyListeners();
        }
      } else {
        //already stopped: note it
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignoring re-entrant call to stop()");
        }
      }
    }
  }

  /**
   * Relay to {@link #stop()}
   * @throws IOException
   */
  @Override
  public final void close() throws IOException {
    stop();
  }

  /**
   * Failure handling: record the exception
   * that triggered it -if there was not one already.
   * Services are free to call this themselves.
   * @param exception the exception
   */
  /*
   * 故障處理:記錄觸發它的異常 - 如果還沒有一個。 服務可以自由調用。
   */
  protected final void noteFailure(Exception exception) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("noteFailure " + exception, null);
    }
    if (exception == null) {
      //make sure failure logic doesn't itself cause problems
      return;
    }
    //record the failure details, and log it
    //記錄故障細節,並記錄日志
    synchronized (this) {
      if (failureCause == null) {
        failureCause = exception;
        failureState = getServiceState();
        LOG.info("Service " + getName()
                 + " failed in state " + failureState
                 + "; cause: " + exception,
                 exception);
      }
    }
  }

  /*
   * 阻止等待服務停止; 使用終止通知對象這樣做。
   * 該方法只有在執行所有服務停止操作(成功或失敗)之后才返回,或超時已過
   * 該方法可以在服務初始化或啟動之前調用; 這是為了消除任何競爭條件,服務在此事件發生之前停止。
   */
  @Override
  public final boolean waitForServiceToStop(long timeout) {
    boolean completed = terminationNotification.get();
    while (!completed) {
      try {
        synchronized(terminationNotification) {
          terminationNotification.wait(timeout);
        }
        // here there has been a timeout, the object has terminated,
        // or there has been a spurious wakeup (which we ignore)
        //這里有一個超時,對象已經終止了,或者有一個虛假的喚醒(我們忽略)
        completed = true;
      } catch (InterruptedException e) {
        // interrupted; have another look at the flag
        completed = terminationNotification.get();
      }
    }
    return terminationNotification.get();
  }

  /* ===================================================================== */
  /* Override Points */
  /* ===================================================================== */

  /**
   * All initialization code needed by a service.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #init(Configuration)} prevents re-entrancy.
   *
   * The base implementation checks to see if the subclass has created
   * a new configuration instance, and if so, updates the base class value
   * @param conf configuration
   * @throws Exception on a failure -these will be caught,
   * possibly wrapped, and wil; trigger a service stop
   */
  /*
   * 服務所需的所有初始化代碼。
   * 該方法只能在特定服務實例的生命周期中被調用一次。
   * 實現不需要同步機制,因為{@link #init(Configuration))中的邏輯可以防止重新進入。
   * 基本實現檢查子類是否已創建新的配置實例,如果是,則更新基類值。
   */
  protected void serviceInit(Configuration conf) throws Exception {
    if (conf != config) {
      LOG.debug("Config has been overridden during init");
      setConfig(conf);
    }
  }

  /**
   * Actions called during the INITED to STARTED transition.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #start()} prevents re-entrancy.
   *
   * @throws Exception if needed -these will be caught,
   * wrapped, and trigger a service stop
   */
  /*
   * 在INITED到STARTED過渡期間所采取的行動。
   * 該方法只能在特定服務實例的生命周期中被調用一次。
   * 實現不需要同步機制,因為{@link #start()}中的邏輯可以防止重新進入。
   */
  protected void serviceStart() throws Exception {

  }

  /**
   * Actions called during the transition to the STOPPED state.
   *
   * This method will only ever be called once during the lifecycle of
   * a specific service instance.
   *
   * Implementations do not need to be synchronized as the logic
   * in {@link #stop()} prevents re-entrancy.
   *
   * Implementations MUST write this to be robust against failures, including
   * checks for null references -and for the first failure to not stop other
   * attempts to shut down parts of the service.
   *
   * @throws Exception if needed -these will be caught and logged.
   */
  /*
   * 在轉換到STOPPED狀態期間調用的動作。
   * 該方法只能在特定服務實例的生命周期中被調用一次。
   * 實現不需要同步機制,因為{@link #stop()}中的邏輯可以防止重入。
   * 實現MUST寫入這個要健壯來避免失敗, 包括對空引用的檢查,以及第一個不能停止其他嘗試關閉部分服務的失敗。
   */
  protected void serviceStop() throws Exception {

  }

  /*
   * 將監聽器注冊到服務狀態更改事件。
   * 如果提供的偵聽器已經在監聽此服務,則此方法是無效的。
   * 參數 l 表示:一個新的監聽器
   */
  @Override
  public void registerServiceListener(ServiceStateChangeListener l) {
    listeners.add(l);
  }

  /*
   * 取消注冊先前注冊的服務狀態更改事件的偵聽器。 如果監聽器已經被注銷,則不用操作。
   * 參數 l 表示:要注銷的監聽器
   */
  @Override
  public void unregisterServiceListener(ServiceStateChangeListener l) {
    listeners.remove(l);
  }

  /**
   * Register a global listener, which receives notifications
   * from the state change events of all services in the JVM
   * @param l listener
   */
  //注冊一個全局監聽器,它從JVM中所有服務的狀態更改事件接收通知
  public static void registerGlobalListener(ServiceStateChangeListener l) {
    globalListeners.add(l);
  }

  /**
   * unregister a global listener.
   * @param l listener to unregister
   * @return true if the listener was found (and then deleted)
   */
  //取消注冊全局監聽器。
  public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
    return globalListeners.remove(l);
  }

  /**
   * Package-scoped method for testing -resets the global listener list
   */
  //用於測試的程序包范圍的方法 - 重新設置全局偵聽器列表
  @VisibleForTesting
  static void resetGlobalListeners() {
    globalListeners.reset();
  }

  /*
   * 獲取服務的名稱。
   * 返回:服務的名稱
   */
  @Override
  public String getName() {
    return name;
  }

  /*
   * 獲取該服務的配置信息。
   * 這通常不是一個克隆,並且可能被操縱,盡管不能保證這種行為的后果可能如何
   * 返回:當前配置,除非具體實現選擇。
   */
  @Override
  public synchronized Configuration getConfig() {
    return config;
  }

  /*
   * 獲取服務的開始時間。
   * 返回:服務的開始時間。 如果服務尚未啟動,則為零。
   */
  @Override
  public long getStartTime() {
    return startTime;
  }

  /**
   * Notify local and global listeners of state changes.
   * Exceptions raised by listeners are NOT passed up.
   */
  //通知本地和全局監聽器的狀態變化。監聽器提出的異常情況不會被傳遞。
  private void notifyListeners() {
    try {
      listeners.notifyListeners(this);
      globalListeners.notifyListeners(this);
    } catch (Throwable e) {
      LOG.warn("Exception while notifying listeners of " + this + ": " + e,
               e);
    }
  }

  /**
   * Add a state change event to the lifecycle history
   */
  //將狀態更改事件添加到生命周期歷史記錄
  private void recordLifecycleEvent() {
    LifecycleEvent event = new LifecycleEvent();
    event.time = System.currentTimeMillis();
    event.state = getServiceState();
    lifecycleHistory.add(event);
  }

  /*
   * 獲取生命周期歷史的快照; 它是一個靜態列表
   * 返回:一個可能是empty的但從不是null的生命周期事件列表。
   */
  @Override
  public synchronized List<LifecycleEvent> getLifecycleHistory() {
    return new ArrayList<LifecycleEvent>(lifecycleHistory);
  }

  /**
   * Enter a state; record this via {@link #recordLifecycleEvent}
   * and log at the info level.
   * @param newState the proposed new state
   * @return the original state
   * it wasn't already in that state, and the state model permits state re-entrancy.
   */
  /*
   * 輸入狀態; 記錄這個通過{@link #recordLifecycleEvent}並以信息級別記錄在日志。
   * 參數 newState 表示 提出新的狀態
   * 返回:原來的狀態還沒有在這個狀態,狀態模式允許狀態重新進入。
   */
  private STATE enterState(STATE newState) {
    assert stateModel != null : "null state in " + name + " " + this.getClass();
    STATE oldState = stateModel.enterState(newState);
    if (oldState != newState) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Service: " + getName() + " entered state " + getServiceState());
      }
      recordLifecycleEvent();
    }
    return oldState;
  }

  /*
   * 查詢狀態是否處於特定狀態
   * 參數 表示提出新的狀態
   */
  @Override
  public final boolean isInState(Service.STATE expected) {
    return stateModel.isInState(expected);
  }

  @Override
  public String toString() {
    return "Service " + name + " in state " + stateModel;
  }

  /**
   * Put a blocker to the blocker map -replacing any
   * with the same name.
   * @param name blocker name
   * @param details any specifics on the block. This must be non-null.
   */
  /*
   * 將攔截器放在攔截器map上 - 重新放置任何具有相同名稱的。
   * 參數 name 表示:攔截器名稱
   * 參數 details 表示:詳細說明塊上的細節。 這必須是非空。
   */
  protected void putBlocker(String name, String details) {
    synchronized (blockerMap) {
      blockerMap.put(name, details);
    }
  }

  /**
   * Remove a blocker from the blocker map -
   * this is a no-op if the blocker is not present
   * @param name the name of the blocker
   */
  /*
   * 從攔截器map中移除一個攔截器 - 如果攔截器不存在,這是空操作
   * 參數 name 表示:攔截器的名稱
   */
  public void removeBlocker(String name) {
    synchronized (blockerMap) {
      blockerMap.remove(name);
    }
  }

  /*
   * 獲取一個服務的攔截器 - 遠程依賴關系,使服務不再是<i>live</i>。
   * 返回:一個攔截器名稱-&gt的(快照)map;描述值
   */
  @Override
  public Map<String, String> getBlockers() {
    synchronized (blockerMap) {
      Map<String, String> map = new HashMap<String, String>(blockerMap);
      return map;
    }
  }
}

 

YarnScheduler.java 在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

參考3   

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

/**
 * This interface is used by the components to talk to the
 * scheduler for allocating of resources, cleaning up resources.
 *
 */
//該接口用於組件與調度器對話以分配資源、清理資源。
public interface YarnScheduler extends EventHandler<SchedulerEvent> {

  /**
   * Get queue information
   * @param queueName queue name
   * @param includeChildQueues include child queues?
   * @param recursive get children queues?
   * @return queue information
   * @throws IOException
   */
  /*
    獲取隊列信息
    參數queueName 表示隊列名稱
    參數includeChildQueues 表示是否包含子隊列
    參數recursive 表示遞歸得到孩子隊列?
    返回QueueInfo 隊列信息
   */
  @Public
  @Stable
  public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
      boolean recursive) throws IOException;

  /**
   * Get acls for queues for current user.
   * @return acls for queues for current user
   */
  /*
    獲取當前用戶的隊列的訪問控制列表(acls)
  */
  @Public
  @Stable
  public List<QueueUserACLInfo> getQueueUserAclInfo();

  /**
   * Get the whole resource capacity of the cluster.
   * @return the whole resource capacity of the cluster.
   */
  /*
    獲取集群的全部資源容量。
    返回:集群的全部資源容量
  */
  @LimitedPrivate("yarn")
  @Unstable
  public Resource getClusterResource();

  /**
   * Get minimum allocatable {@link Resource}.
   * @return minimum allocatable resource
   */
  /*
    獲取最小可分配資源。
    返回:最小可分配資源。
  */
  @Public
  @Stable
  public Resource getMinimumResourceCapability();
  
  /**
   * Get maximum allocatable {@link Resource} at the cluster level.
   * @return maximum allocatable resource
   */
  /*
    獲得最大的可分配的資源在集群級別。
    返回:最大的可分配的資源
  */
  @Public
  @Stable
  public Resource getMaximumResourceCapability();

  /**
   * Get maximum allocatable {@link Resource} for the queue specified.
   * @param queueName queue name
   * @return maximum allocatable resource
   */
  /*
    獲取指定隊列的最大可分配資源。
    參數queueName 指隊列名
    返回:最大可分配資源
  */
  @Public
  @Stable
  public Resource getMaximumResourceCapability(String queueName);

  @LimitedPrivate("yarn")
  @Evolving
  ResourceCalculator getResourceCalculator();

  /**
   * Get the number of nodes available in the cluster.
   * @return the number of available nodes.
   */
  /*
    獲取集群中可用節點的數目。
    返回:可用節點的數目
  */
  @Public
  @Stable
  public int getNumClusterNodes();
  
  /**
   * The main api between the ApplicationMaster and the Scheduler.
   * The ApplicationMaster is updating his future resource requirements
   * and may release containers he doens't need.
   * 
   * @param appAttemptId
   * @param ask
   * @param release
   * @param blacklistAdditions 
   * @param blacklistRemovals 
   * @return the {@link Allocation} for the application
   */
  /*
    ApplicationMaster 和 Scheduler 之間的主要接口。ApplicationMaster 正在更新它的將來的資源需求以及可能釋放它不需要的 containers 。
    返回:應用程序的 {@link Allocation}
  */
  @Public
  @Stable
  Allocation 
  allocate(ApplicationAttemptId appAttemptId, 
      List<ResourceRequest> ask,
      List<ContainerId> release, 
      List<String> blacklistAdditions, 
      List<String> blacklistRemovals);

  /**
   * Get node resource usage report.
   * @param nodeId
   * @return the {@link SchedulerNodeReport} for the node or null
   * if nodeId does not point to a defined node.
   */
  /*
    獲取節點資源使用報告。
    返回:節點的 {@link SchedulerNodeReport} ;或者null,當nodeId沒有指向一個已經定義的節點。
  */
  @LimitedPrivate("yarn")
  @Stable
  public SchedulerNodeReport getNodeReport(NodeId nodeId);
  
  /**
   * Get the Scheduler app for a given app attempt Id.
   * @param appAttemptId the id of the application attempt
   * @return SchedulerApp for this given attempt.
   */
  /*
    獲取調度器應用程序,通過一個應用程序的嘗試Id。
    參數appAttemptId 應用程序嘗試的id
    返回:對這個給定的嘗試返回 SchedulerApp
  */
  @LimitedPrivate("yarn")
  @Stable
  SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);

  /**
   * Get a resource usage report from a given app attempt ID.
   * @param appAttemptId the id of the application attempt
   * @return resource usage report for this given attempt
   */
  /*
    從給定的應用程序嘗試ID獲取資源使用報告。
    參數appAttemptId表示應用程序嘗試的id
    返回:給定的嘗試的資源使用報告
  */
  @LimitedPrivate("yarn")
  @Evolving
  ApplicationResourceUsageReport getAppResourceUsageReport(
      ApplicationAttemptId appAttemptId);
  
  /**
   * Get the root queue for the scheduler.
   * @return the root queue for the scheduler.
   */
  /*
    獲取調度器的根隊列。
    返回:度器的根隊列
  */
  @LimitedPrivate("yarn")
  @Evolving
  QueueMetrics getRootQueueMetrics();

  /**
   * Check if the user has permission to perform the operation.
   * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission,
   * this user can view/modify the applications in this queue
   * @param callerUGI
   * @param acl
   * @param queueName
   * @return <code>true</code> if the user has the permission,
   *         <code>false</code> otherwise
   */
  /*
    檢查用戶是否具有執行操作的權限。如果用戶有{@link QueueACL#ADMINISTER_QUEUE}這樣的權限,這個用戶就可以查看和修改這個隊列里的應用程序。
    返回:<code>true</code>表示用戶有這樣的權限, 其它返回 <code>false</code> 
  */
  boolean checkAccess(UserGroupInformation callerUGI,
      QueueACL acl, String queueName);
  
  /**
   * Gets the apps under a given queue
   * @param queueName the name of the queue.
   * @return a collection of app attempt ids in the given queue.
   */
  /*
    獲取給定隊列下的應用程序。
    參數 queueName指隊列的名稱
    返回:給定隊列的應用程序嘗試的id的集合
  */
  @LimitedPrivate("yarn")
  @Stable
  public List<ApplicationAttemptId> getAppsInQueue(String queueName);

  /**
   * Get the container for the given containerId.
   * @param containerId
   * @return the container for the given containerId.
   */
  /*
    獲得給定containerId的容器。
    返回:給定containerId的容器
  */
  @LimitedPrivate("yarn")
  @Unstable
  public RMContainer getRMContainer(ContainerId containerId);

  /**
   * Moves the given application to the given queue
   * @param appId
   * @param newQueue
   * @return the name of the queue the application was placed into
   * @throws YarnException if the move cannot be carried out
   */
  /*
    將給定的應用程序移動到給定的隊列。
    返回:返回應用程序被放置的隊列的名稱
    拋出YarnException異常,當移動不能進行的時候
  */
  @LimitedPrivate("yarn")
  @Evolving
  public String moveApplication(ApplicationId appId, String newQueue)
      throws YarnException;

  /**
   * Completely drain sourceQueue of applications, by moving all of them to
   * destQueue.
   *
   * @param sourceQueue
   * @param destQueue
   * @throws YarnException
   */
  /*
    應用程序完全用完sourceQueue, 通過把它們都移動到destQueue
  */
  void moveAllApps(String sourceQueue, String destQueue) throws YarnException;

  /**
   * Terminate all applications in the specified queue.
   *
   * @param queueName the name of queue to be drained
   * @throws YarnException
   */
  /*
    終止指定隊列中的所有應用程序。
    參數queueName指資源被用完的隊列名稱
  */
  void killAllAppsInQueue(String queueName) throws YarnException;

  /**
   * Remove an existing queue. Implementations might limit when a queue could be
   * removed (e.g., must have zero entitlement, and no applications running, or
   * must be a leaf, etc..).
   *
   * @param queueName name of the queue to remove
   * @throws YarnException
   */
  /*
    刪除現有隊列。當隊列可以被移除時,實現可能會受到限制 (例如,必須有零個授權,並且沒有應用程序運行,或必須是葉子,等。)
    參數 queueName指要刪除的隊列名
  */
  void removeQueue(String queueName) throws YarnException;

  /**
   * Add to the scheduler a new Queue. Implementations might limit what type of
   * queues can be dynamically added (e.g., Queue must be a leaf, must be
   * attached to existing parent, must have zero entitlement).
   *
   * @param newQueue the queue being added.
   * @throws YarnException
   */
  /*
    給調度器添加一個新隊列。實現可能會限制哪種類型的隊列能夠動態添加(例如,隊列必須是一個葉子,必須依附於現有的父級,必須有零的授權) 
  */
  void addQueue(Queue newQueue) throws YarnException;

  /**
   * This method increase the entitlement for current queue (must respect
   * invariants, e.g., no overcommit of parents, non negative, etc.).
   * Entitlement is a general term for weights in FairScheduler, capacity for
   * the CapacityScheduler, etc.
   *
   * @param queue the queue for which we change entitlement
   * @param entitlement the new entitlement for the queue (capacity,
   *              maxCapacity, etc..)
   * @throws YarnException
   */
  /*
    此方法增加當前隊列的權限(必須遵守不變量,例如,沒有過度使用的雙親,非負,等等。)。
  */
  void setEntitlement(String queue, QueueEntitlement entitlement)
      throws YarnException;

  /**
   * Gets the list of names for queues managed by the Reservation System
   * @return the list of queues which support reservations
   */
  /*
    獲取由預訂系統管理的隊列的名稱列表。
    返回:支持預定的隊列列表
  */
  public Set<String> getPlanQueues() throws YarnException;  

  /**
   * Return a collection of the resource types that are considered when
   * scheduling
   *
   * @return an EnumSet containing the resource types
   */
  /*
    返回調度時所考慮的資源類型的集合
    返回:返回一個EnumSet包含資源類型
  */
  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
}

 

ResourceScheduler.java 在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;

/**
 * This interface is the one implemented by the schedulers. It mainly extends 
 * {@link YarnScheduler}. 
 *
 */
//這個接口是由調度器實現的。它主要擴展{@link YarnScheduler}.
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {

  /**
   * Set RMContext for <code>ResourceScheduler</code>.
   * This method should be called immediately after instantiating
   * a scheduler once.
   * @param rmContext created by ResourceManager
   */
  /*
   * 為<code>ResourceScheduler</code>設置RMContext。
   * 一旦實例化一個scheduler, 該方法應該立刻被調用。
   * 參數: rmContext 被 ResourceManager 創建
   */
  void setRMContext(RMContext rmContext);

  /**
   * Re-initialize the <code>ResourceScheduler</code>.
   * @param conf configuration
   * @throws IOException
   */
  /*
   * 重新初始化<code>ResourceScheduler</code>.
   * 參數conf表示配置
   */
  void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
}

 

AbstractYarnScheduler.java 在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.util.resource.Resources;

import com.google.common.util.concurrent.SettableFuture;


@SuppressWarnings("unchecked")
@Private
@Unstable
public abstract class AbstractYarnScheduler
    <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
    extends AbstractService implements ResourceScheduler {

  private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);

  // Nodes in the cluster, indexed by NodeId
  // 在集群中的節點,用NodeId索引
  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();

  // Whole capacity of the cluster
  // 集群全部容量
  protected Resource clusterResource = Resource.newInstance(0, 0);

  protected Resource minimumAllocation;
  private Resource maximumAllocation;
  private Resource configuredMaximumAllocation;
  private int maxNodeMemory = -1;
  private int maxNodeVCores = -1;
  private final ReadLock maxAllocReadLock;
  private final WriteLock maxAllocWriteLock;

  private boolean useConfiguredMaximumAllocationOnly = true;
  private long configuredMaximumAllocationWaitTime;

  protected RMContext rmContext;
  
  /*
   * All schedulers which are inheriting AbstractYarnScheduler should use
   * concurrent version of 'applications' map.
   */
  // 所有繼承自AbstractYarnScheduler的調度器應該使用並行版本的'applications' map
  protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
  protected int nmExpireInterval;

  protected final static List<Container> EMPTY_CONTAINER_LIST =
      new ArrayList<Container>();
  protected static final Allocation EMPTY_ALLOCATION = new Allocation(
    EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);

  /**
   * Construct the service.
   *
   * @param name service name
   */
  /*
   * 構造服務
   * 參數name表示服務名
   */
  public AbstractYarnScheduler(String name) {
    super(name);
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.maxAllocReadLock = lock.readLock();
    this.maxAllocWriteLock = lock.writeLock();
  }

  // 服務所需的所有初始化代碼。
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    // getInt()表示獲取<code> name </code>屬性的值作為<code> int </code>。
    // 如果沒有這樣的屬性,返回提供的默認值,或者如果指定的值不是有效的<code> int </ code>,那么會拋出一個錯誤。
    // 第一個參數是String name,第二個參數int defaultValue
    // DEFAULT_RM_NM_EXPIRY_INTERVAL_MS指節點管理器被認為死所要的等待的時間,默認為600000ms。
    nmExpireInterval =
        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
    // 獲取<code> name </code>屬性的值作為<code> long </code>。
    // 如果沒有這樣的屬性,返回所提供的默認值,或者如果指定的值不是有效的<code> long </ code>,則會拋出錯誤。
    // 第一個參數是String name,第二個參數long defaultValue
    // DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS指,默認為10000ms。
    configuredMaximumAllocationWaitTime =
        conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
    //
    createReleaseCache();
    super.serviceInit(conf);
  }

  public List<Container> getTransferredContainers(
      ApplicationAttemptId currentAttempt) {
    // 從<code>ApplicationAttempId</code>中獲取<code>ApplicationId</code>
    ApplicationId appId = currentAttempt.getApplicationId();
    // 調用的get()函數是Map類,返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。
    SchedulerApplication<T> app = applications.get(appId);
    // 構造一個初始容量為十的空列表。它只接收Container類型
    List<Container> containerList = new ArrayList<Container>();
    // rmContext是接口RMContext的對象,而該接口只有一個實現類RMContextImpl,
    // rmContext.getRMApps()返回ConcurrentMap<ApplicationId, RMApp> 
    // rmContext.getRMApps().get(appId)調用的是Map類的get()函數。
    RMApp appImpl = this.rmContext.getRMApps().get(appId);
    // appImpl是接口RMApp對象,
    // appImpl.getApplicationSubmissionContext()此{@link RMApp}的應用程序提交上下文,返回ApplicationSubmissionContext
    // getUnmanagedAM()獲取是否RM應該管理AM的執行。
    if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
      return containerList;
    }
    if (app == null) {
      return containerList;
    }
    // getLiveContainers()獲取應用程序的活動容器並返回。
    Collection<RMContainer> liveContainers =
        app.getCurrentAppAttempt().getLiveContainers();
    // getMasterContainer()是供Application Master運行的容器,
    // Container類的getId()獲取容器的全局唯一標識符。
    // 最后獲取的是Application Master的容器Id
    ContainerId amContainerId =
        rmContext.getRMApps().get(appId).getCurrentAppAttempt()
          .getMasterContainer().getId();
    for (RMContainer rmContainer : liveContainers) {
      // 判斷當前的Id是否是Application Master的容器Id
      if (!rmContainer.getContainerId().equals(amContainerId)) {
        // 不相等,則往容器列表中添加容器
        containerList.add(rmContainer.getContainer());
      }
    }
    return containerList;
  }

  public Map<ApplicationId, SchedulerApplication<T>>
      getSchedulerApplications() {
    return applications;
  }

  // 獲取集群的整個資源容量。
  @Override
  public Resource getClusterResource() {
    return clusterResource;
  }

  // 獲取最小可分配{@link Resource}。
  @Override
  public Resource getMinimumResourceCapability() {
    return minimumAllocation;
  }

  // 在集群級別獲取最大可分配{@link Resource}。
  @Override
  public Resource getMaximumResourceCapability() {
    Resource maxResource;
    maxAllocReadLock.lock();
    try {
      // 類最開始定義useConfiguredMaximumAllocationOnly為true
      if (useConfiguredMaximumAllocationOnly) {
        // System.currentTimeMillis()產生一個當前的毫秒,這個毫秒其實就是自1970年1月1日0時起的毫秒數
        // ResourceManager.getClusterTimeStamp()調用的也是System.currentTimeMillis(),
        // configuredMaximumAllocationWaitTime默認值為10000ms
        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
            > configuredMaximumAllocationWaitTime) {
          useConfiguredMaximumAllocationOnly = false;    //設為false
        }
        // 克隆一份資源
        maxResource = Resources.clone(configuredMaximumAllocation);
      } else {
        maxResource = Resources.clone(maximumAllocation);
      }
    } finally {
      maxAllocReadLock.unlock();
    }
    return maxResource;
  }

  //
  @Override
  public Resource getMaximumResourceCapability(String queueName) {
    return getMaximumResourceCapability();
  }

  // 初始化最大資源容量
  protected void initMaximumResourceCapability(Resource maximumAllocation) {
    maxAllocWriteLock.lock();
    try {
      if (this.configuredMaximumAllocation == null) {
        // 克隆資源
        this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
        this.maximumAllocation = Resources.clone(maximumAllocation);
      }
    } finally {
      maxAllocWriteLock.unlock();
    }
  }

  //
  protected synchronized void containerLaunchedOnNode(
      ContainerId containerId, SchedulerNode node) {
    // Get the application for the finished container
    // 獲取完成了的容器的應用程序
    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
        (containerId);
    if (application == null) {
      // getApplicationAttemptId()獲取分配了<code> Container </code>的應用程序的<code> ApplicationAttemptId </code>。
      // getApplicationId() 獲取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。
      LOG.info("Unknown application "
          + containerId.getApplicationAttemptId().getApplicationId()
          + " launched container " + containerId + " on node: " + node);
      // rmContext是接口RMContext的對象, rmContext.getDispatcher()返回接口Dispatcher的對象, 
      // rmContext.getDispatcher().getEventHandler()返回接口EventHandler對象, 最后調用EventHandler的handle()方法
      // RMNodeCleanContainerEvent表示資源管理器節點清除容器事件,構造函數內部有RMNodeEventType.CLEANUP_CONTAINER
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
      return;
    }

    application.containerLaunchedOnNode(containerId, node.getNodeID());
  }

  //
  public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
    // getApplicationId() 獲取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。
    // 調用的get()函數是Map類,返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。
    SchedulerApplication<T> app =
        applications.get(applicationAttemptId.getApplicationId());
    // getCurrentAppAttempt()返回的是SchedulerApplicationAttempt類對象
    return app == null ? null : app.getCurrentAppAttempt();
  }

  // 從給定應用程序嘗試Id中獲取調度器應用程序
  @Override
  public SchedulerAppReport getSchedulerAppInfo(
      ApplicationAttemptId appAttemptId) {
    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
    if (attempt == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
      }
      return null;
    }
    // SchedulerAppReport類 表示應用程序嘗試,以及嘗試使用的資源。
    return new SchedulerAppReport(attempt);
  }

  // 從給定的應用程序嘗試ID獲取資源使用情況報告。
  @Override
  public ApplicationResourceUsageReport getAppResourceUsageReport(
      ApplicationAttemptId appAttemptId) {
    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
    if (attempt == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
      }
      return null;
    }
    //
    return attempt.getResourceUsageReport();
  }

  // 根據容器Id獲取當前應用程序的嘗試
  public T getCurrentAttemptForContainer(ContainerId containerId) {
    return getApplicationAttempt(containerId.getApplicationAttemptId());
  }

  // 獲取給定containerId的容器。
  @Override
  public RMContainer getRMContainer(ContainerId containerId) {
    SchedulerApplicationAttempt attempt =
        getCurrentAttemptForContainer(containerId);
    // getRMContainer()方法表示獲取資源管理器容器
    return (attempt == null) ? null : attempt.getRMContainer(containerId);
  }

  // 獲取節點資源使用情況報告。
  @Override
  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
    // Map類方法get()
    N node = nodes.get(nodeId);
    // SchedulerNodeReport類表示節點使用報告
    return node == null ? null : new SchedulerNodeReport(node);
  }

  // 將給定的應用程序移動到給定的隊列
  @Override
  public String moveApplication(ApplicationId appId, String newQueue)
      throws YarnException {
    throw new YarnException(getClass().getSimpleName()
        + " does not support moving apps between queues");
  }

  // 移除一個已有的隊列
  public void removeQueue(String queueName) throws YarnException {
    throw new YarnException(getClass().getSimpleName()
        + " does not support removing queues");
  }

  // 把一個新隊列添加到調度器。
  @Override
  public void addQueue(Queue newQueue) throws YarnException {
    throw new YarnException(getClass().getSimpleName()
        + " does not support this operation");
  }

  // 此方法增加了當前隊列的權限
  @Override
  public void setEntitlement(String queue, QueueEntitlement entitlement)
      throws YarnException {
    throw new YarnException(getClass().getSimpleName()
        + " does not support this operation");
  }

  //在節點上殺死孤立容器
  private void killOrphanContainerOnNode(RMNode node,
      NMContainerStatus container) {
    // getContainerState()獲取容器的狀態
    // Enum類的equals()函數表示 如果指定的對象等於此枚舉常量,則返回true。否則false。
    // ContainerState類表示容器的狀態,有三種NEW, RUNNING, COMPLETE。COMPLETE表示完成的容器。
    if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
      // 在本類的containerLaunchedOnNode()函數中有一樣的,略
      this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeCleanContainerEvent(node.getNodeID(),
          container.getContainerId()));
    }
  }

  // 在節點上恢復容器
  public synchronized void recoverContainersOnNode(
      List<NMContainerStatus> containerReports, RMNode nm) {
    if (!rmContext.isWorkPreservingRecoveryEnabled()
        || containerReports == null
        || (containerReports != null && containerReports.isEmpty())) {
      return;
    }

    for (NMContainerStatus container : containerReports) {
      /*
       * container.getContainerId()獲取容器的<code> ContainerId </code>。
       * getApplicationAttemptId() 獲取分配了<code> Container </code>的應用程序的<code> ApplicationAttemptId </code>。
       * getApplicationId() 獲取<code> ApplicationAttempId </ code>的<code> ApplicationId </code>。
       */
      ApplicationId appId =
          container.getContainerId().getApplicationAttemptId().getApplicationId();
      //
      RMApp rmApp = rmContext.getRMApps().get(appId);
      if (rmApp == null) {
        LOG.error("Skip recovering container " + container
            + " for unknown application.");
        killOrphanContainerOnNode(nm, container);
        continue;
      }

      // Unmanaged AM recovery is addressed in YARN-1815
      // 未經管理的AM恢復在YARN-1815中得到解決
      // rmApp.getApplicationSubmissionContext()函數表示{@link RMApp}的應用程序提交上下文
      // getUnmanagedAM()獲取是否RM應該管理AM的執行。如果為真,則RM不會為AM分配容器並啟動它。
      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
        LOG.info("Skip recovering container " + container + " for unmanaged AM."
            + rmApp.getApplicationId());
        killOrphanContainerOnNode(nm, container);
        continue;
      }

      //Map類的get()函數
      SchedulerApplication<T> schedulerApp = applications.get(appId);
      if (schedulerApp == null) {
        //rmApp.getState()表示{@link RMApp}的當前狀態。
        LOG.info("Skip recovering container  " + container
            + " for unknown SchedulerApplication. Application current state is "
            + rmApp.getState());
        killOrphanContainerOnNode(nm, container);
        continue;
      }

      LOG.info("Recovering container " + container);
      SchedulerApplicationAttempt schedulerAttempt =
          schedulerApp.getCurrentAppAttempt();

      // getKeepContainersAcrossApplicationAttempts()函數 獲取指示是否在應用程序嘗試中保留容器的標志
      if (!rmApp.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        // Do not recover containers for stopped attempt or previous attempt.
        // 不要因為停止了的嘗試或以前的嘗試恢復容器。
        if (schedulerAttempt.isStopped()
            || !schedulerAttempt.getApplicationAttemptId().equals(
              container.getContainerId().getApplicationAttemptId())) {
          LOG.info("Skip recovering container " + container
              + " for already stopped attempt.");
          killOrphanContainerOnNode(nm, container);
          continue;
        }
      }

      // create container
      // 創建容器
      RMContainer rmContainer = recoverAndCreateContainer(container, nm);

      // recover RMContainer
      // 恢復 RMContainer
      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
        container));

      // recover scheduler node
      // 恢復調度器節點
      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);

      // recover queue: update headroom etc.
      // 恢復隊列:更新凈空等等
      Queue queue = schedulerAttempt.getQueue();
      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);

      // recover scheduler attempt
      // 恢復調度器嘗試
      schedulerAttempt.recoverContainer(rmContainer);
            
      // set master container for the current running AMContainer for this
      // attempt.
      // 為這個嘗試 為當前運行的AMContainer設置主容器
      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
      if (appAttempt != null) {
        // getMasterContainer()函數表示 ApplicationMaster運行在其上的容器
        Container masterContainer = appAttempt.getMasterContainer();

        // Mark current running AMContainer's RMContainer based on the master
        // container ID stored in AppAttempt.
        // 根據存儲在AppAttempt中的主容器ID,標記當前正在運行的AMContainer的RMContainer。
        if (masterContainer != null
            && masterContainer.getId().equals(rmContainer.getContainerId())) {
          // 設置ApplicationMaster容器
          ((RMContainerImpl)rmContainer).setAMContainer(true);
        }
      }

      synchronized (schedulerAttempt) {
        // 這個pendingRelease用於工作維護恢復方案,以跟蹤AM的未完成發布請求。
        // RM恢復可以收到AM的發布請求表,在此之前從NM收到容器狀態以進行恢復。
        // 在這種情況下,由NM報告的待回收容器不應該被收回。
        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
        // Set類中的contains()函數, 
        //如果此集合包含指定的元素,則返回<tt> true </ tt>。 更正式地,當且僅當該集合包含元素<tt> e </ tt>時,返回<tt> true </ tt>,
        //這樣<tt>(o==null&nbsp;?&nbsp;e==null&nbsp;:&nbsp;o.equals(e))</tt>.
        if (releases.contains(container.getContainerId())) {
          // release the container
          //釋放容器
          rmContainer.handle(new RMContainerFinishedEvent(container
            .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
            container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
            RMContainerEventType.RELEASED));
          releases.remove(container.getContainerId());
          LOG.info(container.getContainerId() + " is released by application.");
        }
      }
    }
  }

  // 恢復並創建容器
  // NMContainerStatus包括容器的當前信息。
  // RMNode類表示節點管理器有關可用資源和其他靜態信息的信息。
  private RMContainer recoverAndCreateContainer(NMContainerStatus status,
      RMNode node) {
    // 創建Container實例
    Container container =
        Container.newInstance(status.getContainerId(), node.getNodeID(),
          node.getHttpAddress(), status.getAllocatedResource(),
          status.getPriority(), null);
    // 獲取應用程序的嘗試Id
    ApplicationAttemptId attemptId =
        container.getId().getApplicationAttemptId();
    // 創建一個RMContainerImpl對象                                                                      
    RMContainer rmContainer =
        new RMContainerImpl(container, attemptId, node.getNodeID(),
          applications.get(attemptId.getApplicationId()).getUser(), rmContext,
          status.getCreationTime());
    return rmContainer;
  }

  /**
   * Recover resource request back from RMContainer when a container is 
   * preempted before AM pulled the same. If container is pulled by
   * AM, then RMContainer will not have resource request to recover.
   * @param rmContainer
   */
  /*
   * 在AM拉出相同之前當容器被搶占時,從RMContainer恢復資源請求。如果容器被AM拉過來,則RMContainer將不會有資源請求恢復。
   */
  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
    // getResourceRequests()函數獲取資源請求
    List<ResourceRequest> requests = rmContainer.getResourceRequests();

    // If container state is moved to ACQUIRED, request will be empty.
    // 如果容器狀態被移動到 ACQUIRED,請求將為空。
    if (requests == null) {
      return;
    }
    // Add resource request back to Scheduler.
    // 將資源請求添加回調度器。
    SchedulerApplicationAttempt schedulerAttempt 
        = getCurrentAttemptForContainer(rmContainer.getContainerId());
    if (schedulerAttempt != null) {
      // 恢復資源請求
      schedulerAttempt.recoverResourceRequests(requests);
    }
  }

  protected void createReleaseCache() {
    // Cleanup the cache after nm expire interval.
    // 在nm到期之際后清除緩存。
    // Timer類創建一個新的計時器。schedule()函數表示在指定的延遲之后安排指定的任務執行。
    new Timer().schedule(new TimerTask() {
      @Override
      public void run() {
        // Map類的values()函數表示 返回此map中包含的值的{@link Collection}視圖。
        for (SchedulerApplication<T> app : applications.values()) {

          // 獲取當前應用程序的嘗試
          T attempt = app.getCurrentAppAttempt();
          synchronized (attempt) {
            // 
            for (ContainerId containerId : attempt.getPendingRelease()) {
              // logFailure()函數表示 為失敗的事件創建可讀和可分析的審核日志字符串。
              RMAuditLogger.logFailure(
                app.getUser(),
                AuditConstants.RELEASE_CONTAINER,
                "Unauthorized access or invalid container",
                "Scheduler",
                "Trying to release container not owned by app or with invalid id.",
                attempt.getApplicationId(), containerId);
            }
            // Set類的clear()函數表示 從此set中刪除所有元素(可選操作)。 此調用返回后,該組將為空。
            attempt.getPendingRelease().clear();
          }
        }
        LOG.info("Release request cache is cleaned up");
      }
    }, nmExpireInterval);
  }

  // clean up a completed container
  // 清理完成的容器
  protected abstract void completedContainer(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event);

  // 清除容器
  protected void releaseContainers(List<ContainerId> containers,
      SchedulerApplicationAttempt attempt) {
    for (ContainerId containerId : containers) {
      // 獲取給定containerId的容器。
      RMContainer rmContainer = getRMContainer(containerId);
      if (rmContainer == null) {
        //
        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
            < nmExpireInterval) {
          LOG.info(containerId + " doesn't exist. Add the container"
              + " to the release request cache as it maybe on recovery.");
          synchronized (attempt) {
            // Set類的add()函數表示 如果指定的元素不存在,則將其指定的元素添加到這個set(可選操作)。 
            // 更正式地,如果set不包含元素<tt> e2 </tt>,則將指定的元素<tt> e </tt>添加到此set,以便
            //<tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
            attempt.getPendingRelease().add(containerId);
          }
        } else {
          // logFailure()函數表示 為失敗的事件創建可讀和可分析的審核日志字符串
          RMAuditLogger.logFailure(attempt.getUser(),
            AuditConstants.RELEASE_CONTAINER,
            "Unauthorized access or invalid container", "Scheduler",
            "Trying to release container not owned by app or with invalid id.",
            attempt.getApplicationId(), containerId);
        }
      }
      // 清理完成的容器
      // createAbnormalContainerStatus()函數表示在特殊情況下創建{@link ContainerStatus}的實用程序。
      completedContainer(rmContainer,
        SchedulerUtils.createAbnormalContainerStatus(containerId,
          SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
    }
  }

  // 獲取
  // SchedulerNode類表示 從調度器的角度表示YARN集群節點。
  public SchedulerNode getSchedulerNode(NodeId nodeId) {
    // Map類的get()函數表示 返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。
    return nodes.get(nodeId);
  }

  // 完全排除應用程序的sourceQueue,將其全部移動到destQueue。
  @Override
  public synchronized void moveAllApps(String sourceQueue, String destQueue)
      throws YarnException {
    // check if destination queue is a valid leaf queue
    // 檢查目標隊列是否是有效的葉隊列
    try {
      getQueueInfo(destQueue, false, false);
    } catch (IOException e) {
      LOG.warn(e);
      throw new YarnException(e);
    }
    // check if source queue is a valid
    // 檢查源隊列是否有效
    // getAppsInQueue()函數表示 獲取給定隊列下的應用程序
    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
    if (apps == null) {
      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
      LOG.warn(errMsg);
      throw new YarnException(errMsg);
    }
    // generate move events for each pending/running app
    // 為每個待處理/正在運行的應用生成移動事件
    for (ApplicationAttemptId app : apps) {
      // 
      SettableFuture<Object> future = SettableFuture.create();
      // RMAppMoveEvent類構造函數內部有 RMAppEventType.MOVE事件。
      this.rmContext
          .getDispatcher()
          .getEventHandler()
          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
    }
  }
  
  // 終止指定隊列中的所有應用程序。
  @Override
  public synchronized void killAllAppsInQueue(String queueName)
      throws YarnException {
    // check if queue is a valid
    // 檢查隊列是否有效
    // getAppsInQueue()函數表示 獲取給定隊列下的應用程序
    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);
    if (apps == null) {
      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
      LOG.warn(errMsg);
      throw new YarnException(errMsg);
    }
    // generate kill events for each pending/running app
    // 為每個待處理/正在運行的應用生成kill事件
    for (ApplicationAttemptId app : apps) {
      this.rmContext
          .getDispatcher()
          .getEventHandler()
          .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL,
          "Application killed due to expiry of reservation queue " +
          queueName + "."));
    }
  }
  
  /**
   * Process resource update on a node.
   */
  // 在節點上處理資源更新。
  public synchronized void updateNodeResource(RMNode nm, 
      ResourceOption resourceOption) {
    SchedulerNode node = getSchedulerNode(nm.getNodeID());
    Resource newResource = resourceOption.getResource();
    // 獲取節點上的總資源。
    Resource oldResource = node.getTotalResource();
    if(!oldResource.equals(newResource)) {
      // Log resource change
      // 日志記錄資源更改
      LOG.info("Update resource on node: " + node.getNodeName()
          + " from: " + oldResource + ", to: "
          + newResource);

      // Map類的remove()函數表示 從該map中刪除一個鍵的映射,如果存在(可選的操作)。
      // 更正式地,如果該map包含從<tt> k </tt>到值<tt> v </tt>的映射,使得<code>(key==null ? k==null : key.equals(k))</code>,
      // 該映射被刪除。(map最多可以包含一個這樣的映射。)
      nodes.remove(nm.getNodeID());
      //
      updateMaximumAllocation(node, false);

      // update resource to node
      // 將資源更新到節點
      // 在節點上設置總資源。
      node.setTotalResource(newResource);

      // Map類的put()函數表示 將指定的值與該映射中的指定鍵相關聯(可選操作)。如果map先前包含了鍵的映射,則舊值將被指定的值替換。
      nodes.put(nm.getNodeID(), (N)node);
      // 
      updateMaximumAllocation(node, true);

      // update resource to clusterResource
      // 將資源更新到clusterResource
      // subtractFrom(clusterResource, oldResource)表示從clusterResource減去oldResource,資源包括內存和虛擬內核
      Resources.subtractFrom(clusterResource, oldResource);
      // addTo(clusterResource, newResource)表示在clusterResource添加newResource,資源包括內存和虛擬內核
      Resources.addTo(clusterResource, newResource);
    } else {
      // Log resource change
      // 日志記錄資源改變
      LOG.warn("Update resource on node: " + node.getNodeName() 
          + " with the same resource: " + newResource);
    }
  }

  /** {@inheritDoc} */
  // 返回調度時考慮的資源類型的集合
  @Override
  public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
    // EnumSet類的of()函數 創建一個最初包含指定元素的枚舉集。 
    return EnumSet.of(SchedulerResourceTypes.MEMORY);
  }

  // 獲取由預留系統管理的隊列的名稱列表
  @Override
  public Set<String> getPlanQueues() throws YarnException {
    // Object類的getClass()函數 返回此{@code Object}的運行時類。
    // 返回的{@code Class}對象是被表示類的{@code static synchronized}方法鎖定的對象。
    // Class類的getSimpleName()函數返回源代碼中給出的基礎類的簡單名稱。 如果基礎類是匿名的,則返回一個空字符串。
    throw new YarnException(getClass().getSimpleName()
        + " does not support reservations");
  }

  // 更新最大可分配
  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
    // 獲取節點上的總資源
    Resource totalResource = node.getTotalResource();
    maxAllocWriteLock.lock();
    try {
      if (add) { // added node  //添加節點
        // 獲取資源的<em>memory</em> 
        int nodeMemory = totalResource.getMemory();
        if (nodeMemory > maxNodeMemory) {
          maxNodeMemory = nodeMemory;
          // 設置資源的<em>memory</em>
          // Math.min()返回兩個數的最小值
          maximumAllocation.setMemory(Math.min(
              configuredMaximumAllocation.getMemory(), maxNodeMemory));
        }
        // 獲取資源的<em>number of virtual cpu cores</em>
        int nodeVCores = totalResource.getVirtualCores();
        if (nodeVCores > maxNodeVCores) {
          maxNodeVCores = nodeVCores;
          maximumAllocation.setVirtualCores(Math.min(
              configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
        }
      } else {  // removed node  //刪除節點
        if (maxNodeMemory == totalResource.getMemory()) {
          maxNodeMemory = -1;
        }
        if (maxNodeVCores == totalResource.getVirtualCores()) {
          maxNodeVCores = -1;
        }
        // We only have to iterate through the nodes if the current max memory
        // or vcores was equal to the removed node's
        // 如果當前的最大內存或虛擬內核等於被刪除的節點的,我們只需遍歷節點
        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
          // A map entry (key-value pair).  entrySet()返回此map中包含的映射的{@link Set}視圖。
          for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
            int nodeMemory =
                nodeEntry.getValue().getTotalResource().getMemory();
            if (nodeMemory > maxNodeMemory) {
              maxNodeMemory = nodeMemory;
            }
            int nodeVCores =
                nodeEntry.getValue().getTotalResource().getVirtualCores();
            if (nodeVCores > maxNodeVCores) {
              maxNodeVCores = nodeVCores;
            }
          }
          if (maxNodeMemory == -1) {  // no nodes  //無節點
            maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
          } else {
            maximumAllocation.setMemory(
                Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
          }
          if (maxNodeVCores == -1) {  // no nodes   //無節點
            maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
          } else {
            maximumAllocation.setVirtualCores(
                Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
          }
        }
      }
    } finally {
      maxAllocWriteLock.unlock();
    }
  }

  // 刷新最大可分配
  protected void refreshMaximumAllocation(Resource newMaxAlloc) {
    maxAllocWriteLock.lock();
    try {
      configuredMaximumAllocation = Resources.clone(newMaxAlloc);
      int maxMemory = newMaxAlloc.getMemory();
      if (maxNodeMemory != -1) {
        maxMemory = Math.min(maxMemory, maxNodeMemory);
      }
      int maxVcores = newMaxAlloc.getVirtualCores();
      if (maxNodeVCores != -1) {
        maxVcores = Math.min(maxVcores, maxNodeVCores);
      }
      // 
      maximumAllocation = Resources.createResource(maxMemory, maxVcores);
    } finally {
      maxAllocWriteLock.unlock();
    }
  }

  // 為應用嘗試獲取待處理的資源請求
  public List<ResourceRequest> getPendingResourceRequestsForAttempt(
      ApplicationAttemptId attemptId) {
    // 獲取應用程序嘗試
    SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId);
    if (attempt != null) {
      // getAppSchedulingInfo()獲取應用程序調度信息。  getAllResourceRequests()獲取所有的資源請求。
      return attempt.getAppSchedulingInfo().getAllResourceRequests();
    }
    return null;
  }
}

 

Hadoop 三大調度器包括, Fifo , Capacity 以及 Fair 調度器,如下所示:

(1)  Fifo調度器

  對應的類是 FifoScheduler.java , 在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java 。

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

import com.google.common.annotations.VisibleForTesting;

@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
public class FifoScheduler extends
    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
    Configurable {

  private static final Log LOG = LogFactory.getLog(FifoScheduler.class);

  private static final RecordFactory recordFactory = 
    RecordFactoryProvider.getRecordFactory(null);

  Configuration conf;

  private boolean usePortForNodeName;

  private ActiveUsersManager activeUsersManager;

  private static final String DEFAULT_QUEUE_NAME = "default";
  private QueueMetrics metrics;
  
  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();

  //創建一個默認隊列
  private final Queue DEFAULT_QUEUE = new Queue() {
    @Override
    public String getQueueName() {
      return DEFAULT_QUEUE_NAME;
    }

    @Override
    public QueueMetrics getMetrics() {
      return metrics;
    }

    @Override
    public QueueInfo getQueueInfo( 
        boolean includeChildQueues, boolean recursive) {
      QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
      queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
      queueInfo.setCapacity(1.0f);
      if (clusterResource.getMemory() == 0) {
        queueInfo.setCurrentCapacity(0.0f);
      } else {
        queueInfo.setCurrentCapacity((float) usedResource.getMemory()
            / clusterResource.getMemory());
      }
      queueInfo.setMaximumCapacity(1.0f);
      queueInfo.setChildQueues(new ArrayList<QueueInfo>());
      queueInfo.setQueueState(QueueState.RUNNING);
      return queueInfo;
    }

    public Map<QueueACL, AccessControlList> getQueueAcls() {
      Map<QueueACL, AccessControlList> acls =
        new HashMap<QueueACL, AccessControlList>();
      for (QueueACL acl : QueueACL.values()) {
        acls.put(acl, new AccessControlList("*"));
      }
      return acls;
    }

    @Override
    public List<QueueUserACLInfo> getQueueUserAclInfo(
        UserGroupInformation unused) {
      QueueUserACLInfo queueUserAclInfo = 
        recordFactory.newRecordInstance(QueueUserACLInfo.class);
      queueUserAclInfo.setQueueName(DEFAULT_QUEUE_NAME);
      queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values()));
      return Collections.singletonList(queueUserAclInfo);
    }

    @Override
    public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
      return getQueueAcls().get(acl).isUserAllowed(user);
    }
    
    @Override
    public ActiveUsersManager getActiveUsersManager() {
      return activeUsersManager;
    }

    @Override
    public void recoverContainer(Resource clusterResource,
        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        return;
      }
      increaseUsedResources(rmContainer);
      updateAppHeadRoom(schedulerAttempt);
      updateAvailableResourcesMetrics();
    }

    @Override
    public Set<String> getAccessibleNodeLabels() {
      // TODO add implementation for FIFO scheduler
      return null;
    }

    @Override
    public String getDefaultNodeLabelExpression() {
      // TODO add implementation for FIFO scheduler
      return null;
    }
  };

  public FifoScheduler() {
    super(FifoScheduler.class.getName());
  }

  // 初始化調度器
  private synchronized void initScheduler(Configuration conf) {
    // 驗證配置信息
    validateConf(conf);
    //Use ConcurrentSkipListMap because applications need to be ordered
    // 使用ConcurrentSkipListMap,因為應用程序需要有序
    // 該applications在它的父類抽象類AbstractYarnScheduler中聲明。
    this.applications =
        new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
    // createResource()函數表示創建資源。 getInt()函數表示獲取<code> name </code>屬性的值作為<code> int </code>。
    // 其中DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024
    this.minimumAllocation =
        Resources.createResource(conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
    // 初始化最大資源容量。 
    // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192
    // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4
    initMaximumResourceCapability(
        Resources.createResource(conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
          conf.getInt(
            YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
    // getBoolean()函數表示獲取<code> name </code>屬性的值作為<code>boolean</code>。
    // 其中 DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false
    this.usePortForNodeName = conf.getBoolean(
        YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
    //
    this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
        conf);
    // ActiveUsersManager 跟蹤系統中的活動用戶。
    this.activeUsersManager = new ActiveUsersManager(metrics);
  }

  @Override
  public void serviceInit(Configuration conf) throws Exception {
    //初始化調度器
    initScheduler(conf);
    super.serviceInit(conf);
  }

  @Override
  public void serviceStart() throws Exception {
    super.serviceStart();
  }

  @Override
  public void serviceStop() throws Exception {
    super.serviceStop();
  }

  @Override
  public synchronized void setConf(Configuration conf) {
    this.conf = conf;
  }
  
  //驗證配置信息
  private void validateConf(Configuration conf) {
    // validate scheduler memory allocation setting
    // 驗證調度器內存分配設置
    // 其中 DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024
    int minMem = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192
    int maxMem = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    
    if (minMem <= 0 || minMem > maxMem) {
      throw new YarnRuntimeException("Invalid resource scheduler memory"
        + " allocation configuration"
        + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
        + "=" + minMem
        + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB
        + "=" + maxMem + ", min and max should be greater than 0"
        + ", max should be no smaller than min.");
    }
  }
  
  @Override
  public synchronized Configuration getConf() {
    return conf;
  }

  @Override
  public int getNumClusterNodes() {
    // Map類的size()函數表示 返回此map中鍵值映射的數量。
    return nodes.size();
  }

  @Override
  public synchronized void setRMContext(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public synchronized void
      reinitialize(Configuration conf, RMContext rmContext) throws IOException
  {
    setConf(conf);
  }

  // 
  @Override
  public Allocation allocate(
      ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
      List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed " +
          "or non existant application " + applicationAttemptId);
      return EMPTY_ALLOCATION;
    }

    // Sanity check
    // 完整性檢查
    SchedulerUtils.normalizeRequests(ask, resourceCalculator, 
        clusterResource, minimumAllocation, getMaximumResourceCapability());

    // Release containers
    // 釋放容器
    releaseContainers(release, application);

    synchronized (application) {

      // make sure we aren't stopping/removing the application
      // when the allocate comes in
      // 確保在分配進來時我們不會停止/刪除應用程序
      if (application.isStopped()) {
        LOG.info("Calling allocate on a stopped " +
            "application " + applicationAttemptId);
        return EMPTY_ALLOCATION;
      }

      if (!ask.isEmpty()) {
        LOG.debug("allocate: pre-update" +
            " applicationId=" + applicationAttemptId + 
            " application=" + application);
        // 
        application.showRequests();

        // Update application requests
        // 更新應用程序請求
        application.updateResourceRequests(ask);

        LOG.debug("allocate: post-update" +
            " applicationId=" + applicationAttemptId + 
            " application=" + application);
        application.showRequests();

        LOG.debug("allocate:" +
            " applicationId=" + applicationAttemptId + 
            " #ask=" + ask.size());
      }
      
      // 更新黑名單列表
      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      // 創建容器令牌和NMToken,如果其中任何一個由於某些原因(如DNS不可用)而失敗,
      // 請不要返回此容器並將其保留在等待重新引導的newlyAllocatedContainers中。
      ContainersAndNMTokensAllocation allocation =
          application.pullNewlyAllocatedContainersAndNMTokens();
      // 在應用程序的用戶的資源方面獲得可用的余量。
      Resource headroom = application.getHeadroom();
      // 
      application.setApplicationHeadroomForMetrics(headroom);
      // Allocation類
      return new Allocation(allocation.getContainerList(), headroom, null,
          null, null, allocation.getNMTokenList());
    }
  }

  private FiCaSchedulerNode getNode(NodeId nodeId) {
    return nodes.get(nodeId);
  }

  @VisibleForTesting
  public synchronized void addApplication(ApplicationId applicationId,
      String queue, String user, boolean isAppRecovering) {
    // 
    SchedulerApplication<FiCaSchedulerApp> application =
        new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
    // Map類函數put()表示將指定的值與該map中的指定鍵相關聯(可選操作)。
    applications.put(applicationId, application);
    // 
    metrics.submitApp(user);
    LOG.info("Accepted application " + applicationId + " from user: " + user
        + ", currently num of applications: " + applications.size());
    if (isAppRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    }
  }

  @VisibleForTesting
  public synchronized void
      addApplicationAttempt(ApplicationAttemptId appAttemptId,
          boolean transferStateFromPreviousAttempt,
          boolean isAttemptRecovering) {
    // Map類函數get()表示返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。
    SchedulerApplication<FiCaSchedulerApp> application =
        applications.get(appAttemptId.getApplicationId());
    String user = application.getUser();
    // TODO: Fix store
    // 創建FiCaSchedulerApp類對象, 表示從FIFO或容量調度器的角度出發的應用程序嘗試。
    FiCaSchedulerApp schedulerApp =
        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
          activeUsersManager, this.rmContext);

    if (transferStateFromPreviousAttempt) {
      schedulerApp.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(schedulerApp);

    metrics.submitAppAttempt(user);
    LOG.info("Added Application Attempt " + appAttemptId
        + " to scheduler from user " + application.getUser());
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(appAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(appAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
    }
  }

  private synchronized void doneApplication(ApplicationId applicationId,
      RMAppState finalState) {
    SchedulerApplication<FiCaSchedulerApp> application =
        applications.get(applicationId);
    if (application == null){
      LOG.warn("Couldn't find application " + applicationId);
      return;
    }

    // Inform the activeUsersManager
    // 通知activeUsersManager
    activeUsersManager.deactivateApplication(application.getUser(),
      applicationId);
    application.stop(finalState);
    // Map類函數remove()表示 如果存在(從可選的操作),從該map中刪除一個鍵的映射。
    applications.remove(applicationId);
  }

  private synchronized void doneApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
      throws IOException {
    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
    SchedulerApplication<FiCaSchedulerApp> application =
        applications.get(applicationAttemptId.getApplicationId());
    if (application == null || attempt == null) {
      throw new IOException("Unknown application " + applicationAttemptId + 
      " has completed!");
    }

    // Kill all 'live' containers
    // 殺死所有的“活”容器
    for (RMContainer container : attempt.getLiveContainers()) {
      if (keepContainers
          && container.getState().equals(RMContainerState.RUNNING)) {
        // do not kill the running container in the case of work-preserving AM
        // restart.
        // 在維護AM重新啟動的情況下,不要殺死正在運行的容器。
        LOG.info("Skip killing " + container.getContainerId());
        continue;
      }
      // createAbnormalContainerStatus()表示在特殊情況下創建{@link ContainerStatus}的實用程序。
      completedContainer(container,
        SchedulerUtils.createAbnormalContainerStatus(
          container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
        RMContainerEventType.KILL);
    }

    // Clean up pending requests, metrics etc.
    // 清理待處理的請求,指標等
    attempt.stop(rmAppAttemptFinalState);
  }
  
  /**
   * Heart of the scheduler...
   * 
   * @param node node on which resources are available to be allocated
   */
  // 調度器的核心...  
  // 分配容器, 參數node表示資源可用於分配的節點
  private void assignContainers(FiCaSchedulerNode node) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " #applications=" + applications.size());

    // Try to assign containers to applications in fifo order
    // 嘗試以fifo順序將容器分配給應用程序
    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
        .entrySet()) {
      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
      if (application == null) {
        continue;
      }

      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // Check if this resource is on the blacklist
        // 檢查這個資源是否在黑名單上
        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
          continue;
        }
        
        for (Priority priority : application.getPriorities()) {
          // 獲取最大可分配容器
          int maxContainers = 
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.OFF_SWITCH); 
          // Ensure the application needs containers of this priority
          // 確保應用程序需要這個優先級的容器
          if (maxContainers > 0) {
            // 在節點上分配容器
            int assignedContainers = 
              assignContainersOnNode(node, application, priority);
            // Do not assign out of order w.r.t priorities
            // 分配不要違反w.r.t優先級
            if (assignedContainers == 0) {
              break;
            }
          }
        }
      }
      
      LOG.debug("post-assignContainers");
      application.showRequests();

      // Done
      //
      if (Resources.lessThan(resourceCalculator, clusterResource,
              node.getAvailableResource(), minimumAllocation)) {
        break;
      }
    }

    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    // 更新應用程序的余量,以正確地考慮在此更新中分配的容器。
    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
      // 
      FiCaSchedulerApp attempt =
          (FiCaSchedulerApp) application.getCurrentAppAttempt();
      if (attempt == null) {
        continue;
      }
      // 
      updateAppHeadRoom(attempt);
    }
  }

  private int getMaxAllocatableContainers(FiCaSchedulerApp application,
      Priority priority, FiCaSchedulerNode node, NodeType type) {
    int maxContainers = 0;
    
    ResourceRequest offSwitchRequest = 
      application.getResourceRequest(priority, ResourceRequest.ANY);
    if (offSwitchRequest != null) {
      maxContainers = offSwitchRequest.getNumContainers();
    }

    // 資源分類。 NODE_LOCAL(0) 表示同一節點, RACK_LOCAL(1) 同一機架上不同節點, OFF_SWITCH(2)不同機架
    if (type == NodeType.OFF_SWITCH) {
      return maxContainers;
    }

    if (type == NodeType.RACK_LOCAL) {
      // getResourceRequest()獲取資源請求。 getRMNode()返回RMNode類對象, getRackName()函數此節點管理器的機架名稱。
      ResourceRequest rackLocalRequest = 
        application.getResourceRequest(priority, node.getRMNode().getRackName());
      if (rackLocalRequest == null) {
        return maxContainers;
      }

      maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers());
    }

    if (type == NodeType.NODE_LOCAL) {
      // getResourceRequest()獲取資源請求。 getNodeAddress()該節點的ContainerManager地址。
      ResourceRequest nodeLocalRequest = 
        application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
      if (nodeLocalRequest != null) {
        // getNumContainers()獲取所需規格的容器數量
        maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers());
      }
    }

    return maxContainers;
  }

  // 在節點上分配容器
  private int assignContainersOnNode(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority 
  ) {
    // Data-local // 數據本地
    int nodeLocalContainers = 
      assignNodeLocalContainers(node, application, priority); 

    // Rack-local // 機架本地
    int rackLocalContainers = 
      assignRackLocalContainers(node, application, priority);

    // Off-switch // 非同一機架
    int offSwitchContainers =
      assignOffSwitchContainers(node, application, priority);

    
    LOG.debug("assignContainersOnNode:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " application=" + application.getApplicationId().getId() +
        " priority=" + priority.getPriority() + 
        " #assigned=" + 
        (nodeLocalContainers + rackLocalContainers + offSwitchContainers));


    return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
  }

  // 分配節點本地容器
  private int assignNodeLocalContainers(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority) {
    int assignedContainers = 0;
    // getNodeName()獲取節點的名稱以調度匹配決策。
    ResourceRequest request = 
      application.getResourceRequest(priority, node.getNodeName());
    if (request != null) {
      // Don't allocate on this node if we don't need containers on this rack
      // 如果我們不需要在此機架上的容器,則不要在此節點上分配
      // getRackName()此節點管理器的機架名稱。
      ResourceRequest rackRequest =
          application.getResourceRequest(priority, 
              node.getRMNode().getRackName()); 
      // getNumContainers()獲取所需規格的容器數量。
      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
        return 0;
      }
      
      int assignableContainers = 
        Math.min(
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.NODE_LOCAL), 
                request.getNumContainers());
      // 分配容器
      assignedContainers = 
        assignContainer(node, application, priority, 
            assignableContainers, request, NodeType.NODE_LOCAL);
    }
    return assignedContainers;
  }

  // 分配機架本地容器
  private int assignRackLocalContainers(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority) {
    int assignedContainers = 0;
    ResourceRequest request = 
      application.getResourceRequest(priority, node.getRMNode().getRackName());
    if (request != null) {
      // Don't allocate on this rack if the application doens't need containers
      // 如果應用程序不需要容器,請不要在此機架上分配
      ResourceRequest offSwitchRequest =
          application.getResourceRequest(priority, ResourceRequest.ANY);
      if (offSwitchRequest.getNumContainers() <= 0) {
        return 0;
      }
      
      int assignableContainers = 
        Math.min(
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.RACK_LOCAL), 
                request.getNumContainers());
      // 分配容器
      assignedContainers = 
        assignContainer(node, application, priority, 
            assignableContainers, request, NodeType.RACK_LOCAL);
    }
    return assignedContainers;
  }

  // 分配容器跨機架
  private int assignOffSwitchContainers(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority) {
    int assignedContainers = 0;
    ResourceRequest request = 
      application.getResourceRequest(priority, ResourceRequest.ANY);
    if (request != null) {
      assignedContainers = 
        assignContainer(node, application, priority, 
            request.getNumContainers(), request, NodeType.OFF_SWITCH);
    }
    return assignedContainers;
  }

  // 分配容器
  private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, 
      Priority priority, int assignableContainers, 
      ResourceRequest request, NodeType type) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " application=" + application.getApplicationId().getId() + 
        " priority=" + priority.getPriority() + 
        " assignableContainers=" + assignableContainers +
        " request=" + request + " type=" + type);
    // 獲取請求的<code>Resource</code>容量。
    Resource capability = request.getCapability();

    int availableContainers = 
      node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy
                                                                        // application
                                                                        // with this
                                                                        // zero would
                                                                        // crash the
                                                                        // scheduler.
    int assignedContainers = 
      Math.min(assignableContainers, availableContainers);

    if (assignedContainers > 0) {
      for (int i=0; i < assignedContainers; ++i) {

        // getNodeID()該節點的節點ID。
        NodeId nodeId = node.getRMNode().getNodeID();
        ContainerId containerId = BuilderUtils.newContainerId(application
            .getApplicationAttemptId(), application.getNewContainerId());

        // Create the container
        // 創建容器
        Container container =
            BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
              .getHttpAddress(), capability, priority, null);
        
        // Allocate!
        // 分配!
        
        // Inform the application
        // 通知應用程序
        RMContainer rmContainer =
            application.allocate(type, node, priority, request, container);
        
        // Inform the node
        // 通知節點
        node.allocateContainer(rmContainer);

        // Update usage for this container
        // 更新此容器的使用
        increaseUsedResources(rmContainer);
      }

    }
    
    return assignedContainers;
  }

  private synchronized void nodeUpdate(RMNode rmNode) {
    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
    
    // 獲取並清除在NM心跳中累積的containerUpdates列表。
    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    for(UpdatedContainerInfo containerInfo : containerInfoList) {
      //
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      //
      completedContainers.addAll(containerInfo.getCompletedContainers());
    }
    // Processing the newly launched containers
    // 處理新發起的容器
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

    // Process completed containers
    // 處理完成的容器
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      completedContainer(getRMContainer(containerId), 
          completedContainer, RMContainerEventType.FINISHED);
    }


    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
            node.getAvailableResource(),minimumAllocation)) {
      LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
          " available resource = " + node.getAvailableResource());

      // 
      assignContainers(node);

      LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
          + node.getAvailableResource());
    }

    updateAvailableResourcesMetrics();
  }

  // 增加使用的資源, 
  private void increaseUsedResources(RMContainer rmContainer) {
    // addTo()把后面的資源添加到前面
    Resources.addTo(usedResource, rmContainer.getAllocatedResource());
  }

  private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
    // 
    schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
      usedResource));
  }

  private void updateAvailableResourcesMetrics() {
    // 設置可用資源。 資源變得可用時由調度器定期調用。
    metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
      usedResource));
  }

  @Override
  public void handle(SchedulerEvent event) {
    switch(event.getType()) {
    case NODE_ADDED:
    {
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
      addNode(nodeAddedEvent.getAddedRMNode());
      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
        nodeAddedEvent.getAddedRMNode());

    }
    break;
    case NODE_REMOVED:
    {
      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
      removeNode(nodeRemovedEvent.getRemovedRMNode());
    }
    break;
    case NODE_RESOURCE_UPDATE:
    {
      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
          (NodeResourceUpdateSchedulerEvent)event;
      updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
        nodeResourceUpdatedEvent.getResourceOption());
    }
    break;
    case NODE_UPDATE:
    {
      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
      (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
    }
    break;
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser(),
        appAddedEvent.getIsAppRecovering());
    }
    break;
    case APP_REMOVED:
    {
      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
      doneApplication(appRemovedEvent.getApplicationID(),
        appRemovedEvent.getFinalState());
    }
    break;
    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
    }
    break;
    case APP_ATTEMPT_REMOVED:
    {
      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
          (AppAttemptRemovedSchedulerEvent) event;
      try {
        doneApplicationAttempt(
          appAttemptRemovedEvent.getApplicationAttemptID(),
          appAttemptRemovedEvent.getFinalAttemptState(),
          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
      } catch(IOException ie) {
        LOG.error("Unable to remove application "
            + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
      }
    }
    break;
    case CONTAINER_EXPIRED:
    {
      ContainerExpiredSchedulerEvent containerExpiredEvent = 
          (ContainerExpiredSchedulerEvent) event;
      ContainerId containerid = containerExpiredEvent.getContainerId();
      completedContainer(getRMContainer(containerid), 
          SchedulerUtils.createAbnormalContainerStatus(
              containerid, 
              SchedulerUtils.EXPIRED_CONTAINER),
          RMContainerEventType.EXPIRE);
    }
    break;
    case CONTAINER_RESCHEDULED:
    {
      ContainerRescheduledEvent containerRescheduledEvent =
          (ContainerRescheduledEvent) event;
      RMContainer container = containerRescheduledEvent.getContainer();
      recoverResourceRequestForContainer(container);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
    }
  }

  // 清理完成的容器
  @Lock(FifoScheduler.class)
  @Override
  protected synchronized void completedContainer(RMContainer rmContainer,
      ContainerStatus containerStatus, RMContainerEventType event) {
    if (rmContainer == null) {
      LOG.info("Null container completed...");
      return;
    }

    // Get the application for the finished container
    // 獲取完成了的容器的應用程序
    Container container = rmContainer.getContainer();
    // 根據容器Id獲取當前應用程序的嘗試
    FiCaSchedulerApp application =
        getCurrentAttemptForContainer(container.getId());
    ApplicationId appId =
        container.getId().getApplicationAttemptId().getApplicationId();
    
    // Get the node on which the container was allocated
    // 獲取分配容器的節點
    FiCaSchedulerNode node = getNode(container.getNodeId());
    
    if (application == null) {
      LOG.info("Unknown application: " + appId + 
          " released container " + container.getId() +
          " on node: " + node + 
          " with event: " + event);
      return;
    }

    // Inform the application
    // 通知應用程序
    application.containerCompleted(rmContainer, containerStatus, event);

    // Inform the node
    // 通知節點  在此節點上釋放分配的容器。
    node.releaseContainer(container);
    
    // Update total usage
    // 更新總的使用情況
    Resources.subtractFrom(usedResource, container.getResource());

    LOG.info("Application attempt " + application.getApplicationAttemptId() + 
        " released container " + container.getId() +
        " on node: " + node + 
        " with event: " + event);
     
  }
  
  private Resource usedResource = recordFactory.newRecordInstance(Resource.class);

  // 移除節點
  private synchronized void removeNode(RMNode nodeInfo) {
    FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
    if (node == null) {
      return;
    }
    // Kill running containers
    // 殺死正在運行的容器
    for(RMContainer container : node.getRunningContainers()) {
      completedContainer(container, 
          SchedulerUtils.createAbnormalContainerStatus(
              container.getContainerId(), 
              SchedulerUtils.LOST_CONTAINER),
              RMContainerEventType.KILL);
    }
    
    //Remove the node
    // 移除節點
    this.nodes.remove(nodeInfo.getNodeID());
    updateMaximumAllocation(node, false);
    
    // Update cluster metrics
    // 
    Resources.subtractFrom(clusterResource, node.getTotalResource());
  }

  @Override
  public QueueInfo getQueueInfo(String queueName,
      boolean includeChildQueues, boolean recursive) {
    return DEFAULT_QUEUE.getQueueInfo(false, false);
  }

  @Override
  public List<QueueUserACLInfo> getQueueUserAclInfo() {
    return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
  }

  @Override
  public ResourceCalculator getResourceCalculator() {
    return resourceCalculator;
  }

  // 添加節點
  private synchronized void addNode(RMNode nodeManager) {
    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
        usePortForNodeName);
    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
    updateMaximumAllocation(schedulerNode, true);
  }

  @Override
  public void recover(RMState state) {
    // NOT IMPLEMENTED
  }

  @Override
  public RMContainer getRMContainer(ContainerId containerId) {
    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
    return (attempt == null) ? null : attempt.getRMContainer(containerId);
  }

  @Override
  public QueueMetrics getRootQueueMetrics() {
    return DEFAULT_QUEUE.getMetrics();
  }

  // 檢查用戶是否有權執行操作。
  @Override
  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
      QueueACL acl, String queueName) {
    return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
  }

  @Override
  public synchronized List<ApplicationAttemptId>
      getAppsInQueue(String queueName) {
    if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
      List<ApplicationAttemptId> attempts =
          new ArrayList<ApplicationAttemptId>(applications.size());
      for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
        attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
      }
      return attempts;
    } else {
      return null;
    }
  }

  public Resource getUsedResource() {
    return usedResource;
  }
}

Fifo隊列內部的調用過程是handle()函數內部NODE_UPDATE事件觸發nodeUpdate(...)函數; 該函數內部調用assignContainers(...); 該函數內部調用assignContainersOnNode(...); 該函數順序執行assignNodeLocalContainers(...), assignRackLocalContainers(...) 以及 assignOffSwitchContainers(...); 其中這三個函數內部都會順序執行 getResourceRequest(...), getMaxAllocatableContainers(...), assignContainer(...)

(2)  Capacity調度器

  對應的類是 CapacityScheduler.java ,在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java 。

 

(3)  Fair調度器

  對應的類是 FairScheduler.java, 在 hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 。

  這三個調度器類都

 

2  JobTracker中的資源管理和作業控制功能分開,分別由組件ResourceManager和ApplicationMaster實現。

(1)  ResourceManager 負責所有應用程序的資源分配

  對應的類是 ResourceManager.java 在 

 

(2)  ApplicationMaster 僅負責管理一個應用程序

  對應的類是 ApplicationMaster.java 在 

 

另外, NodeManager

 

3  編寫Hadoop調度器  參考如何編寫Hadoop調度器   以及 深入Hadoop的調度器  

  假設我們要編寫一個新的調度器,為MyHadoopScheduler,需要進行以下工作:

  (1) 用戶需要自己實現的類

  

  (2) 用戶要用到的系統類


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM