一、概述
本文將介紹ResourceManager在Yarn中的功能作用,從更細的粒度分析RM內部組成的各個組件功能和他們相互的交互方式。
二、ResourceManager的交互協議與基本職能
1、ResourceManager交互協議
在整個Yarn框架中主要涉及到7個協議,分別是ApplicationClientProtocol、MRClientProtocol、ContainerManagementProtocol、ApplicationMasterProtocol、ResourceTracker、LocalizationProtocol、TaskUmbilicalProtocol,這些協議封裝了各個組件交互的信息。ResourceManager現實功能需要和NodeManager以及ApplicationMaster進行信息交互,其中涉及到的RPC協議有ResourceTrackerProtocol、ApplicationMasterProtocol和ResourceTrackerProtocol。
-
ResourceTracker
NodeManager通過該協議向ResourceManager中注冊、匯報節點健康情況以及Container的運行狀態,並且領取ResourceManager下達的重新初始化、清理Container等命令。NodeManager和ResourceManager這種RPC通信采用了和MRv1類似的“pull模型”(ResourceManager充當RPC server角色,NodeManager充當RPC client角色),NodeManager周期性主動地向ResourceManager發起請求,並且領取下達給自己的命令。
-
ApplicationMasterProtocol
應用程序的ApplicationMaster同過該協議向ResourceManager注冊、申請和釋放資源。該協議和上面協議同樣也是采用了“pull模型”,其中在RPC機制中,ApplicationMaster充當RPC client角色,ResourceManager充當RPC server角色。
-
ApplicationClientProtocol
-
客戶端通過該協議向ResourceManager提交應用程序、控制應用程序(如殺死job)以及查詢應用程序的運行狀態等。在該RPC 協議中應用程序客戶端充當RPC client角色,ResourceManager充當RPC server角色。
整理一下ResourceManager與NodeManager、ApplicationMaster和客戶端RPC協議交互的信息:
上圖中的ResourceTrackeServer、ApplicationMasterService 、ClientRMServer是ResourceManager中處理上述功能的組件。
1、ResourceManager基本職能
ResourceManager基本職能概括起來就以下幾方面:
-
與客戶端進行交互,處理來自於客戶端的請求,如查詢應用的運行情況等。
-
啟動和管理各個應用的ApplicationMaster,並且為ApplicationMaster申請第一個Container用於啟動和在它運行失敗時將它重新啟動。
-
管理NodeManager,接收來自NodeManager的資源和節點健康情況匯報,並向NodeManager下達管理資源命令,例如kill掉某個container。
-
資源管理和調度,接收來自ApplicationMaster的資源申請,並且為其進行分配。這個是它的最重要的職能。
三、ResourceManager內部組成架構分析
ResourceManager在底層代碼實現上將各個功能模塊分的比較細,各個模塊功能具有很強的獨立性。下圖所示的是ResourceManager中的大概的功能模塊組成:
1、用戶交互模塊
用戶交互模塊即上圖顯示的User Service管理模塊。在這里邊還可以看到根據不同的用戶類型啟用了不同的服務進行處理,AdminService處理管理員相關請求,ClientRMService處理普通客戶相關請求,這樣使得管理員不會因為普通客戶請求太多而造成堵塞。下面看看這2個服務的具體實現代碼:
-
ClientRMService
public class ClientRMService extends AbstractService implements ApplicationClientProtocol { private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>(); private static final Log LOG = LogFactory.getLog(ClientRMService.class); final private AtomicInteger applicationCounter = new AtomicInteger(0); final private YarnScheduler scheduler;//調度器
final private RMContext rmContext;//RM上下文對象,其包含了RM大部分運行時信息,如節點列表、隊列列表、應用程序列表等
private final RMAppManager rmAppManager;//app管理對象
private Server server;//一個RPC Server
protected RMDelegationTokenSecretManager rmDTSecretManager; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); InetSocketAddress clientBindAddress; //訪問控制對象,例如,一些應用程序在提交時設置了查看權限的話,其他普通用戶就無法查看。
private final ApplicationACLsManager applicationsACLsManager; private final QueueACLsManager queueACLsManager; ...... @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = //實現RPC協議ApplicationClientProtocol
rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, conf, this.rmDTSecretManager, conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); // Enable service authorization?
if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new RMPolicyProvider()); } this.server.start(); ...... }
從上面ClientRMService的基本代碼架構我們可以看出:
(1)ClientRMService是一個RPC Server,主要為來自於普通客戶端的各種RPC請求。從代碼實現的角度看,它是ApplicationClientProtocol協議的一個實現。
(2)之前我們已經說了,普通用戶可以通過該服務來獲得正在運行應用程序的相關信息,如進度情況、應用程序列表等。上面代碼中都將ResourceManager運行信息封裝在RMContxt接口中了,下面來看看這個接口的一個實現對象RMContextImpl:
public class RMContextImpl implements RMContext { //中央異步調度器。RM中的各個服務和組件以及它們處理和輸出的事件類型都是通過中央異步調度器組織在一起的,這樣可以有效提高系統的吞吐量。
private final Dispatcher rmDispatcher; private final ConcurrentMap<ApplicationId, RMApp> applications//應用程序列表
= new ConcurrentHashMap<ApplicationId, RMApp>(); private final ConcurrentMap<NodeId, RMNode> nodes//節點列表
= new ConcurrentHashMap<NodeId, RMNode>(); private final ConcurrentMap<String, RMNode> inactiveNodes//非活躍節點列表
= new ConcurrentHashMap<String, RMNode>(); //正在運行中的AP心跳監控對象
private AMLivelinessMonitor amLivelinessMonitor;//正在運行中的AP心跳監控對象 //運行完畢后的AM心跳監控對象
private AMLivelinessMonitor amFinishingMonitor; //用於存儲ResourceManager運行狀態
private RMStateStore stateStore = null; //用於Container的超時監控,應用程序必須在一定時間內(默認10Min)使用分配到的Container去運行task,否則會被回收
private ContainerAllocationExpirer containerAllocationExpirer; //下面變量都是與安全管理相關的對象
private final DelegationTokenRenewer delegationTokenRenewer; private final AMRMTokenSecretManager amRMTokenSecretManager; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; private ClientRMService clientRMService; private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; ...... }
AdminService
AdminService和ClientRMService一樣都是作為RPC的服務端,它針對的處理管理員RPC請求,負責訪問權限的控制,中Yarn中管理員權限的設定可以在yarn-site.xml中yarn.admi.acl項進行設置,該項的默認值是*,也就是說如果不進行設置的話就當所有的用戶都是管理員。從代碼上看,它是ResourceManagerAdministrationProtocol協議的一個實現:
public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol { private static final Log LOG = LogFactory.getLog(AdminService.class); private final Configuration conf; private final ResourceScheduler scheduler; private final RMContext rmContext; private final NodesListManager nodesListManager; private final ClientRMService clientRMService; private final ApplicationMasterService applicationMasterService; private final ResourceTrackerService resourceTrackerService; private Server server; private InetSocketAddress masterServiceAddress; private AccessControlList adminAcl; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ..... }
AdminService代碼和ClientRMService比較相似,它各類功能對象也差不多。
2、NodeManager管理
NodeManager主要是通過NMLivelinessMonitor、ResourceTrackerService和NodeListManager這3大組件來對NodeManager的生命周期、心跳處理以及黑名單處理。
(1)ResourceTrackerService
ResourceTrackerService是RPC協議ResourceTracker的一個實現,它作為一個RPC Server端接收NodeManager的RPC請求,請求主要包含2種信息,注冊NodeManager和處理心跳信息。NodeManger啟動時第一件事就是像ResourceManager注冊,注冊時NodeManager發給ResourceTrackerService的RPC包主要包含NodeManager所在節點的可用資源總量、對外開放的htpp端口、節點的host和port等信息,具體代碼看ResourceTrackerService#registerNodeManager方法:
@SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { NodeId nodeId = request.getNodeId();//從NodeManager帶來的NodeID
String host = nodeId.getHost();//NodeManager所在節點的host
int cmPort = nodeId.getPort(); //NodeManager所在節點的port
int httpPort = request.getHttpPort();//對外開放的http端口
Resource capability = request.getResource();//獲得NodeManager所在節點的資源上限
RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); // Check if this node is a 'valid' node //檢測節點host名稱的的合法性
if (!this.nodesListManager.isValidNode(host)) { String message =
"Disallowed NodeManager from " + host + ", Sending SHUTDOWN signal to the NodeManager."; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } ..... }
ResourceTrackerService另外一種功能就是處理心跳信息了,當NodeManager啟動后,它會周期性地調用RPC函數ResourceTracker#nodeHeartbeat匯報心跳,心跳信息主要包含該節點的各個Container的運行狀態、正在運行的Application列表、節點的健康狀況等,隨后ResourceManager為該NodeManager返回需要釋放的Container列表、Application列表等信息。其中心跳信息處理的流程:首先,從NodeManager發來的心跳包中獲得節點的狀態狀態信息,然后檢測該節點是否已經注冊過,然后檢測該節點的host名稱是否合法,例如是否在excluded列表中,然后再檢測該次心跳是不是第一次心跳信息,這點非常重要,因為關系到心跳的重復發送與應答的相關問題。其實ResourceTrackerService和NodeManager的心跳處理機制和之前Hadoop1.x中的JobTracker與TaskTacker之間的心跳處理很相像,再然后,為NodeManager返回心跳應答信息,最后,向RMNode發送該NodeManager的狀態信息並且保存最近一次心跳應答信息。再具體看看ResourceTracker#nodeHeart方法:
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { //從RPC Clinet中獲得nodeManager所在節點的健康狀況
NodeStatus remoteNodeStatus = request.getNodeStatus(); /** * Here is the node heartbeat sequence... * 1. Check if it's a registered node * 2. Check if it's a valid (i.e. not excluded) node * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode */ NodeId nodeId = remoteNodeStatus.getNodeId(); // 1. Check if it's a registered node
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); LOG.info(message); resync.setDiagnosticsMessage(message); return resync; } // Send ping
this.nmLivelinessMonitor.receivedPing(nodeId); // 2. Check if it's a valid (i.e. not excluded) node
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ rmNode.getNodeAddress(); LOG.info(message); shutDown.setDiagnosticsMessage(message); this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); return shutDown; } // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse .getResponseId()) { LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress()); return lastNodeHeartbeatResponse; } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse .getResponseId()) { String message =
"Too far behind rm response id:"
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId(); LOG.info(message); resync.setDiagnosticsMessage(message); // TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); return resync; } // Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); // 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); return nodeHeartBeatResponse; }
(2)NodeListManager
NodeListManager主要分管黑名單(include列表)和白名單(exlude列表)管理功能,分別有yarnresouecemanager.nodes.include-path和yarnresourcemanager.nodes.exclude-path指定。黑名單列表中的nodes不能夠和RM直接通信(直接拋出RPC異常),管理員可以對這兩個列表進行編輯,然后使用$HADOOP_HOME/bin/yarn rmadmin -refreshNodes動態加載修改后的列表,使之生效。
(3)NMLivelinessMonitor
NMLivelinessMonitor主要是分管心跳異常請求。該服務會周期性地遍歷集群中的所有NodeManager,如果某個NodeManager在一定時間內(默認10min,可以有參數yarn.nm.liveness-monitor.expiry-interval-ms配置)沒有進行心跳匯報,那么則認為它已經死掉,同時在該節點上運行的Container也會被置為運行失敗釋放資源。那么這些被置為失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置為失敗的Container信息告訴它們所對應的ApplicationMaster,需不需要重新運行它說的算,如果需要重新運行的話,該ApplicationMaster要重新向RM申請資源,然后由ApplicationMaster與對應的NodeManager通信以重新運行之前失敗的Container。
2、ApplicationMaster管理模塊
ApplicationMaster的管理主要是用ResouceManager內部的3個組件來完成:ApplicationMasterLauncher、AMLivelinessMonitor、ApplicationMasterService。
(1)先說說ApplicationMaster和ResourceManager整個的交互流程:
步驟一:
當ResourceManager接收到客戶端提交應用程序請求時就會立馬向資源管理器申請一個資源用於啟動該應用程序所對應的ApplicationMaster,申請到資源后由ApplicationMasterLaucher與對應的NodeManager進行通信,要求該NodemManager在其所在節點啟動該ApplicationMaster。
步驟二:
ApplicationMaster啟動完畢后,ApplicationMasterLuacher通過事件的形式將剛剛啟動的ApplicationMaster注冊到AMLivelinessMonitor,以啟動心跳監控。
步驟三:
ApplicationMaster啟動后主動向ApplicationMasterService注冊,並將自己所在host、端口等信息向其匯報。
步驟四:
ApplicationMaster在運行的過程中不斷向ApplicationMasterService發送心跳。
步驟五:
ApplicationMasterService每次收到ApplicationMaster的心跳信息后,會同時AMLivelinessMonitor更新其最近一次發送心跳的時間。
步驟六:
當應用程序運行完畢后,ApplicationMaster向ApplicationMasterService請求注銷自己。
步驟七:
ApplicationMasterService收到注銷請求后,會將該應用程序的運行狀態標注為完成,並且同時AMLivelinessMonitor移除對該ApplicationMaster的心跳監控。
(2)內置管理組件的詳細說明
這里展開說說這3個組件的一些運行機理。
-
ApplicationMasterLaucher
ApplicationMasterLaucher是以線程池方式實現的一個事件處理器,其主要處理AMLaucherEvent類型的事件,包括啟動(LAUNCH)和清除(CLEANUP)一個ApplicationMaster的事件。
當接收到LAUNCH類型的事件,ApplicationMasterLaucher立馬會和對應的NodeManager進行通信,並且帶上啟動該ApplicationMaster所需要的各種信息,包括:啟動命令、JAR包、環境變量等信息。NodeManager接收到來自ApplicationMasterLaucher的啟動命令就會啟動ApplicationMaster。
當接收到CLEANUP類型事件,ApplicationMasterLaucher立馬會和對應的NodeManager進行通信,要求NodeManager殺死該ApplicationMaster,並釋放掉資源。
-
AMLivelinessMonitor
AMLivelinessMonitor的功能和NMLivelinessMonitor的功能幾乎一樣,只不過AMLivelinessMonitor監控的是ApplicationMaster,而NMLivelinessMonitor監控的是NodeManager。
AMLivelinessMonitor會周期性地遍歷集群中的所有ApplicationMaster,如果某個ApplicationMaster在一定時間內(默認10min,可以有參數yarn.am.liveness-monitor.expiry-interval-ms配置)沒有進行心跳匯報,那么則認為它已經死掉,同時該ApplicationMaster關聯運行的Container也會被置為運行失敗釋放資源。如果Application運行失敗,則有ResourceManager重新為它申請資源,並且在另外的節點上啟動它(AM啟動嘗試次數由參數yarn.resourcemanager.am.max-attempts控制,默認2)。那么這些被置為失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置為失敗的Container信息告訴它們所對應的ApplicationMaster,需不需要重新運行它說的算,如果需要從新運行的話,該ApplicationMaster要從新向RM申請資源,然后由ApplicationMaster與對應的NodeManager通信以從新運行之前失敗的Container。
-
ApplicationMasterService
ApplicationMasterService的主要職能是處理來自ApplicationMaster的心跳請求,另外也還處理Application的注冊和清理請求。注冊是Application啟動完成后發生的,它向ApplicationMasterService發送注冊請求包,包含:ApplicationMaster所在的節點、RPC端口、tracking url等信息。
處理心跳信息是周期型行為,只要ApplicationMaster還在運行此類請求都會發生。ApplicationMaster向ApplicationMasterService發送的心跳請求包,包含信息:請求資源類型的描述、待釋放的container列表等。ApplicationMasterService返回的心跳應答信息包含:ApplicationMasterService為之分配的Container、失敗的Container等信息。
清理請求是在ApplicationMaster運行完畢后,向ApplicationMasterService發送的,主要是叫其回收釋放資源。