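Finding the RM main class from the scripts
In this part, we use the scripts as the entry point and work our way step by step into the ResourceManager source code.
According to the official Hadoop documentation, the ResourceManager is started with: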
Usage: yarn resourcemanager [-format-state-store]
COMMAND_OPTIONS | Description |
---|---|
-format-state-store | Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running. |
-remove-application-from-state-store <appId> | Remove the application from RMStateStore. This should be run only when the ResourceManager is not running. |
In the source tree, open hadoop-yarn-project > hadoop-yarn > bin > start-yarn.sh:

```shell
# start resourceManager
# Read the configuration: is ResourceManager HA enabled?
HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled 2>&-)
if [[ ${HARM} = "false" ]]; then
  # ResourceManager HA is not enabled
  echo "Starting resourcemanager"
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
else
  # ResourceManager HA is enabled
  # yarn.resourcemanager.ha.rm-ids holds the logical RM ids, separated by commas
  logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids 2>&-)
  logicals=${logicals//,/ }   # replace the commas with spaces to get one word per RM id
  for id in ${logicals}
  do
      rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" 2>&-)
      RMHOSTS="${RMHOSTS} ${rmhost}"   # RMHOSTS ends up as a space-separated list of hostnames
  done
  echo "Starting resourcemanagers on [${RMHOSTS}]"
  # run the yarn command on every RM host
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      --workers \
      --hostnames "${RMHOSTS}" \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))   # add the previous command's exit status
fi
```
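First, the shell syntax used to split the string. `${aa//,/ }` replaces every comma in `$aa` with a space. Because the expansion is unquoted, the shell then splits the result into words on whitespace:

```shell
$ aa='1,2,3'; for i in ${aa//,/ }; do echo $i; done
1
2
3
```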
The script is easier to follow next to the official sample configuration. In the sample below, HA is enabled and the RM ids are rm1 and rm2. The hostname of rm1 is master1 and the hostname of rm2 is master2:

```xml
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>master1</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>master2</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
```
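The Java sketch below performs the same host resolution as the script's loop, using the sample configuration above. It is for illustration only:

- A plain `Map` stands in for Hadoop's `Configuration`.
- The class `RmHostResolver` is hypothetical.
- It simplifies the script in two ways. HA counts as enabled only when the value is exactly `true`, and ids that have no hostname are skipped rather than producing an empty entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the loop in start-yarn.sh; a plain Map stands
// in for the Hadoop Configuration object.
final class RmHostResolver {
  static final String HA_ENABLED = "yarn.resourcemanager.ha.enabled";
  static final String RM_IDS = "yarn.resourcemanager.ha.rm-ids";
  static final String HOSTNAME_PREFIX = "yarn.resourcemanager.hostname.";

  // Returns the hostnames the script would pass to --hostnames, or an empty
  // list when HA is disabled (the script then starts a single local RM).
  static List<String> resolveRmHosts(Map<String, String> conf) {
    List<String> hosts = new ArrayList<>();
    if (!"true".equals(conf.getOrDefault(HA_ENABLED, "false").trim())) {
      return hosts;
    }
    // Same effect as ${logicals//,/ } followed by word splitting
    for (String id : conf.getOrDefault(RM_IDS, "").split(",")) {
      String trimmed = id.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      String host = conf.get(HOSTNAME_PREFIX + trimmed);
      if (host != null) {
        hosts.add(host.trim());
      }
    }
    return hosts;
  }
}
```

With the sample values, `resolveRmHosts` returns `[master1, master2]`. This corresponds to what the script collects in `RMHOSTS`.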
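Combining this with the yarn script, we can conclude that the ResourceManager entry class is org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.

The options --config "${HADOOP_CONF_DIR}", --daemon start, --workers and --hostnames "${RMHOSTS}" belong to the yarn launcher script itself. They are consumed by the Hadoop shell functions (hadoop_parse_args in hadoop-functions.sh), which decide where and how the JVM is launched. So when the RM is started through start-yarn.sh, main normally receives no remaining arguments. The shell functions are not analyzed in detail here.
Analyzing the RM service initialization process
Analyzing the ResourceManager class hierarchy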
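Next, we finally reach the entry class org.apache.hadoop.yarn.server.resourcemanager.ResourceManager, which lives in the hadoop-yarn-server-resourcemanager submodule.
Start with the class declaration:

- It extends CompositeService, which means the RM is a composite service made up of child services.
- It implements ResourceManagerMXBean, so it can be managed through JMX.
- It implements Recoverable, which is used for recovery from the state store.

```java
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean
```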
Analyzing the ResourceManager entry point
Next, find the main method:

```java
public static void main(String argv[]) {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
  try {
    Configuration conf = new YarnConfiguration();
    // parse the generic Hadoop options
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    // arguments left over after the generic options (such as -conf and -D)
    // have been consumed; on a normal start there are none
    argv = hParser.getRemainingArgs();
    // If -format-state-store, then delete RMStateStore; else startup normally
    if (argv.length >= 1) {
      if (argv[0].equals("-format-state-store")) {
        deleteRMStateStore(conf);
      } else if (argv[0].equals("-remove-application-from-state-store")
          && argv.length == 2) {
        removeApplication(conf, argv[1]);
      } else {
        printUsage(System.err);
      }
    } else {
      // Create the RM instance. The superclass constructor sets the service
      // name to "ResourceManager" and creates the state model field
      // stateModel, whose initial state is Service.STATE.NOTINITED
      // (covered in detail later)
      ResourceManager resourceManager = new ResourceManager();
      // register a shutdown hook that stops the composite service
      ShutdownHookManager.get().addShutdownHook(
          new CompositeServiceShutdownHook(resourceManager),
          SHUTDOWN_HOOK_PRIORITY);
      resourceManager.init(conf);   // initialize the RM service
      resourceManager.start();      // start the RM service
    }
  } catch (Throwable t) {
    LOG.fatal("Error starting ResourceManager", t);
    System.exit(-1);
  }
}
```
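The branching at the top of `main` can be isolated into a small pure function. The sketch below is hypothetical: `RmMainArgs` and its `Action` enum are not Hadoop classes. It only restates the `if/else` chain so the cases are easy to see:

- No remaining arguments means a normal start.
- `-remove-application-from-state-store` without an application id falls through to the usage message.

```java
// Hypothetical restatement of the branch logic in ResourceManager.main after
// GenericOptionsParser has removed the generic options.
final class RmMainArgs {
  enum Action { START, FORMAT_STATE_STORE, REMOVE_APPLICATION, PRINT_USAGE }

  static Action choose(String[] remaining) {
    if (remaining.length == 0) {
      return Action.START;              // new ResourceManager(), init, start
    }
    if (remaining[0].equals("-format-state-store")) {
      return Action.FORMAT_STATE_STORE; // deleteRMStateStore(conf)
    }
    if (remaining[0].equals("-remove-application-from-state-store")
        && remaining.length == 2) {
      return Action.REMOVE_APPLICATION; // removeApplication(conf, appId)
    }
    return Action.PRINT_USAGE;          // printUsage(System.err)
  }
}
```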
Analyzing the ResourceManager initialization process

```java
@Override  // defined in the ancestor class AbstractService
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
                                    + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    // enterState returns the previous state, so this branch only runs
    // if the service had not been initialized yet
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);         // store the conf object
      try {
        serviceInit(config);   // service-specific initialization
        if (isInState(STATE.INITED)) {  // initialization succeeded
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();   // notify the registered listeners
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
```
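The guard in `init` relies on `enterState` returning the *previous* state. Below is a minimal, hypothetical model of that idea. `MiniService` is not a Hadoop class, and unlike Hadoop's `ServiceStateModel` it does not reject illegal transitions. It shows why a second `init` call is a no-op:

```java
// Hypothetical, simplified model of the AbstractService.init guard.
class MiniService {
  enum State { NOTINITED, INITED, STARTED, STOPPED }

  private final Object stateChangeLock = new Object();
  private volatile State state = State.NOTINITED;
  int serviceInitCalls = 0;   // counts how often the subclass hook ran

  // Moves to newState and returns the state the service was in before.
  private State enterState(State newState) {
    State old = state;
    state = newState;
    return old;
  }

  public void init(Object conf) {
    if (conf == null) {
      throw new IllegalStateException("null configuration");
    }
    if (state == State.INITED) {
      return;                         // fast path without locking
    }
    synchronized (stateChangeLock) {
      if (enterState(State.INITED) != State.INITED) {
        serviceInit(conf);            // runs at most once
      }
    }
  }

  protected void serviceInit(Object conf) {
    serviceInitCalls++;
  }

  State getState() {
    return state;
  }
}
```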
The serviceInit method is overridden in the ResourceManager class:

```java
@Override
protected void serviceInit(Configuration conf) throws Exception {
  this.conf = conf;
  // 1. Initialize the service context.
  // RMContextImpl holds two kinds of service context:
  //   serviceContext: "Always On" services that keep running regardless of HA state
  //   activeServiceContext: services that only run on the Active RM
  this.rmContext = new RMContextImpl();
  rmContext.setResourceManager(this);

  // 2. Set up the configuration provider
  this.configurationProvider =
      ConfigurationProviderFactory.getConfigurationProvider(conf);
  this.configurationProvider.init(this.conf);
  rmContext.setConfigurationProvider(configurationProvider);

  // 3. Load core-site.xml
  loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);

  // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
  // Or use RM specific configurations to overwrite the common ones first
  // if they exist
  RMServerUtils.processRMProxyUsersConf(conf);
  ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);

  // 4. Load yarn-site.xml
  loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);

  // 5. Validate the configuration
  validateConfigs(this.conf);

  // 6. Login
  // Set HA configuration should be done before login
  this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  if (this.rmContext.isHAEnabled()) {
    // HA is enabled: verify and set the HA configuration
    HAUtil.verifyAndSetConfiguration(this.conf);
  }

  // Set UGI and do login
  // If security is enabled, use login user
  // If security is not enabled, use current user
  // (with security such as Kerberos enabled, the Kerberos login user is used)
  this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  try {
    doSecureLogin();
  } catch(IOException ie) {
    throw new YarnRuntimeException("Failed to login", ie);
  }

  // register the handlers for all AlwaysOn services using setupDispatcher().
  // 7. Initialize the event handlers for all always-on services
  rmDispatcher = setupDispatcher();
  addIfService(rmDispatcher);
  rmContext.setDispatcher(rmDispatcher);

  // The order of services below should not be changed as services will be
  // started in same order
  // As elector service needs admin service to be initialized and started,
  // first we add admin service then elector service

  // 8. Create the AdminService
  adminService = createAdminService();
  addService(adminService);
  rmContext.setRMAdminService(adminService);

  // elector must be added post adminservice
  if (this.rmContext.isHAEnabled()) {
    // If the RM is configured to use an embedded leader elector,
    // initialize the leader elector.
    if (HAUtil.isAutomaticFailoverEnabled(conf)
        && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      EmbeddedElector elector = createEmbeddedElector();
      addIfService(elector);
      rmContext.setLeaderElectorService(elector);
    }
  }

  // 9. Set the YARN configuration
  rmContext.setYarnConfiguration(conf);

  // 10. Create and initialize the active services
  createAndInitActiveServices(false);

  // 11. Get the YARN web app address
  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                    YarnConfiguration.RM_BIND_HOST,
                    WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

  // 12. Create the RMApplicationHistoryWriter service
  RMApplicationHistoryWriter rmApplicationHistoryWriter =
      createRMApplicationHistoryWriter();
  addService(rmApplicationHistoryWriter);
  rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

  // initialize the RM timeline collector first so that the system metrics
  // publisher can bind to it
  // 13. Create the RM timeline collector
  if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
    RMTimelineCollectorManager timelineCollectorManager =
        createRMTimelineCollectorManager();
    addService(timelineCollectorManager);
    rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  }

  // 14. Set up the SystemMetricsPublisher
  SystemMetricsPublisher systemMetricsPublisher =
      createSystemMetricsPublisher();
  addIfService(systemMetricsPublisher);
  rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

  // 15. Register with JMX
  registerMXBean();

  // 16. Call the parent class's serviceInit
  super.serviceInit(this.conf);
}
```
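The comment "services will be started in same order" is at the core of the composite-service design. The hypothetical `MiniComposite` below sketches it. Children are initialized and started in the order they were added. That is why adding the AdminService before the elector ensures the AdminService is initialized and started first.

The sketch stops children in reverse order, as Hadoop's `CompositeService` does for started services. Everything else is simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the composite-service idea: children are initialized
// and started in insertion order, and stopped in reverse order.
class MiniComposite {
  interface Child {
    void init();
    void start();
    void stop();
  }

  private final List<Child> children = new ArrayList<>();

  void addService(Child child) {
    children.add(child);
  }

  void init() {
    for (Child c : children) {
      c.init();
    }
  }

  void start() {
    for (Child c : children) {
      c.start();
    }
  }

  // Later services may depend on earlier ones, so they shut down first.
  void stop() {
    for (int i = children.size() - 1; i >= 0; i--) {
      children.get(i).stop();
    }
  }
}
```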
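Now let's go through the initialization sub-steps one by one.
Initializing the service context

```java
public RMContextImpl() {
  // context for the always-on services
  this.serviceContext = new RMServiceContext();
  // context for the services that only run on the active RM
  this.activeServiceContext = new RMActiveServiceContext();
}
```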
Setting up the configuration provider
This uses the factory pattern, and YarnConfiguration supplies a default ConfigurationProvider class. Users can also implement ConfigurationProvider to plug in a custom provider.
Providers appear frequently in other source code as well. Here, the provider performs some internal initialization, returns an InputStream for a given configuration file, and closes such streams. The class that parses the configuration only needs an input stream.

```java
// ConfigurationProviderFactory is a factory class
/**
 * Creates an instance of {@link ConfigurationProvider} using given
 * configuration.
 * @param bootstrapConf
 * @return configurationProvider
 */
@SuppressWarnings("unchecked")
public static ConfigurationProvider
    getConfigurationProvider(Configuration bootstrapConf) {
  Class<? extends ConfigurationProvider> defaultProviderClass;
  try {
    // the default provider class is org.apache.hadoop.yarn.LocalConfigurationProvider
    defaultProviderClass = (Class<? extends ConfigurationProvider>)
        Class.forName(
            YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
  } catch (Exception e) {
    throw new YarnRuntimeException(
        "Invalid default configuration provider class"
            + YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
  }
  ConfigurationProvider configurationProvider =
      // Get the class's constructor from a cache and create the provider
      // instance reflectively. The provider class can be set with
      // yarn.resourcemanager.configuration.provider-class
      ReflectionUtils.newInstance(bootstrapConf.getClass(
          YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
          defaultProviderClass, ConfigurationProvider.class),
          bootstrapConf);
  return configurationProvider;
}
```
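The same factory idea can be sketched with plain JDK reflection. In this hypothetical version, the class name is read from `yarn.resourcemanager.configuration.provider-class`. It defaults to a local provider and is instantiated through its no-argument constructor. The simplifications are:

- `ProviderFactorySketch`, `LocalProvider` and `CustomProvider` are illustrative names.
- A `Map` replaces `Configuration`.
- There is no constructor cache like the one in `ReflectionUtils`.

```java
import java.util.Map;

// Hypothetical sketch of a reflective provider factory with a default class.
final class ProviderFactorySketch {
  interface ConfigProvider {
    String describe();
  }

  static class LocalProvider implements ConfigProvider {
    public String describe() { return "local"; }
  }

  static class CustomProvider implements ConfigProvider {
    public String describe() { return "custom"; }
  }

  static final String PROVIDER_KEY =
      "yarn.resourcemanager.configuration.provider-class";

  static ConfigProvider getProvider(Map<String, String> conf) {
    // fall back to the default class when nothing is configured
    String className =
        conf.getOrDefault(PROVIDER_KEY, LocalProvider.class.getName());
    try {
      Class<? extends ConfigProvider> clazz =
          Class.forName(className).asSubclass(ConfigProvider.class);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException(
          "Invalid configuration provider class " + className, e);
    }
  }
}
```

The `asSubclass` call makes a misconfigured class name fail early with a clear error. Otherwise the failure would surface later as an unchecked cast problem.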
Loading core-site.xml

```java
private void loadConfigurationXml(String configurationFile)
    throws YarnException, IOException {
  InputStream configurationInputStream =
      this.configurationProvider.getConfigurationInputStream(this.conf,
          configurationFile);
  if (configurationInputStream != null) {
    this.conf.addResource(configurationInputStream, configurationFile);
  }
}
```

Loading yarn-site.xml
This works the same way as loading core-site.xml.
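The parsing side only needs an `InputStream`. So loading core-site.xml and then yarn-site.xml amounts to parsing two streams in order, with later values overriding earlier ones. The hypothetical `MiniConfLoader` below sketches this with the JDK's DOM parser. It ignores features of Hadoop's real `Configuration`, such as `<final>` properties and variable expansion.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical mini loader for Hadoop-style <property><name/><value/></property>
// files. Resources added later override keys from earlier ones.
final class MiniConfLoader {
  private final Map<String, String> props = new HashMap<>();

  void addResource(InputStream in) {
    try {
      Document doc =
          DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
      NodeList properties = doc.getElementsByTagName("property");
      for (int i = 0; i < properties.getLength(); i++) {
        Element p = (Element) properties.item(i);
        String name = text(p, "name");
        String value = text(p, "value");
        if (name != null && value != null) {
          props.put(name, value);   // later resources overwrite earlier keys
        }
      }
    } catch (ParserConfigurationException | SAXException | IOException e) {
      throw new IllegalArgumentException("cannot parse configuration resource", e);
    }
  }

  private static String text(Element parent, String tag) {
    NodeList nodes = parent.getElementsByTagName(tag);
    return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
  }

  String get(String name) {
    return props.get(name);
  }

  // Convenience for building a stream from an XML string.
  static InputStream stream(String xml) {
    return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
  }
}
```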
Validating the configuration
The validation mainly checks two things. The global maximum number of AM attempts must be positive. The NodeManager expiry interval must not be shorter than the NodeManager heartbeat interval.

```java
protected static void validateConfigs(Configuration conf) {
  // validate max-attempts
  int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  if (globalMaxAppAttempts <= 0) {
    throw new YarnRuntimeException("Invalid global max attempts configuration"
        + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
        + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
  }

  // validate expireIntvl >= heartbeatIntvl
  long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  long heartbeatIntvl =
      conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
  if (expireIntvl < heartbeatIntvl) {
    throw new YarnRuntimeException("Nodemanager expiry interval should be no"
        + " less than heartbeat interval, "
        + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
        + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
        + heartbeatIntvl);
  }
}
```
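User login
Step 1: check whether HA is enabled. If it is, verify and set the HA configuration first, as the comment "Set HA configuration should be done before login" requires. HAUtil.verifyAndSetConfiguration resolves the current node's rm-id and its rm-id-specific settings, such as its addresses. The login in step 3 takes its hostname from the RM's bind address, so these settings must already be in place.
Step 2: get the current user. If the calling code runs inside a JAAS Subject that carries a Hadoop User principal (for example within doAs), that subject's user is returned. Otherwise the method falls back to the login user. The login user is the Kerberos login user when Kerberos is enabled, and the current OS user otherwise.

```java
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
  AccessControlContext context = AccessController.getContext();
  Subject subject = Subject.getSubject(context);
  if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
    return getLoginUser();
  } else {
    return new UserGroupInformation(subject);
  }
}
```

Step 3: call the security API to log in, and obtain the login user.

```java
protected void doSecureLogin() throws IOException {
  InetSocketAddress socAddr = getBindAddress(conf);
  SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
      YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());

  // if security is enable, set rmLoginUGI as UGI of loginUser
  if (UserGroupInformation.isSecurityEnabled()) {
    this.rmLoginUGI = UserGroupInformation.getLoginUser();
  }
}
```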
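To see why the HA settings must be in place before login, consider how the Kerberos principal is resolved:

- `yarn.resourcemanager.principal` is usually written with a `_HOST` placeholder, for example `rm/_HOST@EXAMPLE.COM`.
- Hadoop's `SecurityUtil` replaces the placeholder with the lower-cased hostname it is given, which here is `socAddr.getHostName()` from the RM's bind address.
- For a wildcard address such as 0.0.0.0, Hadoop falls back to the local hostname. This sketch does not model that case.

The hypothetical `PrincipalSketch` below is a simplified version of the substitution, not Hadoop's implementation:

```java
import java.util.Locale;

// Hypothetical, simplified _HOST substitution for principals of the form
// service/host@REALM.
final class PrincipalSketch {
  static String resolvePrincipal(String principalConfig, String hostname) {
    int slash = principalConfig.indexOf('/');
    int at = principalConfig.indexOf('@');
    if (slash < 0 || at < slash) {
      return principalConfig;          // no host component to substitute
    }
    String host = principalConfig.substring(slash + 1, at);
    if (!"_HOST".equals(host)) {
      return principalConfig;          // explicit host: keep as configured
    }
    return principalConfig.substring(0, slash + 1)
        + hostname.toLowerCase(Locale.US)
        + principalConfig.substring(at);
  }
}
```

With HA, each RM resolves its own rm-id-specific address first. As a result, the same `rm/_HOST@EXAMPLE.COM` setting yields a different principal on master1 and on master2.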
Initializing the event handlers for all always-on services

```java
private Dispatcher setupDispatcher() {
  // create the dispatcher
  Dispatcher dispatcher = createDispatcher();
  // register RMFatalEventDispatcher as the handler for RMFatalEventType events
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}

protected Dispatcher createDispatcher() {
  return new AsyncDispatcher("RM Event dispatcher");
}
```

Internally, AsyncDispatcher has a blocking event queue and a long-running event-handling thread, and dispatches each event as follows:

1. When an event is put into the queue, the thread takes it out and determines its type.
2. It looks up the matching EventHandler in the registry, a Map<Class<? extends Enum>, EventHandler>.
3. It calls that handler's handle method.

This completes one asynchronous event dispatch.
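The mechanism described above can be sketched in a few dozen lines. `MiniAsyncDispatcher` is a hypothetical, stripped-down illustration, not Hadoop's `AsyncDispatcher`, which adds a generic event handler, draining on stop and more. It keeps only the essentials: one `BlockingQueue`, one consumer thread, and a handler registry keyed by the event type's enum class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded asynchronous dispatcher.
final class MiniAsyncDispatcher {
  interface Event { Enum<?> getType(); }
  interface EventHandler { void handle(Event event); }

  private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
  @SuppressWarnings("rawtypes")
  private final Map<Class<? extends Enum>, EventHandler> handlers = new HashMap<>();
  private final Thread worker;

  MiniAsyncDispatcher() {
    worker = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          dispatch(queue.take());             // blocks until an event arrives
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();   // stop() was called
      }
    }, "mini-dispatcher");
    worker.setDaemon(true);
  }

  @SuppressWarnings("rawtypes")
  synchronized void register(Class<? extends Enum> type, EventHandler handler) {
    handlers.put(type, handler);
  }

  void start() { worker.start(); }

  void stop() { worker.interrupt(); }

  // Producers only enqueue; handling happens later on the worker thread.
  void post(Event event) { queue.add(event); }

  private void dispatch(Event event) {
    EventHandler handler;
    synchronized (this) {
      handler = handlers.get(event.getType().getDeclaringClass());
    }
    if (handler != null) {
      handler.handle(event);
    }
  }
}
```

The handler is looked up by `getDeclaringClass()` rather than `getClass()`. This matters for enum constants that have their own bodies, because those constants are anonymous subclasses. Hadoop's `AsyncDispatcher` looks handlers up the same way.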
Creating the AdminService

```java
protected AdminService createAdminService() {
  return new AdminService(this);
}
```

Setting the YARN configuration

```java
rmContext.setYarnConfiguration(conf);

// which calls
public void setYarnConfiguration(Configuration yarnConfiguration) {
  serviceContext.setYarnConfiguration(yarnConfiguration);
}
```
Creating and initializing the active services

```java
protected void createAndInitActiveServices(boolean fromActive) {
  activeServices = new RMActiveServices(this);
  activeServices.fromActive = fromActive;
  activeServices.init(conf);
}
```

Here, init is the same AbstractService.init method shown earlier:

```java
@Override
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
                                    + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
```

The serviceInit method it calls is shown below. We will analyze it in detail later:

```java
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  standByTransitionRunnable = new StandByTransitionRunnable();

  rmSecretManagerService = createRMSecretManagerService();
  addService(rmSecretManagerService);

  containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
  addService(containerAllocationExpirer);
  rmContext.setContainerAllocationExpirer(containerAllocationExpirer);

  AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
  addService(amLivelinessMonitor);
  rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

  AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
  addService(amFinishingMonitor);
  rmContext.setAMFinishingMonitor(amFinishingMonitor);

  RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
  addService(rmAppLifetimeMonitor);
  rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);

  RMNodeLabelsManager nlm = createNodeLabelManager();
  nlm.setRMContext(rmContext);
  addService(nlm);
  rmContext.setNodeLabelManager(nlm);

  AllocationTagsManager allocationTagsManager =
      createAllocationTagsManager();
  rmContext.setAllocationTagsManager(allocationTagsManager);

  PlacementConstraintManagerService placementConstraintManager =
      createPlacementConstraintManager();
  addService(placementConstraintManager);
  rmContext.setPlacementConstraintManager(placementConstraintManager);

  // add resource profiles here because it's used by AbstractYarnScheduler
  ResourceProfilesManager resourceProfilesManager =
      createResourceProfileManager();
  resourceProfilesManager.init(conf);
  rmContext.setResourceProfilesManager(resourceProfilesManager);

  RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
      createRMDelegatedNodeLabelsUpdater();
  if (delegatedNodeLabelsUpdater != null) {
    addService(delegatedNodeLabelsUpdater);
    rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
  }

  recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

  RMStateStore rmStore = null;
  if (recoveryEnabled) {
    rmStore = RMStateStoreFactory.getStore(conf);
    boolean isWorkPreservingRecoveryEnabled =
        conf.getBoolean(
          YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
    rmContext
        .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
  } else {
    rmStore = new NullRMStateStore();
  }

  try {
    rmStore.setResourceManager(rm);
    rmStore.init(conf);
    rmStore.setRMDispatcher(rmDispatcher);
  } catch (Exception e) {
    // the Exception from stateStore.init() needs to be handled for
    // HA and we need to give up master status if we got fenced
    LOG.error("Failed to init state store", e);
    throw e;
  }
  rmContext.setStateStore(rmStore);

  if (UserGroupInformation.isSecurityEnabled()) {
    delegationTokenRenewer = createDelegationTokenRenewer();
    rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
  }

  // Register event handler for NodesListManager
  nodesListManager = new NodesListManager(rmContext);
  rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
  addService(nodesListManager);
  rmContext.setNodesListManager(nodesListManager);

  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);

  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

  // Register event handler for RmAppEvents
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext));

  // Register event handler for RmAppAttemptEvents
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext));

  // Register event handler for RmNodes
  rmDispatcher.register(
      RMNodeEventType.class, new NodeEventDispatcher(rmContext));

  nmLivelinessMonitor = createNMLivelinessMonitor();
  addService(nmLivelinessMonitor);

  resourceTracker = createResourceTrackerService();
  addService(resourceTracker);
  rmContext.setResourceTrackerService(resourceTracker);

  MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
  if (fromActive) {
    JvmMetrics.reattach(ms, jvmMetrics);
    UserGroupInformation.reattachMetrics();
  } else {
    jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  }

  JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  jvmMetrics.setPauseMonitor(pauseMonitor);

  // Initialize the Reservation system
  if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
      YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
    reservationSystem = createReservationSystem();
    if (reservationSystem != null) {
      reservationSystem.setRMContext(rmContext);
      addIfService(reservationSystem);
      rmContext.setReservationSystem(reservationSystem);
      LOG.info("Initialized Reservation system");
    }
  }

  masterService = createApplicationMasterService();
  createAndRegisterOpportunisticDispatcher(masterService);
  addService(masterService);
  rmContext.setApplicationMasterService(masterService);

  applicationACLsManager = new ApplicationACLsManager(conf);

  queueACLsManager = createQueueACLsManager(scheduler, conf);

  rmAppManager = createRMAppManager();
  // Register event handler for RMAppManagerEvents
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);

  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);

  addService(applicationMasterLauncher);
  if (UserGroupInformation.isSecurityEnabled()) {
    addService(delegationTokenRenewer);
    delegationTokenRenewer.setRMContext(rmContext);
  }

  if(HAUtil.isFederationEnabled(conf)) {
    String cId = YarnConfiguration.getClusterId(conf);
    if (cId.isEmpty()) {
      String errMsg =
          "Cannot initialize RM as Federation is enabled"
              + " but cluster id is not configured.";
      LOG.error(errMsg);
      throw new YarnRuntimeException(errMsg);
    }
    federationStateStoreService = createFederationStateStoreService();
    addIfService(federationStateStoreService);
    LOG.info("Initialized Federation membership.");
  }

  new RMNMInfo(rmContext, scheduler);

  if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
      false)) {
    SystemServiceManager systemServiceManager = createServiceManager();
    addIfService(systemServiceManager);
  }

  super.serviceInit(conf);
}
```
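Getting the YARN web app address

```java
// yarn.resourcemanager.bind-host overrides the host that the RM web app
// binds to (for example 0.0.0.0 to listen on all interfaces)
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                  YarnConfiguration.RM_BIND_HOST,
                  WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
```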
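The effect of `yarn.resourcemanager.bind-host` on the web app address can be sketched with a hypothetical helper. It is based on my understanding of `WebAppUtils.getWebAppBindURL`:

- When a bind host is set, it replaces the host part of the address and the port is kept.
- Otherwise the address is returned unchanged.

`BindHostSketch` is illustrative and not a Hadoop class.

```java
// Hypothetical sketch of the bind-host override for a host:port address.
final class BindHostSketch {
  static String webAppBindUrl(String bindHost, String webAppAddress) {
    if (bindHost == null || bindHost.trim().isEmpty()) {
      return webAppAddress;                     // no override configured
    }
    int colon = webAppAddress.lastIndexOf(':');
    if (colon < 0) {
      throw new IllegalArgumentException(
          "web app address must contain a port: " + webAppAddress);
    }
    // keep ":port", swap in the bind host
    return bindHost.trim() + webAppAddress.substring(colon);
  }
}
```

For example, `webAppBindUrl("0.0.0.0", "master1:8088")` gives `"0.0.0.0:8088"`. The web app then listens on all interfaces, while clients still reach it at master1:8088.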
Creating the RMApplicationHistoryWriter service

```java
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
  return new RMApplicationHistoryWriter();
}

RMApplicationHistoryWriter rmApplicationHistoryWriter =
    createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
```

Creating the RM timeline collector

```java
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
  return new RMTimelineCollectorManager(this);
}

if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
  RMTimelineCollectorManager timelineCollectorManager =
      createRMTimelineCollectorManager();
  addService(timelineCollectorManager);
  rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
```
Setting up the SystemMetricsPublisher

```java
protected SystemMetricsPublisher createSystemMetricsPublisher() {
  List<SystemMetricsPublisher> publishers =
      new ArrayList<SystemMetricsPublisher>();
  // timeline service v1
  if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
    SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
    publishers.add(publisherV1);
  }
  // timeline service v2
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    // we're dealing with the v.2.x publisher
    LOG.info("system metrics publisher with the timeline service V2 is "
        + "configured");
    SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
        rmContext.getRMTimelineCollectorManager());
    publishers.add(publisherV2);
  }
  // No publisher configured: use a no-op publisher instead. This is the
  // null object pattern, which avoids null pointer problems downstream.
  if (publishers.isEmpty()) {
    LOG.info("TimelineServicePublisher is not configured");
    SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
    publishers.add(noopPublisher);
  }

  for (SystemMetricsPublisher publisher : publishers) {
    addIfService(publisher);
  }

  SystemMetricsPublisher combinedPublisher =
      new CombinedSystemMetricsPublisher(publishers);
  return combinedPublisher;
}
```
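`createSystemMetricsPublisher` combines two patterns that can be sketched independently of YARN:

- a null object used when nothing is configured;
- a combined publisher that fans out to every configured publisher.

`PublisherSketch` and its nested types are hypothetical stand-ins for `NoOpSystemMetricPublisher` and `CombinedSystemMetricsPublisher`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: a fan-out publisher plus a no-op (null object) fallback.
final class PublisherSketch {
  interface Publisher {
    void appCreated(String appId);
  }

  static final class NoOpPublisher implements Publisher {
    public void appCreated(String appId) {
      // intentionally does nothing
    }
  }

  static final class CombinedPublisher implements Publisher {
    private final List<Publisher> delegates;

    CombinedPublisher(List<Publisher> delegates) {
      this.delegates = Collections.unmodifiableList(new ArrayList<>(delegates));
    }

    public void appCreated(String appId) {
      for (Publisher p : delegates) {
        p.appCreated(appId);      // forward the event to every publisher
      }
    }

    int size() {
      return delegates.size();
    }
  }

  static CombinedPublisher create(List<Publisher> configured) {
    List<Publisher> publishers = new ArrayList<>(configured);
    if (publishers.isEmpty()) {
      publishers.add(new NoOpPublisher());   // null object instead of null
    }
    return new CombinedPublisher(publishers);
  }
}
```

Callers always receive a usable publisher and never need a null check, whichever timeline versions are enabled.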
Registering with JMX

```java
/**
 * Register ResourceManagerMXBean.
 */
private void registerMXBean() {
  MBeans.register("ResourceManager", "ResourceManager", this);
}
```
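Under the hood this is standard JMX. The sketch below registers a hypothetical MXBean with the platform `MBeanServer`, using only JDK classes:

- The `ObjectName` follows the `Hadoop:service=<service>,name=<name>` form that Hadoop's `MBeans.register` produces.
- `JmxSketch` and `RmStatusMXBean` are illustrative names.
- The interface must be public, and its name must end in `MXBean` (or it must carry the `@MXBean` annotation), for the JDK to treat it as an MXBean.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical JMX registration using only the JDK.
final class JmxSketch {
  // Public, and named *MXBean, so the JDK introspects it as an MXBean.
  public interface RmStatusMXBean {
    String getHaState();   // exposed as the read-only attribute "HaState"
  }

  static final class RmStatus implements RmStatusMXBean {
    public String getHaState() {
      return "active";
    }
  }

  // Registers the bean as "Hadoop:service=<service>,name=<name>".
  static ObjectName register(Object bean, String service, String name) {
    try {
      ObjectName objectName =
          new ObjectName("Hadoop:service=" + service + ",name=" + name);
      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
      server.registerMBean(bean, objectName);
      return objectName;
    } catch (JMException e) {
      throw new IllegalStateException("MXBean registration failed", e);
    }
  }
}
```

Once registered, the attribute can be read with tools such as JConsole. It can also be read programmatically via `MBeanServer.getAttribute`.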
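Calling the parent class's serviceInit method

```java
// Here, every service that was added to the service list during the
// initialization above gets initialized.
protected void serviceInit(Configuration conf) throws Exception {
  List<Service> services = getServices();
  if (LOG.isDebugEnabled()) {
    LOG.debug(getName() + ": initing services, size=" + services.size());
  }
  for (Service service : services) {
    service.init(conf);
  }
  super.serviceInit(conf);
}

// Why not return serviceList directly? The ArrayList copy constructor makes
// a shallow copy (essentially Arrays.copyOf) while the lock is held, so
// callers cannot add or remove entries in the internal service list.
// This is a recommended practice; returning an iterator over such a copy
// would be another option.
public List<Service> getServices() {
  synchronized (serviceList) {
    return new ArrayList<Service>(serviceList);
  }
}
```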
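The difference between returning a copy and returning the internal list takes only a few lines to show. `ServiceRegistrySketch` is hypothetical. It contrasts the snapshot copy used by `getServices()` with a read-only view from `Collections.unmodifiableList`. The view blocks modification, but it still reflects later additions and is not safe to iterate while another thread adds services.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical comparison of a snapshot copy and a read-only view.
final class ServiceRegistrySketch {
  private final List<String> serviceList = new ArrayList<>();

  void addService(String name) {
    synchronized (serviceList) {
      serviceList.add(name);
    }
  }

  // Snapshot taken under the lock, like CompositeService.getServices().
  List<String> getServices() {
    synchronized (serviceList) {
      return new ArrayList<>(serviceList);
    }
  }

  // Read-only view: no copying, but it tracks later changes.
  List<String> getServicesView() {
    return Collections.unmodifiableList(serviceList);
  }
}
```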
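This completes a walk through the main initialization code. When later topics involve a particular part, we will come back and examine it in detail.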