If you repost this article, please credit the author and the source. My abilities are limited, so if you find mistakes, please point them out.
Note: we are working with the hadoop-2.7.3-src source tree. In this version the default scheduler is the Capacity scheduler. Back in the 2.0.2-alpha release, someone reported a bug in the FIFO scheduler, and the community switched the default scheduler from FIFO to Capacity. (Reference)
In Hadoop, the scheduler is a pluggable module: users can design a scheduler that fits their actual workload, specify it in the configuration file, and the cluster will load that scheduler when it starts. Hadoop ships with several schedulers, namely FIFO (the historical default), the Capacity Scheduler, and the Fair Scheduler. In practice these rarely cover a company's more complex requirements, so teams often need to develop their own. This article introduces the basics of writing a Hadoop scheduler. (Reference 1)
Hadoop 1 scheduling framework: the scheduler is loaded and invoked inside the JobTracker; users specify it with the mapred.jobtracker.taskScheduler property in the configuration file mapred-site.xml. Analyzing this framework really means analyzing the relationship between two key classes: TaskScheduler and JobTracker.
Hadoop 2 scheduling framework: the scheduler is loaded and invoked inside the ResourceManager; users specify it with the yarn.resourcemanager.scheduler.class property in the configuration file yarn-site.xml. The default is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; you can also configure the Fifo scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler, or the Fair scheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler. This section looks at the Hadoop 2 scheduling framework. By analogy with Hadoop 1, the common base class extended by all three schedulers is AbstractYarnScheduler<T extends SchedulerApplicationAttempt, N extends SchedulerNode>, which plays the role TaskScheduler played in Hadoop 1; to write your own scheduler, you extend the abstract class AbstractYarnScheduler. A configuration sketch follows below.
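As a concrete illustration, here is a minimal yarn-site.xml fragment that switches the ResourceManager to the Fair scheduler (a sketch: the property name and class are exactly the ones quoted above; assume the ResourceManager is restarted afterwards so the new scheduler gets loaded):

<!-- yarn-site.xml: choose which scheduler class the ResourceManager loads -->
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Omitting the property altogether leaves the default, CapacityScheduler.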
In Hadoop 2, MapReduce is referred to as MR2 or YARN. The resource management and job lifecycle management (including scheduling and monitoring) that used to live in the JobTracker were split into two independent services: a ResourceManager that manages all cluster resources, and a per-application ApplicationMaster. The ResourceManager arbitrates the allocation of compute resources among applications, while each ApplicationMaster handles its application's management, scheduling, and coordination. An application can be a single job in the classic MapReduce sense, or a DAG (directed acyclic graph) of such jobs. The ResourceManager, together with a NodeManager service on each machine that manages that machine's user processes, forms the compute fabric. Each application's ApplicationMaster is effectively a framework-specific library that negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor tasks. (Reference 2)
To address the scalability and multi-framework limitations of MapReduce in Hadoop 1.0, the resource-management and job-control responsibilities of the JobTracker were separated into two components, ResourceManager and ApplicationMaster: the ResourceManager allocates resources for all applications, while each ApplicationMaster manages exactly one application. The result is the general-purpose resource management framework YARN. On top of YARN, users can run many kinds of applications (no longer just MapReduce, as in 1.0), from offline MapReduce computation to online (stream) processing such as Storm. Hadoop 2.0 corresponds to the Apache Hadoop 0.23.x and 2.x releases and to CDH4.
Architecture diagram:
The ResourceManager contains two main components: the Scheduler and the ApplicationsManager.
Scheduler: the scheduler allocates resources to applications. It does no monitoring or application state tracking, and it makes no guarantee that it will restart applications that fail because of application errors or hardware faults.
ApplicationsManager: the applications manager accepts newly submitted applications and coordinates restarting the ApplicationMaster container when it fails.
NodeManager: the NodeManager is the ResourceManager's agent on each machine. It manages containers, monitors their resource usage (CPU, memory, disk, network, etc.), and reports that usage to the ResourceManager/Scheduler.
ApplicationMaster: each application's ApplicationMaster requests resources from the Scheduler, tracks how those resources are used, and monitors task progress.
1 Scheduler
To analyze a scheduler we first have to analyze its parent classes and implemented interfaces, and their ancestors in turn: AbstractService, YarnScheduler, ResourceScheduler, and AbstractYarnScheduler, as shown below.
AbstractService.java is at hadoop-2.7.3-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/AbstractService.java
package org.apache.hadoop.service; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import com.google.common.annotations.VisibleForTesting; /** * This is the base implementation class for services. */ //這是服務的基本實現類。 @Public @Evolving public abstract class AbstractService implements Service { private static final Log LOG = LogFactory.getLog(AbstractService.class); /** * Service name. */ //服務名稱 private final String name; /** service state */ //服務狀態 private final ServiceStateModel stateModel; /** * Service start time. Will be zero until the service is started. */ //服務開始時間。在服務開始之前為0。 private long startTime; /** * The configuration. Will be null until the service is initialized. */ //配置。在服務初始化之前為null。 private volatile Configuration config; /** * List of state change listeners; it is final to ensure * that it will never be null. */ //狀態更改偵聽器列表;最終確保它不為null。 private final ServiceOperations.ServiceListeners listeners = new ServiceOperations.ServiceListeners(); /** * Static listeners to all events across all services */ //所有服務的所有事件的靜態監聽器 private static ServiceOperations.ServiceListeners globalListeners = new ServiceOperations.ServiceListeners(); /** * The cause of any failure -will be null. * if a service did not stop due to a failure. */ //任何失敗的原因 - 是因為null。 如果服務沒有因為故障停止。 private Exception failureCause; /** * the state in which the service was when it failed. * Only valid when the service is stopped due to a failure */ //服務失敗時的狀態。僅當服務由於失敗而停止時才有效。 private STATE failureState = null; /** * object used to co-ordinate {@link #waitForServiceToStop(long)} * across threads. */ //對象用於協調 {@link #waitForServiceToStop(long)} 跨線程。 private final AtomicBoolean terminationNotification = new AtomicBoolean(false); /** * History of lifecycle transitions */ //生命周期轉換的歷史 private final List<LifecycleEvent> lifecycleHistory = new ArrayList<LifecycleEvent>(5); /** * Map of blocking dependencies */ //阻止依賴關系的映射 private final Map<String,String> blockerMap = new HashMap<String, String>(); private final Object stateChangeLock = new Object(); /** * Construct the service. * @param name service name */ //構造服務 public AbstractService(String name) { this.name = name; stateModel = new ServiceStateModel(name); } /* * 獲取當前的服務狀態。 * 返回:服務的狀態 */ @Override public final STATE getServiceState() { return stateModel.getState(); } /* * 獲取服務失敗時引發的第一個異常。 如果為空,則不記錄任何異常 * 返回:在轉換到停止狀態期間日志記錄的故障 */ @Override public final synchronized Throwable getFailureCause() { return failureCause; } /* * 獲取發生在{@link #getFailureCause()}中失敗的狀態。 * 返回:狀態,如果沒有失敗,則為null */ @Override public synchronized STATE getFailureState() { return failureState; } /** * Set the configuration for this service. * This method is called during {@link #init(Configuration)} * and should only be needed if for some reason a service implementation * needs to override that initial setting -for example replacing * it with a new subclass of {@link Configuration} * @param conf new configuration. 
*/ /* * 設置此服務的配置。當{@link #init(Configuration)}時該方法會被調用並且 * 只有在某些原因出現,服務實現需要覆蓋該初始設置的情況下才需要這樣做 - 例如 * 用{@link Configuration}的新子類替換它。 */ protected void setConfig(Configuration conf) { this.config = conf; } /** * {@inheritDoc} * This invokes {@link #serviceInit} * @param conf the configuration of the service. This must not be null * @throws ServiceStateException if the configuration was null, * the state change not permitted, or something else went wrong */ //這將調用{@link #serviceInit} //子類的serviceInit會初始化所需服務,會創建相應的服務類然后加入服務列表 @Override public void init(Configuration conf) { //服務配置是否為空 if (conf == null) { throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); } //服務是否已經初始化 if (isInState(STATE.INITED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.INITED) != STATE.INITED) { setConfig(conf); try { //服務初始化,會進入子類的同名函數 serviceInit(config); if (isInState(STATE.INITED)) { //if the service ended up here during init, //notify the listeners notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } } /** * {@inheritDoc} * @throws ServiceStateException if the current service state does not permit * this action */ //開始服務 @Override public void start() { if (isInState(STATE.STARTED)) { return; } //enter the started state synchronized (stateChangeLock) { if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { try { startTime = System.currentTimeMillis(); serviceStart(); if (isInState(STATE.STARTED)) { //if the service started (and isn't now in a later state), notify if (LOG.isDebugEnabled()) { LOG.debug("Service " + getName() + " is started"); } notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } } /** * {@inheritDoc} */ //停止服務 @Override public void stop() { if (isInState(STATE.STOPPED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.STOPPED) != STATE.STOPPED) { try { serviceStop(); } catch (Exception e) { //stop-time exceptions are logged if they are the first one, noteFailure(e); throw ServiceStateException.convert(e); } finally { //report that the service has terminated terminationNotification.set(true); synchronized (terminationNotification) { terminationNotification.notifyAll(); } //notify anything listening for events notifyListeners(); } } else { //already stopped: note it if (LOG.isDebugEnabled()) { LOG.debug("Ignoring re-entrant call to stop()"); } } } } /** * Relay to {@link #stop()} * @throws IOException */ @Override public final void close() throws IOException { stop(); } /** * Failure handling: record the exception * that triggered it -if there was not one already. * Services are free to call this themselves. 
* @param exception the exception */ /* * 故障處理:記錄觸發它的異常 - 如果還沒有一個。 服務可以自由調用。 */ protected final void noteFailure(Exception exception) { if (LOG.isDebugEnabled()) { LOG.debug("noteFailure " + exception, null); } if (exception == null) { //make sure failure logic doesn't itself cause problems return; } //record the failure details, and log it //記錄故障細節,並記錄日志 synchronized (this) { if (failureCause == null) { failureCause = exception; failureState = getServiceState(); LOG.info("Service " + getName() + " failed in state " + failureState + "; cause: " + exception, exception); } } } /* * 阻止等待服務停止; 使用終止通知對象這樣做。 * 該方法只有在執行所有服務停止操作(成功或失敗)之后才返回,或超時已過 * 該方法可以在服務初始化或啟動之前調用; 這是為了消除任何競爭條件,服務在此事件發生之前停止。 */ @Override public final boolean waitForServiceToStop(long timeout) { boolean completed = terminationNotification.get(); while (!completed) { try { synchronized(terminationNotification) { terminationNotification.wait(timeout); } // here there has been a timeout, the object has terminated, // or there has been a spurious wakeup (which we ignore) //這里有一個超時,對象已經終止了,或者有一個虛假的喚醒(我們忽略) completed = true; } catch (InterruptedException e) { // interrupted; have another look at the flag completed = terminationNotification.get(); } } return terminationNotification.get(); } /* ===================================================================== */ /* Override Points */ /* ===================================================================== */ /** * All initialization code needed by a service. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #init(Configuration)} prevents re-entrancy. * * The base implementation checks to see if the subclass has created * a new configuration instance, and if so, updates the base class value * @param conf configuration * @throws Exception on a failure -these will be caught, * possibly wrapped, and wil; trigger a service stop */ /* * 服務所需的所有初始化代碼。 * 該方法只能在特定服務實例的生命周期中被調用一次。 * 實現不需要同步機制,因為{@link #init(Configuration))中的邏輯可以防止重新進入。 * 基本實現檢查子類是否已創建新的配置實例,如果是,則更新基類值。 */ protected void serviceInit(Configuration conf) throws Exception { if (conf != config) { LOG.debug("Config has been overridden during init"); setConfig(conf); } } /** * Actions called during the INITED to STARTED transition. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #start()} prevents re-entrancy. * * @throws Exception if needed -these will be caught, * wrapped, and trigger a service stop */ /* * 在INITED到STARTED過渡期間所采取的行動。 * 該方法只能在特定服務實例的生命周期中被調用一次。 * 實現不需要同步機制,因為{@link #start()}中的邏輯可以防止重新進入。 */ protected void serviceStart() throws Exception { } /** * Actions called during the transition to the STOPPED state. * * This method will only ever be called once during the lifecycle of * a specific service instance. * * Implementations do not need to be synchronized as the logic * in {@link #stop()} prevents re-entrancy. * * Implementations MUST write this to be robust against failures, including * checks for null references -and for the first failure to not stop other * attempts to shut down parts of the service. * * @throws Exception if needed -these will be caught and logged. 
*/ /* * 在轉換到STOPPED狀態期間調用的動作。 * 該方法只能在特定服務實例的生命周期中被調用一次。 * 實現不需要同步機制,因為{@link #stop()}中的邏輯可以防止重入。 * 實現MUST寫入這個要健壯來避免失敗, 包括對空引用的檢查,以及第一個不能停止其他嘗試關閉部分服務的失敗。 */ protected void serviceStop() throws Exception { } /* * 將監聽器注冊到服務狀態更改事件。 * 如果提供的偵聽器已經在監聽此服務,則此方法是無效的。 * 參數 l 表示:一個新的監聽器 */ @Override public void registerServiceListener(ServiceStateChangeListener l) { listeners.add(l); } /* * 取消注冊先前注冊的服務狀態更改事件的偵聽器。 如果監聽器已經被注銷,則不用操作。 * 參數 l 表示:要注銷的監聽器 */ @Override public void unregisterServiceListener(ServiceStateChangeListener l) { listeners.remove(l); } /** * Register a global listener, which receives notifications * from the state change events of all services in the JVM * @param l listener */ //注冊一個全局監聽器,它從JVM中所有服務的狀態更改事件接收通知 public static void registerGlobalListener(ServiceStateChangeListener l) { globalListeners.add(l); } /** * unregister a global listener. * @param l listener to unregister * @return true if the listener was found (and then deleted) */ //取消注冊全局監聽器。 public static boolean unregisterGlobalListener(ServiceStateChangeListener l) { return globalListeners.remove(l); } /** * Package-scoped method for testing -resets the global listener list */ //用於測試的程序包范圍的方法 - 重新設置全局偵聽器列表 @VisibleForTesting static void resetGlobalListeners() { globalListeners.reset(); } /* * 獲取服務的名稱。 * 返回:服務的名稱 */ @Override public String getName() { return name; } /* * 獲取該服務的配置信息。 * 這通常不是一個克隆,並且可能被操縱,盡管不能保證這種行為的后果可能如何 * 返回:當前配置,除非具體實現選擇。 */ @Override public synchronized Configuration getConfig() { return config; } /* * 獲取服務的開始時間。 * 返回:服務的開始時間。 如果服務尚未啟動,則為零。 */ @Override public long getStartTime() { return startTime; } /** * Notify local and global listeners of state changes. * Exceptions raised by listeners are NOT passed up. */ //通知本地和全局監聽器的狀態變化。監聽器提出的異常情況不會被傳遞。 private void notifyListeners() { try { listeners.notifyListeners(this); globalListeners.notifyListeners(this); } catch (Throwable e) { LOG.warn("Exception while notifying listeners of " + this + ": " + e, e); } } /** * Add a state change event to the lifecycle history */ //將狀態更改事件添加到生命周期歷史記錄 private void recordLifecycleEvent() { LifecycleEvent event = new LifecycleEvent(); event.time = System.currentTimeMillis(); event.state = getServiceState(); lifecycleHistory.add(event); } /* * 獲取生命周期歷史的快照; 它是一個靜態列表 * 返回:一個可能是empty的但從不是null的生命周期事件列表。 */ @Override public synchronized List<LifecycleEvent> getLifecycleHistory() { return new ArrayList<LifecycleEvent>(lifecycleHistory); } /** * Enter a state; record this via {@link #recordLifecycleEvent} * and log at the info level. * @param newState the proposed new state * @return the original state * it wasn't already in that state, and the state model permits state re-entrancy. */ /* * 輸入狀態; 記錄這個通過{@link #recordLifecycleEvent}並以信息級別記錄在日志。 * 參數 newState 表示 提出新的狀態 * 返回:原來的狀態還沒有在這個狀態,狀態模式允許狀態重新進入。 */ private STATE enterState(STATE newState) { assert stateModel != null : "null state in " + name + " " + this.getClass(); STATE oldState = stateModel.enterState(newState); if (oldState != newState) { if (LOG.isDebugEnabled()) { LOG.debug( "Service: " + getName() + " entered state " + getServiceState()); } recordLifecycleEvent(); } return oldState; } /* * 查詢狀態是否處於特定狀態 * 參數 表示提出新的狀態 */ @Override public final boolean isInState(Service.STATE expected) { return stateModel.isInState(expected); } @Override public String toString() { return "Service " + name + " in state " + stateModel; } /** * Put a blocker to the blocker map -replacing any * with the same name. * @param name blocker name * @param details any specifics on the block. 
This must be non-null. */ /* * 將攔截器放在攔截器map上 - 重新放置任何具有相同名稱的。 * 參數 name 表示:攔截器名稱 * 參數 details 表示:詳細說明塊上的細節。 這必須是非空。 */ protected void putBlocker(String name, String details) { synchronized (blockerMap) { blockerMap.put(name, details); } } /** * Remove a blocker from the blocker map - * this is a no-op if the blocker is not present * @param name the name of the blocker */ /* * 從攔截器map中移除一個攔截器 - 如果攔截器不存在,這是空操作 * 參數 name 表示:攔截器的名稱 */ public void removeBlocker(String name) { synchronized (blockerMap) { blockerMap.remove(name); } } /* * 獲取一個服務的攔截器 - 遠程依賴關系,使服務不再是<i>live</i>。 * 返回:一個攔截器名稱-&gt的(快照)map;描述值 */ @Override public Map<String, String> getBlockers() { synchronized (blockerMap) { Map<String, String> map = new HashMap<String, String>(blockerMap); return map; } } }
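To see the lifecycle machinery defined above in action, here is a minimal subclass (a sketch: the class name, log strings, and main() driver are hypothetical; the init()/start()/stop() entry points and the serviceInit/serviceStart/serviceStop override points are exactly the ones in AbstractService above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;

// A toy service: AbstractService drives the NOTINITED -> INITED -> STARTED
// -> STOPPED state machine; a subclass only fills in the override points.
public class DemoService extends AbstractService {

  public DemoService() {
    super("DemoService"); // the service name stored by AbstractService
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    System.out.println("serviceInit: configuration received");
    super.serviceInit(conf); // keep the base class's config bookkeeping
  }

  @Override
  protected void serviceStart() throws Exception {
    System.out.println("serviceStart: now doing work");
  }

  @Override
  protected void serviceStop() throws Exception {
    System.out.println("serviceStop: resources released");
  }

  public static void main(String[] args) {
    DemoService svc = new DemoService();
    svc.init(new Configuration()); // enters INITED and notifies listeners
    svc.start();                   // enters STARTED
    System.out.println("state = " + svc.getServiceState());
    svc.stop();                    // enters STOPPED and wakes waitForServiceToStop()
  }
}

This is the same pattern AbstractYarnScheduler relies on: its serviceInit, shown later, runs inside AbstractService.init().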
YarnScheduler.java is at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; import java.util.EnumSet; import java.util.List; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** * This interface is used by the components to talk to the * scheduler for allocating of resources, cleaning up resources. * */ //該接口用於組件與調度器對話以分配資源、清理資源。 public interface YarnScheduler extends EventHandler<SchedulerEvent> { /** * Get queue information * @param queueName queue name * @param includeChildQueues include child queues? * @param recursive get children queues? * @return queue information * @throws IOException */ /* 獲取隊列信息 參數queueName 表示隊列名稱 參數includeChildQueues 表示是否包含子隊列 參數recursive 表示遞歸得到孩子隊列? 返回QueueInfo 隊列信息 */ @Public @Stable public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, boolean recursive) throws IOException; /** * Get acls for queues for current user. * @return acls for queues for current user */ /* 獲取當前用戶的隊列的訪問控制列表(acls) */ @Public @Stable public List<QueueUserACLInfo> getQueueUserAclInfo(); /** * Get the whole resource capacity of the cluster. * @return the whole resource capacity of the cluster. */ /* 獲取集群的全部資源容量。 返回:集群的全部資源容量 */ @LimitedPrivate("yarn") @Unstable public Resource getClusterResource(); /** * Get minimum allocatable {@link Resource}. * @return minimum allocatable resource */ /* 獲取最小可分配資源。 返回:最小可分配資源。 */ @Public @Stable public Resource getMinimumResourceCapability(); /** * Get maximum allocatable {@link Resource} at the cluster level. * @return maximum allocatable resource */ /* 獲得最大的可分配的資源在集群級別。 返回:最大的可分配的資源 */ @Public @Stable public Resource getMaximumResourceCapability(); /** * Get maximum allocatable {@link Resource} for the queue specified. * @param queueName queue name * @return maximum allocatable resource */ /* 獲取指定隊列的最大可分配資源。 參數queueName 指隊列名 返回:最大可分配資源 */ @Public @Stable public Resource getMaximumResourceCapability(String queueName); @LimitedPrivate("yarn") @Evolving ResourceCalculator getResourceCalculator(); /** * Get the number of nodes available in the cluster. * @return the number of available nodes. 
*/ /* 獲取集群中可用節點的數目。 返回:可用節點的數目 */ @Public @Stable public int getNumClusterNodes(); /** * The main api between the ApplicationMaster and the Scheduler. * The ApplicationMaster is updating his future resource requirements * and may release containers he doens't need. * * @param appAttemptId * @param ask * @param release * @param blacklistAdditions * @param blacklistRemovals * @return the {@link Allocation} for the application */ /* ApplicationMaster 和 Scheduler 之間的主要接口。ApplicationMaster 正在更新它的將來的資源需求以及可能釋放它不需要的 containers 。 返回:應用程序的 {@link Allocation} */ @Public @Stable Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals); /** * Get node resource usage report. * @param nodeId * @return the {@link SchedulerNodeReport} for the node or null * if nodeId does not point to a defined node. */ /* 獲取節點資源使用報告。 返回:節點的 {@link SchedulerNodeReport} ;或者null,當nodeId沒有指向一個已經定義的節點。 */ @LimitedPrivate("yarn") @Stable public SchedulerNodeReport getNodeReport(NodeId nodeId); /** * Get the Scheduler app for a given app attempt Id. * @param appAttemptId the id of the application attempt * @return SchedulerApp for this given attempt. */ /* 獲取調度器應用程序,通過一個應用程序的嘗試Id。 參數appAttemptId 應用程序嘗試的id 返回:對這個給定的嘗試返回 SchedulerApp */ @LimitedPrivate("yarn") @Stable SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId); /** * Get a resource usage report from a given app attempt ID. * @param appAttemptId the id of the application attempt * @return resource usage report for this given attempt */ /* 從給定的應用程序嘗試ID獲取資源使用報告。 參數appAttemptId表示應用程序嘗試的id 返回:給定的嘗試的資源使用報告 */ @LimitedPrivate("yarn") @Evolving ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId); /** * Get the root queue for the scheduler. * @return the root queue for the scheduler. */ /* 獲取調度器的根隊列。 返回:度器的根隊列 */ @LimitedPrivate("yarn") @Evolving QueueMetrics getRootQueueMetrics(); /** * Check if the user has permission to perform the operation. * If the user has {@link QueueACL#ADMINISTER_QUEUE} permission, * this user can view/modify the applications in this queue * @param callerUGI * @param acl * @param queueName * @return <code>true</code> if the user has the permission, * <code>false</code> otherwise */ /* 檢查用戶是否具有執行操作的權限。如果用戶有{@link QueueACL#ADMINISTER_QUEUE}這樣的權限,這個用戶就可以查看和修改這個隊列里的應用程序。 返回:<code>true</code>表示用戶有這樣的權限, 其它返回 <code>false</code> */ boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName); /** * Gets the apps under a given queue * @param queueName the name of the queue. * @return a collection of app attempt ids in the given queue. */ /* 獲取給定隊列下的應用程序。 參數 queueName指隊列的名稱 返回:給定隊列的應用程序嘗試的id的集合 */ @LimitedPrivate("yarn") @Stable public List<ApplicationAttemptId> getAppsInQueue(String queueName); /** * Get the container for the given containerId. * @param containerId * @return the container for the given containerId. 
*/ /* 獲得給定containerId的容器。 返回:給定containerId的容器 */ @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); /** * Moves the given application to the given queue * @param appId * @param newQueue * @return the name of the queue the application was placed into * @throws YarnException if the move cannot be carried out */ /* 將給定的應用程序移動到給定的隊列。 返回:返回應用程序被放置的隊列的名稱 拋出YarnException異常,當移動不能進行的時候 */ @LimitedPrivate("yarn") @Evolving public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; /** * Completely drain sourceQueue of applications, by moving all of them to * destQueue. * * @param sourceQueue * @param destQueue * @throws YarnException */ /* 應用程序完全用完sourceQueue, 通過把它們都移動到destQueue */ void moveAllApps(String sourceQueue, String destQueue) throws YarnException; /** * Terminate all applications in the specified queue. * * @param queueName the name of queue to be drained * @throws YarnException */ /* 終止指定隊列中的所有應用程序。 參數queueName指資源被用完的隊列名稱 */ void killAllAppsInQueue(String queueName) throws YarnException; /** * Remove an existing queue. Implementations might limit when a queue could be * removed (e.g., must have zero entitlement, and no applications running, or * must be a leaf, etc..). * * @param queueName name of the queue to remove * @throws YarnException */ /* 刪除現有隊列。當隊列可以被移除時,實現可能會受到限制 (例如,必須有零個授權,並且沒有應用程序運行,或必須是葉子,等。) 參數 queueName指要刪除的隊列名 */ void removeQueue(String queueName) throws YarnException; /** * Add to the scheduler a new Queue. Implementations might limit what type of * queues can be dynamically added (e.g., Queue must be a leaf, must be * attached to existing parent, must have zero entitlement). * * @param newQueue the queue being added. * @throws YarnException */ /* 給調度器添加一個新隊列。實現可能會限制哪種類型的隊列能夠動態添加(例如,隊列必須是一個葉子,必須依附於現有的父級,必須有零的授權) */ void addQueue(Queue newQueue) throws YarnException; /** * This method increase the entitlement for current queue (must respect * invariants, e.g., no overcommit of parents, non negative, etc.). * Entitlement is a general term for weights in FairScheduler, capacity for * the CapacityScheduler, etc. * * @param queue the queue for which we change entitlement * @param entitlement the new entitlement for the queue (capacity, * maxCapacity, etc..) * @throws YarnException */ /* 此方法增加當前隊列的權限(必須遵守不變量,例如,沒有過度使用的雙親,非負,等等。)。 */ void setEntitlement(String queue, QueueEntitlement entitlement) throws YarnException; /** * Gets the list of names for queues managed by the Reservation System * @return the list of queues which support reservations */ /* 獲取由預訂系統管理的隊列的名稱列表。 返回:支持預定的隊列列表 */ public Set<String> getPlanQueues() throws YarnException; /** * Return a collection of the resource types that are considered when * scheduling * * @return an EnumSet containing the resource types */ /* 返回調度時所考慮的資源類型的集合 返回:返回一個EnumSet包含資源類型 */ public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes(); }
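Before moving on, a small caller-side sketch of this interface (assumptions: you already hold a YarnScheduler reference, for instance from rmContext.getScheduler() inside the ResourceManager, and a queue named "root" exists, as it does in the Capacity scheduler's queue tree; every method used is declared above):

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

public class SchedulerProbe {
  // Print a few cluster-level facts exposed by the YarnScheduler interface.
  static void dump(YarnScheduler scheduler) throws IOException {
    System.out.println("cluster resource : " + scheduler.getClusterResource());
    System.out.println("min allocation   : " + scheduler.getMinimumResourceCapability());
    System.out.println("max allocation   : " + scheduler.getMaximumResourceCapability());
    System.out.println("cluster nodes    : " + scheduler.getNumClusterNodes());

    // Queue info for "root", without child queues and without recursion.
    QueueInfo root = scheduler.getQueueInfo("root", false, false);
    System.out.println("root capacity    : " + root.getCapacity());
  }
}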
ResourceScheduler.java is at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;

/**
 * This interface is the one implemented by the schedulers. It mainly extends
 * {@link YarnScheduler}.
 */
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {

  /**
   * Set RMContext for <code>ResourceScheduler</code>.
   * This method should be called immediately after instantiating
   * a scheduler once.
   * @param rmContext created by ResourceManager
   */
  void setRMContext(RMContext rmContext);

  /**
   * Re-initialize the <code>ResourceScheduler</code>.
   * @param conf configuration
   * @throws IOException
   */
  void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
}
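The javadoc pins down the calling order: instantiate the scheduler, call setRMContext immediately, then run it through the AbstractService lifecycle; reinitialize is for later configuration refreshes. A simplified sketch of what the ResourceManager effectively does (assumptions: the real code path goes through the RM's internal service composition, and conf/rmContext are supplied by the caller):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

public class SchedulerBootstrapSketch {
  // Bring a scheduler up roughly the way the ResourceManager does.
  static CapacityScheduler bootstrap(Configuration conf, RMContext rmContext) {
    CapacityScheduler scheduler = new CapacityScheduler(); // FifoScheduler / FairScheduler work the same
    scheduler.setRMContext(rmContext); // immediately after instantiation, per the javadoc
    scheduler.init(conf);              // AbstractService.init -> serviceInit
    scheduler.start();                 // AbstractService.start -> serviceStart
    return scheduler;
  }

  // Later, e.g. when the queue configuration changes, the RM re-initializes it.
  static void refresh(CapacityScheduler scheduler, Configuration conf, RMContext rmContext)
      throws IOException {
    scheduler.reinitialize(conf, rmContext);
  }
}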
AbstractYarnScheduler.java is at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.util.concurrent.SettableFuture; @SuppressWarnings("unchecked") @Private @Unstable public abstract class AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode> extends AbstractService implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); // Nodes in the cluster, indexed by NodeId // 在集群中的節點,用NodeId索引 protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>(); 
// Whole capacity of the cluster // 集群全部容量 protected Resource clusterResource = Resource.newInstance(0, 0); protected Resource minimumAllocation; private Resource maximumAllocation; private Resource configuredMaximumAllocation; private int maxNodeMemory = -1; private int maxNodeVCores = -1; private final ReadLock maxAllocReadLock; private final WriteLock maxAllocWriteLock; private boolean useConfiguredMaximumAllocationOnly = true; private long configuredMaximumAllocationWaitTime; protected RMContext rmContext; /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. */ // 所有繼承自AbstractYarnScheduler的調度器應該使用並行版本的'applications' map protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications; protected int nmExpireInterval; protected final static List<Container> EMPTY_CONTAINER_LIST = new ArrayList<Container>(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); /** * Construct the service. * * @param name service name */ /* * 構造服務 * 參數name表示服務名 */ public AbstractYarnScheduler(String name) { super(name); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.maxAllocReadLock = lock.readLock(); this.maxAllocWriteLock = lock.writeLock(); } // 服務所需的所有初始化代碼。 @Override public void serviceInit(Configuration conf) throws Exception { // getInt()表示獲取<code> name </code>屬性的值作為<code> int </code>。 // 如果沒有這樣的屬性,返回提供的默認值,或者如果指定的值不是有效的<code> int </ code>,那么會拋出一個錯誤。 // 第一個參數是String name,第二個參數int defaultValue // DEFAULT_RM_NM_EXPIRY_INTERVAL_MS指節點管理器被認為死所要的等待的時間,默認為600000ms。 nmExpireInterval = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); // 獲取<code> name </code>屬性的值作為<code> long </code>。 // 如果沒有這樣的屬性,返回所提供的默認值,或者如果指定的值不是有效的<code> long </ code>,則會拋出錯誤。 // 第一個參數是String name,第二個參數long defaultValue // DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS指,默認為10000ms。 configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); // createReleaseCache(); super.serviceInit(conf); } public List<Container> getTransferredContainers( ApplicationAttemptId currentAttempt) { // 從<code>ApplicationAttempId</code>中獲取<code>ApplicationId</code> ApplicationId appId = currentAttempt.getApplicationId(); // 調用的get()函數是Map類,返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。 SchedulerApplication<T> app = applications.get(appId); // 構造一個初始容量為十的空列表。它只接收Container類型 List<Container> containerList = new ArrayList<Container>(); // rmContext是接口RMContext的對象,而該接口只有一個實現類RMContextImpl, // rmContext.getRMApps()返回ConcurrentMap<ApplicationId, RMApp> // rmContext.getRMApps().get(appId)調用的是Map類的get()函數。 RMApp appImpl = this.rmContext.getRMApps().get(appId); // appImpl是接口RMApp對象, // appImpl.getApplicationSubmissionContext()此{@link RMApp}的應用程序提交上下文,返回ApplicationSubmissionContext // getUnmanagedAM()獲取是否RM應該管理AM的執行。 if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { return containerList; } if (app == null) { return containerList; } // getLiveContainers()獲取應用程序的活動容器並返回。 Collection<RMContainer> liveContainers = app.getCurrentAppAttempt().getLiveContainers(); // getMasterContainer()是供Application Master運行的容器, // Container類的getId()獲取容器的全局唯一標識符。 // 最后獲取的是Application Master的容器Id ContainerId amContainerId = rmContext.getRMApps().get(appId).getCurrentAppAttempt() .getMasterContainer().getId(); 
for (RMContainer rmContainer : liveContainers) { // 判斷當前的Id是否是Application Master的容器Id if (!rmContainer.getContainerId().equals(amContainerId)) { // 不相等,則往容器列表中添加容器 containerList.add(rmContainer.getContainer()); } } return containerList; } public Map<ApplicationId, SchedulerApplication<T>> getSchedulerApplications() { return applications; } // 獲取集群的整個資源容量。 @Override public Resource getClusterResource() { return clusterResource; } // 獲取最小可分配{@link Resource}。 @Override public Resource getMinimumResourceCapability() { return minimumAllocation; } // 在集群級別獲取最大可分配{@link Resource}。 @Override public Resource getMaximumResourceCapability() { Resource maxResource; maxAllocReadLock.lock(); try { // 類最開始定義useConfiguredMaximumAllocationOnly為true if (useConfiguredMaximumAllocationOnly) { // System.currentTimeMillis()產生一個當前的毫秒,這個毫秒其實就是自1970年1月1日0時起的毫秒數 // ResourceManager.getClusterTimeStamp()調用的也是System.currentTimeMillis(), // configuredMaximumAllocationWaitTime默認值為10000ms if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() > configuredMaximumAllocationWaitTime) { useConfiguredMaximumAllocationOnly = false; //設為false } // 克隆一份資源 maxResource = Resources.clone(configuredMaximumAllocation); } else { maxResource = Resources.clone(maximumAllocation); } } finally { maxAllocReadLock.unlock(); } return maxResource; } // @Override public Resource getMaximumResourceCapability(String queueName) { return getMaximumResourceCapability(); } // 初始化最大資源容量 protected void initMaximumResourceCapability(Resource maximumAllocation) { maxAllocWriteLock.lock(); try { if (this.configuredMaximumAllocation == null) { // 克隆資源 this.configuredMaximumAllocation = Resources.clone(maximumAllocation); this.maximumAllocation = Resources.clone(maximumAllocation); } } finally { maxAllocWriteLock.unlock(); } } // protected synchronized void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { // Get the application for the finished container // 獲取完成了的容器的應用程序 SchedulerApplicationAttempt application = getCurrentAttemptForContainer (containerId); if (application == null) { // getApplicationAttemptId()獲取分配了<code> Container </code>的應用程序的<code> ApplicationAttemptId </code>。 // getApplicationId() 獲取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。 LOG.info("Unknown application " + containerId.getApplicationAttemptId().getApplicationId() + " launched container " + containerId + " on node: " + node); // rmContext是接口RMContext的對象, rmContext.getDispatcher()返回接口Dispatcher的對象, // rmContext.getDispatcher().getEventHandler()返回接口EventHandler對象, 最后調用EventHandler的handle()方法 // RMNodeCleanContainerEvent表示資源管理器節點清除容器事件,構造函數內部有RMNodeEventType.CLEANUP_CONTAINER this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } application.containerLaunchedOnNode(containerId, node.getNodeID()); } // public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { // getApplicationId() 獲取<code> ApplicationAttempId </code>的<code> ApplicationId </code>。 // 調用的get()函數是Map類,返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。 SchedulerApplication<T> app = applications.get(applicationAttemptId.getApplicationId()); // getCurrentAppAttempt()返回的是SchedulerApplicationAttempt類對象 return app == null ? 
null : app.getCurrentAppAttempt(); } // 從給定應用程序嘗試Id中獲取調度器應用程序 @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); if (attempt == null) { if (LOG.isDebugEnabled()) { LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); } return null; } // SchedulerAppReport類 表示應用程序嘗試,以及嘗試使用的資源。 return new SchedulerAppReport(attempt); } // 從給定的應用程序嘗試ID獲取資源使用情況報告。 @Override public ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId) { SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); if (attempt == null) { if (LOG.isDebugEnabled()) { LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); } return null; } // return attempt.getResourceUsageReport(); } // 根據容器Id獲取當前應用程序的嘗試 public T getCurrentAttemptForContainer(ContainerId containerId) { return getApplicationAttempt(containerId.getApplicationAttemptId()); } // 獲取給定containerId的容器。 @Override public RMContainer getRMContainer(ContainerId containerId) { SchedulerApplicationAttempt attempt = getCurrentAttemptForContainer(containerId); // getRMContainer()方法表示獲取資源管理器容器 return (attempt == null) ? null : attempt.getRMContainer(containerId); } // 獲取節點資源使用情況報告。 @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { // Map類方法get() N node = nodes.get(nodeId); // SchedulerNodeReport類表示節點使用報告 return node == null ? null : new SchedulerNodeReport(node); } // 將給定的應用程序移動到給定的隊列 @Override public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } // 移除一個已有的隊列 public void removeQueue(String queueName) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support removing queues"); } // 把一個新隊列添加到調度器。 @Override public void addQueue(Queue newQueue) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support this operation"); } // 此方法增加了當前隊列的權限 @Override public void setEntitlement(String queue, QueueEntitlement entitlement) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support this operation"); } //在節點上殺死孤立容器 private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { // getContainerState()獲取容器的狀態 // Enum類的equals()函數表示 如果指定的對象等於此枚舉常量,則返回true。否則false。 // ContainerState類表示容器的狀態,有三種NEW, RUNNING, COMPLETE。COMPLETE表示完成的容器。 if (!container.getContainerState().equals(ContainerState.COMPLETE)) { // 在本類的containerLaunchedOnNode()函數中有一樣的,略 this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeCleanContainerEvent(node.getNodeID(), container.getContainerId())); } } // 在節點上恢復容器 public synchronized void recoverContainersOnNode( List<NMContainerStatus> containerReports, RMNode nm) { if (!rmContext.isWorkPreservingRecoveryEnabled() || containerReports == null || (containerReports != null && containerReports.isEmpty())) { return; } for (NMContainerStatus container : containerReports) { /* * container.getContainerId()獲取容器的<code> ContainerId </code>。 * getApplicationAttemptId() 獲取分配了<code> Container </code>的應用程序的<code> ApplicationAttemptId </code>。 * getApplicationId() 獲取<code> ApplicationAttempId </ code>的<code> ApplicationId </code>。 */ ApplicationId appId = container.getContainerId().getApplicationAttemptId().getApplicationId(); // RMApp rmApp = rmContext.getRMApps().get(appId); if (rmApp == null) { 
LOG.error("Skip recovering container " + container + " for unknown application."); killOrphanContainerOnNode(nm, container); continue; } // Unmanaged AM recovery is addressed in YARN-1815 // 未經管理的AM恢復在YARN-1815中得到解決 // rmApp.getApplicationSubmissionContext()函數表示{@link RMApp}的應用程序提交上下文 // getUnmanagedAM()獲取是否RM應該管理AM的執行。如果為真,則RM不會為AM分配容器並啟動它。 if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { LOG.info("Skip recovering container " + container + " for unmanaged AM." + rmApp.getApplicationId()); killOrphanContainerOnNode(nm, container); continue; } //Map類的get()函數 SchedulerApplication<T> schedulerApp = applications.get(appId); if (schedulerApp == null) { //rmApp.getState()表示{@link RMApp}的當前狀態。 LOG.info("Skip recovering container " + container + " for unknown SchedulerApplication. Application current state is " + rmApp.getState()); killOrphanContainerOnNode(nm, container); continue; } LOG.info("Recovering container " + container); SchedulerApplicationAttempt schedulerAttempt = schedulerApp.getCurrentAppAttempt(); // getKeepContainersAcrossApplicationAttempts()函數 獲取指示是否在應用程序嘗試中保留容器的標志 if (!rmApp.getApplicationSubmissionContext() .getKeepContainersAcrossApplicationAttempts()) { // Do not recover containers for stopped attempt or previous attempt. // 不要因為停止了的嘗試或以前的嘗試恢復容器。 if (schedulerAttempt.isStopped() || !schedulerAttempt.getApplicationAttemptId().equals( container.getContainerId().getApplicationAttemptId())) { LOG.info("Skip recovering container " + container + " for already stopped attempt."); killOrphanContainerOnNode(nm, container); continue; } } // create container // 創建容器 RMContainer rmContainer = recoverAndCreateContainer(container, nm); // recover RMContainer // 恢復 RMContainer rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), container)); // recover scheduler node // 恢復調度器節點 nodes.get(nm.getNodeID()).recoverContainer(rmContainer); // recover queue: update headroom etc. // 恢復隊列:更新凈空等等 Queue queue = schedulerAttempt.getQueue(); queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); // recover scheduler attempt // 恢復調度器嘗試 schedulerAttempt.recoverContainer(rmContainer); // set master container for the current running AMContainer for this // attempt. // 為這個嘗試 為當前運行的AMContainer設置主容器 RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); if (appAttempt != null) { // getMasterContainer()函數表示 ApplicationMaster運行在其上的容器 Container masterContainer = appAttempt.getMasterContainer(); // Mark current running AMContainer's RMContainer based on the master // container ID stored in AppAttempt. // 根據存儲在AppAttempt中的主容器ID,標記當前正在運行的AMContainer的RMContainer。 if (masterContainer != null && masterContainer.getId().equals(rmContainer.getContainerId())) { // 設置ApplicationMaster容器 ((RMContainerImpl)rmContainer).setAMContainer(true); } } synchronized (schedulerAttempt) { // 這個pendingRelease用於工作維護恢復方案,以跟蹤AM的未完成發布請求。 // RM恢復可以收到AM的發布請求表,在此之前從NM收到容器狀態以進行恢復。 // 在這種情況下,由NM報告的待回收容器不應該被收回。 Set<ContainerId> releases = schedulerAttempt.getPendingRelease(); // Set類中的contains()函數, //如果此集合包含指定的元素,則返回<tt> true </ tt>。 更正式地,當且僅當該集合包含元素<tt> e </ tt>時,返回<tt> true </ tt>, //這樣<tt>(o==null ? e==null : o.equals(e))</tt>. 
if (releases.contains(container.getContainerId())) { // release the container //釋放容器 rmContainer.handle(new RMContainerFinishedEvent(container .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED)); releases.remove(container.getContainerId()); LOG.info(container.getContainerId() + " is released by application."); } } } } // 恢復並創建容器 // NMContainerStatus包括容器的當前信息。 // RMNode類表示節點管理器有關可用資源和其他靜態信息的信息。 private RMContainer recoverAndCreateContainer(NMContainerStatus status, RMNode node) { // 創建Container實例 Container container = Container.newInstance(status.getContainerId(), node.getNodeID(), node.getHttpAddress(), status.getAllocatedResource(), status.getPriority(), null); // 獲取應用程序的嘗試Id ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); // 創建一個RMContainerImpl對象 RMContainer rmContainer = new RMContainerImpl(container, attemptId, node.getNodeID(), applications.get(attemptId.getApplicationId()).getUser(), rmContext, status.getCreationTime()); return rmContainer; } /** * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. * @param rmContainer */ /* * 在AM拉出相同之前當容器被搶占時,從RMContainer恢復資源請求。如果容器被AM拉過來,則RMContainer將不會有資源請求恢復。 */ protected void recoverResourceRequestForContainer(RMContainer rmContainer) { // getResourceRequests()函數獲取資源請求 List<ResourceRequest> requests = rmContainer.getResourceRequests(); // If container state is moved to ACQUIRED, request will be empty. // 如果容器狀態被移動到 ACQUIRED,請求將為空。 if (requests == null) { return; } // Add resource request back to Scheduler. // 將資源請求添加回調度器。 SchedulerApplicationAttempt schedulerAttempt = getCurrentAttemptForContainer(rmContainer.getContainerId()); if (schedulerAttempt != null) { // 恢復資源請求 schedulerAttempt.recoverResourceRequests(requests); } } protected void createReleaseCache() { // Cleanup the cache after nm expire interval. // 在nm到期之際后清除緩存。 // Timer類創建一個新的計時器。schedule()函數表示在指定的延遲之后安排指定的任務執行。 new Timer().schedule(new TimerTask() { @Override public void run() { // Map類的values()函數表示 返回此map中包含的值的{@link Collection}視圖。 for (SchedulerApplication<T> app : applications.values()) { // 獲取當前應用程序的嘗試 T attempt = app.getCurrentAppAttempt(); synchronized (attempt) { // for (ContainerId containerId : attempt.getPendingRelease()) { // logFailure()函數表示 為失敗的事件創建可讀和可分析的審核日志字符串。 RMAuditLogger.logFailure( app.getUser(), AuditConstants.RELEASE_CONTAINER, "Unauthorized access or invalid container", "Scheduler", "Trying to release container not owned by app or with invalid id.", attempt.getApplicationId(), containerId); } // Set類的clear()函數表示 從此set中刪除所有元素(可選操作)。 此調用返回后,該組將為空。 attempt.getPendingRelease().clear(); } } LOG.info("Release request cache is cleaned up"); } }, nmExpireInterval); } // clean up a completed container // 清理完成的容器 protected abstract void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); // 清除容器 protected void releaseContainers(List<ContainerId> containers, SchedulerApplicationAttempt attempt) { for (ContainerId containerId : containers) { // 獲取給定containerId的容器。 RMContainer rmContainer = getRMContainer(containerId); if (rmContainer == null) { // if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() < nmExpireInterval) { LOG.info(containerId + " doesn't exist. 
Add the container" + " to the release request cache as it maybe on recovery."); synchronized (attempt) { // Set類的add()函數表示 如果指定的元素不存在,則將其指定的元素添加到這個set(可選操作)。 // 更正式地,如果set不包含元素<tt> e2 </tt>,則將指定的元素<tt> e </tt>添加到此set,以便 //<tt>(e==null ? e2==null : e.equals(e2))</tt>. attempt.getPendingRelease().add(containerId); } } else { // logFailure()函數表示 為失敗的事件創建可讀和可分析的審核日志字符串 RMAuditLogger.logFailure(attempt.getUser(), AuditConstants.RELEASE_CONTAINER, "Unauthorized access or invalid container", "Scheduler", "Trying to release container not owned by app or with invalid id.", attempt.getApplicationId(), containerId); } } // 清理完成的容器 // createAbnormalContainerStatus()函數表示在特殊情況下創建{@link ContainerStatus}的實用程序。 completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(containerId, SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); } } // 獲取 // SchedulerNode類表示 從調度器的角度表示YARN集群節點。 public SchedulerNode getSchedulerNode(NodeId nodeId) { // Map類的get()函數表示 返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。 return nodes.get(nodeId); } // 完全排除應用程序的sourceQueue,將其全部移動到destQueue。 @Override public synchronized void moveAllApps(String sourceQueue, String destQueue) throws YarnException { // check if destination queue is a valid leaf queue // 檢查目標隊列是否是有效的葉隊列 try { getQueueInfo(destQueue, false, false); } catch (IOException e) { LOG.warn(e); throw new YarnException(e); } // check if source queue is a valid // 檢查源隊列是否有效 // getAppsInQueue()函數表示 獲取給定隊列下的應用程序 List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue); if (apps == null) { String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist"; LOG.warn(errMsg); throw new YarnException(errMsg); } // generate move events for each pending/running app // 為每個待處理/正在運行的應用生成移動事件 for (ApplicationAttemptId app : apps) { // SettableFuture<Object> future = SettableFuture.create(); // RMAppMoveEvent類構造函數內部有 RMAppEventType.MOVE事件。 this.rmContext .getDispatcher() .getEventHandler() .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); } } // 終止指定隊列中的所有應用程序。 @Override public synchronized void killAllAppsInQueue(String queueName) throws YarnException { // check if queue is a valid // 檢查隊列是否有效 // getAppsInQueue()函數表示 獲取給定隊列下的應用程序 List<ApplicationAttemptId> apps = getAppsInQueue(queueName); if (apps == null) { String errMsg = "The specified Queue: " + queueName + " doesn't exist"; LOG.warn(errMsg); throw new YarnException(errMsg); } // generate kill events for each pending/running app // 為每個待處理/正在運行的應用生成kill事件 for (ApplicationAttemptId app : apps) { this.rmContext .getDispatcher() .getEventHandler() .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, "Application killed due to expiry of reservation queue " + queueName + ".")); } } /** * Process resource update on a node. */ // 在節點上處理資源更新。 public synchronized void updateNodeResource(RMNode nm, ResourceOption resourceOption) { SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); // 獲取節點上的總資源。 Resource oldResource = node.getTotalResource(); if(!oldResource.equals(newResource)) { // Log resource change // 日志記錄資源更改 LOG.info("Update resource on node: " + node.getNodeName() + " from: " + oldResource + ", to: " + newResource); // Map類的remove()函數表示 從該map中刪除一個鍵的映射,如果存在(可選的操作)。 // 更正式地,如果該map包含從<tt> k </tt>到值<tt> v </tt>的映射,使得<code>(key==null ? 
k==null : key.equals(k))</code>, // 該映射被刪除。(map最多可以包含一個這樣的映射。) nodes.remove(nm.getNodeID()); // updateMaximumAllocation(node, false); // update resource to node // 將資源更新到節點 // 在節點上設置總資源。 node.setTotalResource(newResource); // Map類的put()函數表示 將指定的值與該映射中的指定鍵相關聯(可選操作)。如果map先前包含了鍵的映射,則舊值將被指定的值替換。 nodes.put(nm.getNodeID(), (N)node); // updateMaximumAllocation(node, true); // update resource to clusterResource // 將資源更新到clusterResource // subtractFrom(clusterResource, oldResource)表示從clusterResource減去oldResource,資源包括內存和虛擬內核 Resources.subtractFrom(clusterResource, oldResource); // addTo(clusterResource, newResource)表示在clusterResource添加newResource,資源包括內存和虛擬內核 Resources.addTo(clusterResource, newResource); } else { // Log resource change // 日志記錄資源改變 LOG.warn("Update resource on node: " + node.getNodeName() + " with the same resource: " + newResource); } } /** {@inheritDoc} */ // 返回調度時考慮的資源類型的集合 @Override public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() { // EnumSet類的of()函數 創建一個最初包含指定元素的枚舉集。 return EnumSet.of(SchedulerResourceTypes.MEMORY); } // 獲取由預留系統管理的隊列的名稱列表 @Override public Set<String> getPlanQueues() throws YarnException { // Object類的getClass()函數 返回此{@code Object}的運行時類。 // 返回的{@code Class}對象是被表示類的{@code static synchronized}方法鎖定的對象。 // Class類的getSimpleName()函數返回源代碼中給出的基礎類的簡單名稱。 如果基礎類是匿名的,則返回一個空字符串。 throw new YarnException(getClass().getSimpleName() + " does not support reservations"); } // 更新最大可分配 protected void updateMaximumAllocation(SchedulerNode node, boolean add) { // 獲取節點上的總資源 Resource totalResource = node.getTotalResource(); maxAllocWriteLock.lock(); try { if (add) { // added node //添加節點 // 獲取資源的<em>memory</em> int nodeMemory = totalResource.getMemory(); if (nodeMemory > maxNodeMemory) { maxNodeMemory = nodeMemory; // 設置資源的<em>memory</em> // Math.min()返回兩個數的最小值 maximumAllocation.setMemory(Math.min( configuredMaximumAllocation.getMemory(), maxNodeMemory)); } // 獲取資源的<em>number of virtual cpu cores</em> int nodeVCores = totalResource.getVirtualCores(); if (nodeVCores > maxNodeVCores) { maxNodeVCores = nodeVCores; maximumAllocation.setVirtualCores(Math.min( configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); } } else { // removed node //刪除節點 if (maxNodeMemory == totalResource.getMemory()) { maxNodeMemory = -1; } if (maxNodeVCores == totalResource.getVirtualCores()) { maxNodeVCores = -1; } // We only have to iterate through the nodes if the current max memory // or vcores was equal to the removed node's // 如果當前的最大內存或虛擬內核等於被刪除的節點的,我們只需遍歷節點 if (maxNodeMemory == -1 || maxNodeVCores == -1) { // A map entry (key-value pair). 
entrySet()返回此map中包含的映射的{@link Set}視圖。 for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) { int nodeMemory = nodeEntry.getValue().getTotalResource().getMemory(); if (nodeMemory > maxNodeMemory) { maxNodeMemory = nodeMemory; } int nodeVCores = nodeEntry.getValue().getTotalResource().getVirtualCores(); if (nodeVCores > maxNodeVCores) { maxNodeVCores = nodeVCores; } } if (maxNodeMemory == -1) { // no nodes //無節點 maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); } else { maximumAllocation.setMemory( Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); } if (maxNodeVCores == -1) { // no nodes //無節點 maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); } else { maximumAllocation.setVirtualCores( Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); } } } } finally { maxAllocWriteLock.unlock(); } } // 刷新最大可分配 protected void refreshMaximumAllocation(Resource newMaxAlloc) { maxAllocWriteLock.lock(); try { configuredMaximumAllocation = Resources.clone(newMaxAlloc); int maxMemory = newMaxAlloc.getMemory(); if (maxNodeMemory != -1) { maxMemory = Math.min(maxMemory, maxNodeMemory); } int maxVcores = newMaxAlloc.getVirtualCores(); if (maxNodeVCores != -1) { maxVcores = Math.min(maxVcores, maxNodeVCores); } // maximumAllocation = Resources.createResource(maxMemory, maxVcores); } finally { maxAllocWriteLock.unlock(); } } // 為應用嘗試獲取待處理的資源請求 public List<ResourceRequest> getPendingResourceRequestsForAttempt( ApplicationAttemptId attemptId) { // 獲取應用程序嘗試 SchedulerApplicationAttempt attempt = getApplicationAttempt(attemptId); if (attempt != null) { // getAppSchedulingInfo()獲取應用程序調度信息。 getAllResourceRequests()獲取所有的資源請求。 return attempt.getAppSchedulingInfo().getAllResourceRequests(); } return null; } }
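One behavior in the code above worth isolating: updateMaximumAllocation and refreshMaximumAllocation keep the advertised maximum allocation clamped to the largest registered node, with -1 as the "no node yet" sentinel (maxNodeMemory/maxNodeVCores). A tiny self-contained sketch of just that rule (the class and method names are hypothetical):

public class MaxAllocationRule {
  /**
   * Effective maximum allocation for one dimension (memory or vcores):
   * the configured ceiling, further limited by the largest node in the
   * cluster; maxNodeValue == -1 means no node has registered yet.
   */
  static int effectiveMax(int configuredMax, int maxNodeValue) {
    return maxNodeValue == -1 ? configuredMax : Math.min(configuredMax, maxNodeValue);
  }

  public static void main(String[] args) {
    // Configured max container memory 8192 MB, but the biggest node has 4096 MB:
    System.out.println(effectiveMax(8192, 4096)); // prints 4096
    // No nodes registered yet, so fall back to the configured value:
    System.out.println(effectiveMax(8192, -1));   // prints 8192
  }
}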
Hadoop's three main schedulers are the Fifo, Capacity, and Fair schedulers, as follows:
(1) The Fifo scheduler
The corresponding class is FifoScheduler.java, at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java.
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") public class FifoScheduler extends AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); Configuration conf; private boolean usePortForNodeName; private ActiveUsersManager activeUsersManager; private static final String DEFAULT_QUEUE_NAME = "default"; private QueueMetrics metrics; private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); //創建一個默認隊列 private final Queue DEFAULT_QUEUE = new Queue() { @Override public String getQueueName() { return DEFAULT_QUEUE_NAME; } @Override public QueueMetrics getMetrics() { return metrics; } @Override public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName()); queueInfo.setCapacity(1.0f); if (clusterResource.getMemory() == 0) { queueInfo.setCurrentCapacity(0.0f); } else { queueInfo.setCurrentCapacity((float) usedResource.getMemory() / clusterResource.getMemory()); } queueInfo.setMaximumCapacity(1.0f); queueInfo.setChildQueues(new ArrayList<QueueInfo>()); queueInfo.setQueueState(QueueState.RUNNING); return queueInfo; } public Map<QueueACL, AccessControlList> getQueueAcls() { Map<QueueACL, AccessControlList> acls = new HashMap<QueueACL, AccessControlList>(); for (QueueACL acl : QueueACL.values()) { acls.put(acl, new AccessControlList("*")); } return acls; } @Override public List<QueueUserACLInfo> getQueueUserAclInfo( UserGroupInformation unused) { QueueUserACLInfo 
queueUserAclInfo = recordFactory.newRecordInstance(QueueUserACLInfo.class); queueUserAclInfo.setQueueName(DEFAULT_QUEUE_NAME); queueUserAclInfo.setUserAcls(Arrays.asList(QueueACL.values())); return Collections.singletonList(queueUserAclInfo); } @Override public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return getQueueAcls().get(acl).isUserAllowed(user); } @Override public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } increaseUsedResources(rmContainer); updateAppHeadRoom(schedulerAttempt); updateAvailableResourcesMetrics(); } @Override public Set<String> getAccessibleNodeLabels() { // TODO add implementation for FIFO scheduler return null; } @Override public String getDefaultNodeLabelExpression() { // TODO add implementation for FIFO scheduler return null; } }; public FifoScheduler() { super(FifoScheduler.class.getName()); } // 初始化調度器 private synchronized void initScheduler(Configuration conf) { // 驗證配置信息 validateConf(conf); //Use ConcurrentSkipListMap because applications need to be ordered // 使用ConcurrentSkipListMap,因為應用程序需要有序 // 該applications在它的父類抽象類AbstractYarnScheduler中聲明。 this.applications = new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>(); // createResource()函數表示創建資源。 getInt()函數表示獲取<code> name </code>屬性的值作為<code> int </code>。 // 其中DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024 this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); // 初始化最大資源容量。 // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192 // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4 initMaximumResourceCapability( Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB), conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES))); // getBoolean()函數表示獲取<code> name </code>屬性的值作為<code>boolean</code>。 // 其中 DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false this.usePortForNodeName = conf.getBoolean( YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); // this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf); // ActiveUsersManager 跟蹤系統中的活動用戶。 this.activeUsersManager = new ActiveUsersManager(metrics); } @Override public void serviceInit(Configuration conf) throws Exception { //初始化調度器 initScheduler(conf); super.serviceInit(conf); } @Override public void serviceStart() throws Exception { super.serviceStart(); } @Override public void serviceStop() throws Exception { super.serviceStop(); } @Override public synchronized void setConf(Configuration conf) { this.conf = conf; } //驗證配置信息 private void validateConf(Configuration conf) { // validate scheduler memory allocation setting // 驗證調度器內存分配設置 // 其中 DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB = 1024 int minMem = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); // 其中 DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192 int maxMem = conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); if (minMem <= 0 || minMem > maxMem) { throw new YarnRuntimeException("Invalid resource scheduler memory" + " allocation configuration" + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + "=" + minMem + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + "=" + maxMem + ", min and max should be greater than 0" + ", max should be no smaller than min."); } } @Override public synchronized Configuration getConf() { return conf; } @Override public int getNumClusterNodes() { // Map類的size()函數表示 返回此map中鍵值映射的數量。 return nodes.size(); } @Override public synchronized void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { setConf(conf); } // @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + "or non existant application " + applicationAttemptId); return EMPTY_ALLOCATION; } // Sanity check // 完整性檢查 SchedulerUtils.normalizeRequests(ask, resourceCalculator, clusterResource, minimumAllocation, getMaximumResourceCapability()); // Release containers // 釋放容器 releaseContainers(release, application); synchronized (application) { // make sure we aren't stopping/removing the application // when the allocate comes in // 確保在分配進來時我們不會停止/刪除應用程序 if (application.isStopped()) { LOG.info("Calling allocate on a stopped " + "application " + applicationAttemptId); return EMPTY_ALLOCATION; } if (!ask.isEmpty()) { LOG.debug("allocate: pre-update" + " applicationId=" + applicationAttemptId + " application=" + application); // application.showRequests(); // Update application requests // 更新應用程序請求 application.updateResourceRequests(ask); LOG.debug("allocate: post-update" + " applicationId=" + applicationAttemptId + " application=" + application); application.showRequests(); LOG.debug("allocate:" + " applicationId=" + applicationAttemptId + " #ask=" + ask.size()); } // 更新黑名單列表 application.updateBlacklist(blacklistAdditions, blacklistRemovals); // 創建容器令牌和NMToken,如果其中任何一個由於某些原因(如DNS不可用)而失敗, // 請不要返回此容器並將其保留在等待重新引導的newlyAllocatedContainers中。 ContainersAndNMTokensAllocation allocation = application.pullNewlyAllocatedContainersAndNMTokens(); // 在應用程序的用戶的資源方面獲得可用的余量。 Resource headroom = application.getHeadroom(); // application.setApplicationHeadroomForMetrics(headroom); // Allocation類 return new Allocation(allocation.getContainerList(), headroom, null, null, null, allocation.getNMTokenList()); } } private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { // SchedulerApplication<FiCaSchedulerApp> application = new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user); // Map類函數put()表示將指定的值與該map中的指定鍵相關聯(可選操作)。 applications.put(applicationId, application); // metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. 
Skip notifying APP_ACCEPTED"); } } else { rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } } @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { // Map類函數get()表示返回指定鍵映射到的值,如果此映射不包含該鍵的映射,則返回{@code null}。 SchedulerApplication<FiCaSchedulerApp> application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); // TODO: Fix store // 創建FiCaSchedulerApp類對象, 表示從FIFO或容量調度器的角度出發的應用程序嘗試。 FiCaSchedulerApp schedulerApp = new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext); if (transferStateFromPreviousAttempt) { schedulerApp.transferStateFromPreviousAttempt(application .getCurrentAppAttempt()); } application.setCurrentAppAttempt(schedulerApp); metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(appAttemptId + " is recovering. Skipping notifying ATTEMPT_ADDED"); } } else { rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); } } private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { SchedulerApplication<FiCaSchedulerApp> application = applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); return; } // Inform the activeUsersManager // 通知activeUsersManager activeUsersManager.deactivateApplication(application.getUser(), applicationId); application.stop(finalState); // Map類函數remove()表示 如果存在(從可選的操作),從該map中刪除一個鍵的映射。 applications.remove(applicationId); } private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) throws IOException { FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); SchedulerApplication<FiCaSchedulerApp> application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { throw new IOException("Unknown application " + applicationAttemptId + " has completed!"); } // Kill all 'live' containers // 殺死所有的“活”容器 for (RMContainer container : attempt.getLiveContainers()) { if (keepContainers && container.getState().equals(RMContainerState.RUNNING)) { // do not kill the running container in the case of work-preserving AM // restart. // 在維護AM重新啟動的情況下,不要殺死正在運行的容器。 LOG.info("Skip killing " + container.getContainerId()); continue; } // createAbnormalContainerStatus()表示在特殊情況下創建{@link ContainerStatus}的實用程序。 completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); } // Clean up pending requests, metrics etc. // 清理待處理的請求,指標等 attempt.stop(rmAppAttemptFinalState); } /** * Heart of the scheduler... * * @param node node on which resources are available to be allocated */ // 調度器的核心... 
// 分配容器, 參數node表示資源可用於分配的節點 private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " #applications=" + applications.size()); // Try to assign containers to applications in fifo order // 嘗試以fifo順序將容器分配給應用程序 for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications .entrySet()) { FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt(); if (application == null) { continue; } LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { // Check if this resource is on the blacklist // 檢查這個資源是否在黑名單上 if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) { continue; } for (Priority priority : application.getPriorities()) { // 獲取最大可分配容器 int maxContainers = getMaxAllocatableContainers(application, priority, node, NodeType.OFF_SWITCH); // Ensure the application needs containers of this priority // 確保應用程序需要這個優先級的容器 if (maxContainers > 0) { // 在節點上分配容器 int assignedContainers = assignContainersOnNode(node, application, priority); // Do not assign out of order w.r.t priorities // 分配不要違反w.r.t優先級 if (assignedContainers == 0) { break; } } } } LOG.debug("post-assignContainers"); application.showRequests(); // Done // 干 if (Resources.lessThan(resourceCalculator, clusterResource, node.getAvailableResource(), minimumAllocation)) { break; } } // Update the applications' headroom to correctly take into // account the containers assigned in this update. // 更新應用程序的余量,以正確地考慮在此更新中分配的容器。 for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) { // FiCaSchedulerApp attempt = (FiCaSchedulerApp) application.getCurrentAppAttempt(); if (attempt == null) { continue; } // updateAppHeadRoom(attempt); } } private int getMaxAllocatableContainers(FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, NodeType type) { int maxContainers = 0; ResourceRequest offSwitchRequest = application.getResourceRequest(priority, ResourceRequest.ANY); if (offSwitchRequest != null) { maxContainers = offSwitchRequest.getNumContainers(); } // 資源分類。 NODE_LOCAL(0) 表示同一節點, RACK_LOCAL(1) 同一機架上不同節點, OFF_SWITCH(2)不同機架 if (type == NodeType.OFF_SWITCH) { return maxContainers; } if (type == NodeType.RACK_LOCAL) { // getResourceRequest()獲取資源請求。 getRMNode()返回RMNode類對象, getRackName()函數此節點管理器的機架名稱。 ResourceRequest rackLocalRequest = application.getResourceRequest(priority, node.getRMNode().getRackName()); if (rackLocalRequest == null) { return maxContainers; } maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers()); } if (type == NodeType.NODE_LOCAL) { // getResourceRequest()獲取資源請求。 getNodeAddress()該節點的ContainerManager地址。 ResourceRequest nodeLocalRequest = application.getResourceRequest(priority, node.getRMNode().getNodeAddress()); if (nodeLocalRequest != null) { // getNumContainers()獲取所需規格的容器數量 maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers()); } } return maxContainers; } // 在節點上分配容器 private int assignContainersOnNode(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority ) { // Data-local // 數據本地 int nodeLocalContainers = assignNodeLocalContainers(node, application, priority); // Rack-local // 機架本地 int rackLocalContainers = assignRackLocalContainers(node, application, priority); // Off-switch // 非同一機架 int offSwitchContainers = assignOffSwitchContainers(node, application, priority); LOG.debug("assignContainersOnNode:" + " node=" + node.getRMNode().getNodeAddress() + " application=" + 
application.getApplicationId().getId() + " priority=" + priority.getPriority() + " #assigned=" + (nodeLocalContainers + rackLocalContainers + offSwitchContainers)); return (nodeLocalContainers + rackLocalContainers + offSwitchContainers); } // 分配節點本地容器 private int assignNodeLocalContainers(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; // getNodeName()獲取節點的名稱以調度匹配決策。 ResourceRequest request = application.getResourceRequest(priority, node.getNodeName()); if (request != null) { // Don't allocate on this node if we don't need containers on this rack // 如果我們不需要在此機架上的容器,則不要在此節點上分配 // getRackName()此節點管理器的機架名稱。 ResourceRequest rackRequest = application.getResourceRequest(priority, node.getRMNode().getRackName()); // getNumContainers()獲取所需規格的容器數量。 if (rackRequest == null || rackRequest.getNumContainers() <= 0) { return 0; } int assignableContainers = Math.min( getMaxAllocatableContainers(application, priority, node, NodeType.NODE_LOCAL), request.getNumContainers()); // 分配容器 assignedContainers = assignContainer(node, application, priority, assignableContainers, request, NodeType.NODE_LOCAL); } return assignedContainers; } // 分配機架本地容器 private int assignRackLocalContainers(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getRackName()); if (request != null) { // Don't allocate on this rack if the application doens't need containers // 如果應用程序不需要容器,請不要在此機架上分配 ResourceRequest offSwitchRequest = application.getResourceRequest(priority, ResourceRequest.ANY); if (offSwitchRequest.getNumContainers() <= 0) { return 0; } int assignableContainers = Math.min( getMaxAllocatableContainers(application, priority, node, NodeType.RACK_LOCAL), request.getNumContainers()); // 分配容器 assignedContainers = assignContainer(node, application, priority, assignableContainers, request, NodeType.RACK_LOCAL); } return assignedContainers; } // 分配容器跨機架 private int assignOffSwitchContainers(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, ResourceRequest.ANY); if (request != null) { assignedContainers = assignContainer(node, application, priority, request.getNumContainers(), request, NodeType.OFF_SWITCH); } return assignedContainers; } // 分配容器 private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " application=" + application.getApplicationId().getId() + " priority=" + priority.getPriority() + " assignableContainers=" + assignableContainers + " request=" + request + " type=" + type); // 獲取請求的<code>Resource</code>容量。 Resource capability = request.getCapability(); int availableContainers = node.getAvailableResource().getMemory() / capability.getMemory(); // TODO: A buggy // application // with this // zero would // crash the // scheduler. 
int assignedContainers = Math.min(assignableContainers, availableContainers); if (assignedContainers > 0) { for (int i=0; i < assignedContainers; ++i) { // getNodeID()該節點的節點ID。 NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = BuilderUtils.newContainerId(application .getApplicationAttemptId(), application.getNewContainerId()); // Create the container // 創建容器 Container container = BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() .getHttpAddress(), capability, priority, null); // Allocate! // 分配! // Inform the application // 通知應用程序 RMContainer rmContainer = application.allocate(type, node, priority, request, container); // Inform the node // 通知節點 node.allocateContainer(rmContainer); // Update usage for this container // 更新此容器的使用 increaseUsedResources(rmContainer); } } return assignedContainers; } private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); // 獲取並清除在NM心跳中累積的containerUpdates列表。 List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); for(UpdatedContainerInfo containerInfo : containerInfoList) { // newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); // completedContainers.addAll(containerInfo.getCompletedContainers()); } // Processing the newly launched containers // 處理新發起的容器 for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } // Process completed containers // 處理完成的容器 for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; } if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, node.getAvailableResource(),minimumAllocation)) { LOG.debug("Node heartbeat " + rmNode.getNodeID() + " available resource = " + node.getAvailableResource()); // assignContainers(node); LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + node.getAvailableResource()); } updateAvailableResourcesMetrics(); } // 增加使用的資源, private void increaseUsedResources(RMContainer rmContainer) { // addTo()把后面的資源添加到前面 Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { // schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); } private void updateAvailableResourcesMetrics() { // 設置可用資源。 資源變得可用時由調度器定期調用。 metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, usedResource)); } @Override public void handle(SchedulerEvent event) { switch(event.getType()) { case NODE_ADDED: { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); recoverContainersOnNode(nodeAddedEvent.getContainerReports(), nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: { NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; case NODE_RESOURCE_UPDATE: { NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
(NodeResourceUpdateSchedulerEvent)event; updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); } break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: { AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; doneApplication(appRemovedEvent.getApplicationID(), appRemovedEvent.getFinalState()); } break; case APP_ATTEMPT_ADDED: { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: { AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = (AppAttemptRemovedSchedulerEvent) event; try { doneApplicationAttempt( appAttemptRemovedEvent.getApplicationAttemptID(), appAttemptRemovedEvent.getFinalAttemptState(), appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); } catch(IOException ie) { LOG.error("Unable to remove application " + appAttemptRemovedEvent.getApplicationAttemptID(), ie); } } break; case CONTAINER_EXPIRED: { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; case CONTAINER_RESCHEDULED: { ContainerRescheduledEvent containerRescheduledEvent = (ContainerRescheduledEvent) event; RMContainer container = containerRescheduledEvent.getContainer(); recoverResourceRequestForContainer(container); } break; default: LOG.error("Invalid eventtype " + event.getType() + ". 
Ignoring!"); } } // 清理完成的容器 @Lock(FifoScheduler.class) @Override protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); return; } // Get the application for the finished container // 獲取完成了的容器的應用程序 Container container = rmContainer.getContainer(); // 根據容器Id獲取當前應用程序的嘗試 FiCaSchedulerApp application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated // 獲取分配容器的節點 FiCaSchedulerNode node = getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + " released container " + container.getId() + " on node: " + node + " with event: " + event); return; } // Inform the application // 通知應用程序 application.containerCompleted(rmContainer, containerStatus, event); // Inform the node // 通知節點 在此節點上釋放分配的容器。 node.releaseContainer(container); // Update total usage // 更新總的使用情況 Resources.subtractFrom(usedResource, container.getResource()); LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node + " with event: " + event); } private Resource usedResource = recordFactory.newRecordInstance(Resource.class); // 移除節點 private synchronized void removeNode(RMNode nodeInfo) { FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); if (node == null) { return; } // Kill running containers // 殺死正在運行的容器 for(RMContainer container : node.getRunningContainers()) { completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } //Remove the node // 移除節點 this.nodes.remove(nodeInfo.getNodeID()); updateMaximumAllocation(node, false); // Update cluster metrics // Resources.subtractFrom(clusterResource, node.getTotalResource()); } @Override public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, boolean recursive) { return DEFAULT_QUEUE.getQueueInfo(false, false); } @Override public List<QueueUserACLInfo> getQueueUserAclInfo() { return DEFAULT_QUEUE.getQueueUserAclInfo(null); } @Override public ResourceCalculator getResourceCalculator() { return resourceCalculator; } // 添加節點 private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); this.nodes.put(nodeManager.getNodeID(), schedulerNode); Resources.addTo(clusterResource, schedulerNode.getTotalResource()); updateMaximumAllocation(schedulerNode, true); } @Override public void recover(RMState state) { // NOT IMPLEMENTED } @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? 
null : attempt.getRMContainer(containerId); } @Override public QueueMetrics getRootQueueMetrics() { return DEFAULT_QUEUE.getMetrics(); } // 檢查用戶是否有權執行操作。 @Override public synchronized boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { return DEFAULT_QUEUE.hasAccess(acl, callerUGI); } @Override public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(applications.size()); for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) { attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); } return attempts; } else { return null; } } public Resource getUsedResource() { return usedResource; } }
The internal call flow of the FIFO scheduler is as follows: a NODE_UPDATE event received in handle() triggers nodeUpdate(...); that method calls assignContainers(...); that method calls assignContainersOnNode(...) for each application in FIFO order; that method in turn runs assignNodeLocalContainers(...), assignRackLocalContainers(...) and assignOffSwitchContainers(...) in sequence. The node-local and rack-local variants each call getResourceRequest(...), getMaxAllocatableContainers(...) and assignContainer(...), while the off-switch variant calls getResourceRequest(...) and then assignContainer(...) directly.
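To make this chain easier to trace, the following is a minimal, self-contained sketch of the same control flow. All of the names in it (FifoFlowSketch, App, Node, CONTAINER_MB) are toy stand-ins invented for illustration, not the real YARN types; it collapses the three locality levels into a single step and keeps only the FIFO iteration order and the per-node capacity check of the real code above.

import java.util.SortedMap;
import java.util.TreeMap;

// Toy model of the call chain: nodeUpdate -> assignContainers ->
// assignContainersOnNode -> assignContainer, in FIFO application order.
public class FifoFlowSketch {

    static class App {
        final String id;
        int pending; // containers still requested (the ANY / off-switch request)
        App(String id, int pending) { this.id = id; this.pending = pending; }
    }

    static class Node {
        final String name;
        int availableMb;
        Node(String name, int availableMb) { this.name = name; this.availableMb = availableMb; }
    }

    static final int CONTAINER_MB = 1024; // stands in for minimumAllocation

    // Sorted map so applications are served in submission (FIFO) order,
    // just like the ConcurrentSkipListMap in the real scheduler.
    static final SortedMap<String, App> apps = new TreeMap<>();

    // Entry point of the chain, triggered by a NODE_UPDATE heartbeat.
    static void nodeUpdate(Node node) {
        if (node.availableMb >= CONTAINER_MB) {
            assignContainers(node);
        }
    }

    static void assignContainers(Node node) {
        for (App app : apps.values()) { // strict FIFO order
            assignContainersOnNode(node, app);
            if (node.availableMb < CONTAINER_MB) {
                break; // node is full; stop early, exactly like the real loop
            }
        }
    }

    // The real method tries node-local, rack-local, then off-switch requests;
    // this sketch collapses all three into one step.
    static void assignContainersOnNode(Node node, App app) {
        int assignable = Math.min(app.pending, node.availableMb / CONTAINER_MB);
        app.pending -= assignContainer(node, app, assignable);
    }

    static int assignContainer(Node node, App app, int count) {
        node.availableMb -= count * CONTAINER_MB;
        System.out.println("assigned " + count + " container(s) to " + app.id + " on " + node.name);
        return count;
    }

    public static void main(String[] args) {
        apps.put("app-0001", new App("app-0001", 3));
        apps.put("app-0002", new App("app-0002", 2));
        nodeUpdate(new Node("node-1", 4096)); // simulated heartbeat
    }
}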
(2) Capacity scheduler
The corresponding class is CapacityScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java.
(3) Fair scheduler
The corresponding class is FairScheduler.java, located at hadoop-2.7.3-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.
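Switching between these three implementations only changes the scheduler-class property introduced earlier. The snippet below is a minimal sketch of doing that programmatically, for example in a test; YarnConfiguration.RM_SCHEDULER is the real constant for that property, while the wrapper class itself is invented for illustration.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Minimal sketch: select the scheduler class the ResourceManager will load.
public class SchedulerSelectionExample {
    public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Swap in the Capacity or Fair scheduler class name here as needed.
        conf.set(YarnConfiguration.RM_SCHEDULER,
            "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
        System.out.println("scheduler = " + conf.get(YarnConfiguration.RM_SCHEDULER));
    }
}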
All three scheduler classes extend the abstract class AbstractYarnScheduler, each binding its own application-attempt type and node type, as the declarations below show.
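Their class declarations in the 2.7.3 source make the shared base class explicit (bodies elided, so this is an excerpt rather than runnable code):

public class FifoScheduler extends
    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode>
    implements Configurable { /* ... */ }

public class CapacityScheduler extends
    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode>
    implements PreemptableResourceScheduler, CapacitySchedulerContext,
               Configurable { /* ... */ }

public class FairScheduler extends
    AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> { /* ... */ }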
2 The JobTracker's resource management and job control functions are split apart and implemented by two separate components, ResourceManager and ApplicationMaster.
(1) ResourceManager is responsible for resource allocation for all applications.
The corresponding class is ResourceManager.java.
(2) ApplicationMaster manages only a single application.
The corresponding class is ApplicationMaster.java.
In addition, the NodeManager
3 Writing a Hadoop scheduler; see the references 如何編寫Hadoop調度器 (How to Write a Hadoop Scheduler) and 深入Hadoop的調度器 (A Deep Dive into Hadoop's Schedulers).
Suppose we want to write a new scheduler, MyHadoopScheduler. The following work is required (a rough skeleton follows the list below):
(1) Classes the user must implement
(2) System classes the user will use
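As a rough outline (an assumption-laden sketch, not a working implementation), a new scheduler follows the same structural pattern as FifoScheduler above. Only the name MyHadoopScheduler comes from the text; the method bodies here are placeholders, and a real class must also override the remaining YarnScheduler methods (allocate, getQueueInfo, getResourceCalculator, the full set of event cases in handle, and so on), for which the FifoScheduler source above shows the minimal set.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;

// Outline only: reuses the FiCa application/node types the way FifoScheduler does.
public class MyHadoopScheduler
    extends AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> {

    public MyHadoopScheduler() {
        super(MyHadoopScheduler.class.getName());
    }

    @Override
    public void serviceInit(Configuration conf) throws Exception {
        // Read scheduler-specific settings and build queues/metrics here,
        // mirroring FifoScheduler.initScheduler(conf).
        super.serviceInit(conf);
    }

    @Override
    public void handle(SchedulerEvent event) {
        // React to NODE_ADDED / NODE_REMOVED / NODE_UPDATE / APP_ADDED / ...
        // The scheduling policy itself lives in the NODE_UPDATE path.
    }

    // allocate(...), getQueueInfo(...), getNumClusterNodes(), etc. must also
    // be implemented; see the FifoScheduler source above for the full set.
}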