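1. Preface
Once a Flink job has been submitted to Yarn, the creation of the AM and the handling of the job that follow have little to do with Flink itself. Still, to give a rough picture of the whole Flink on Yarn Per-Job flow, I have filed these posts under the Flink source-code reading series; three posts are planned.
This post focuses on what happens after submitApplication: how Yarn's ResourceManager allocates a container for the job's ApplicationMaster.
Note: the source code shown here was reached by navigating from Flink 1.9; it mainly involves hadoop-yarn-server-resourcemanager-2.4.1.jar and flink-shaded-hadoop-2-2.4.1-7.0.jar.
My knowledge is limited, so comments and discussion are welcome.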
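Key concepts involved 【1】:
1) RMApp: every application is an RMApp object that holds all of the application's information; the implementation class is RMAppImpl;
2) RMAppAttempt: an RMApp can have several app attempts, that is, several RMAppAttempt objects, which trace how the job's state changes. Which attempt is current depends on whether the previous RMAppAttempt succeeded; if it did not, another one is started, until one runs successfully;
3) Dispatcher: the central event dispatcher. The event handlers of the individual state machines register with it. It maintains an event queue, keeps scanning that queue, takes out each event, checks its type, and hands it to the matching handler. Its implementations are AsyncDispatcher and MultiThreadedDispatcher; the latter creates a list that holds AsyncDispatcher instances.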
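To make the Dispatcher idea concrete, here is a minimal single-threaded sketch. It is not Yarn's AsyncDispatcher (which drains its queue on its own thread); the class DispatcherSketch, its two enums, and the drain() method are hypothetical names used only for illustration. Handlers register per event-type enum, events wait in a queue, and a loop routes each one by type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, simplified model of a central event dispatcher.
final class DispatcherSketch {
    enum AppEventType { START }
    enum StoreEventType { STORE_APP }

    // One handler per event-type enum class.
    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();
    private final Queue<Enum<?>> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> type, Consumer<Enum<?>> handler) {
        handlers.put(type, handler);
    }

    // Enqueue only; nothing is processed until drain() runs.
    void handle(Enum<?> event) {
        queue.add(event);
    }

    // Take events in FIFO order and route each to the handler for its type.
    void drain() {
        Enum<?> event;
        while ((event = queue.poll()) != null) {
            Consumer<Enum<?>> handler = handlers.get(event.getDeclaringClass());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + event);
            }
            handler.accept(event);
        }
    }

    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        DispatcherSketch d = new DispatcherSketch();
        d.register(AppEventType.class, e -> log.add("app:" + e));
        d.register(StoreEventType.class, e -> log.add("store:" + e));
        d.handle(AppEventType.START);
        d.handle(StoreEventType.STORE_APP);
        d.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because handle() only enqueues, events submitted before the loop starts are still processed, in submission order, once it does.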
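2. From event submission to dispatch
1. As mentioned in the post "Flink on yarn Per-job mode source code analysis", the message submitted by the client is wrapped in a request and handled by ClientRMService.submitApplication(). The process is:
1) The method first checks the settings that are independent of the Yarn RM's configuration, such as the applicationId, the name of the target resource queue, and the application name;
2) It then calls RMAppManager.submitApplication() to submit the application.
The code is as follows: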

    public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {
      ApplicationSubmissionContext submissionContext = request
          .getApplicationSubmissionContext();
      ApplicationId applicationId = submissionContext.getApplicationId();

      // ApplicationSubmissionContext needs to be validated for safety - only
      // those fields that are independent of the RM's configuration will be
      // checked here, those that are dependent on RM configuration are validated
      // in RMAppManager.

      String user = null;
      try {
        // Safety
        user = UserGroupInformation.getCurrentUser().getShortUserName();
      } catch (IOException ie) {
        LOG.warn("Unable to get the current user.", ie);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
            ie.getMessage(), "ClientRMService",
            "Exception in submitting application", applicationId);
        throw RPCUtil.getRemoteException(ie);
      }
      // Start checking whether the applicationId already exists, whether a queue is set, etc.
      // Check whether app has already been put into rmContext,
      // If it is, simply return the response
      if (rmContext.getRMApps().get(applicationId) != null) {
        LOG.info("This is an earlier submitted application: " + applicationId);
        return SubmitApplicationResponse.newInstance();
      }

      if (submissionContext.getQueue() == null) {
        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
      }
      if (submissionContext.getApplicationName() == null) {
        submissionContext.setApplicationName(
            YarnConfiguration.DEFAULT_APPLICATION_NAME);
      }
      if (submissionContext.getApplicationType() == null) {
        submissionContext
            .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
      } else {
        if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
          submissionContext.setApplicationType(submissionContext
              .getApplicationType().substring(0,
                  YarnConfiguration.APPLICATION_TYPE_LENGTH));
        }
      }

      try {
        // Submit the application
        // call RMAppManager to submit application directly
        rmAppManager.submitApplication(submissionContext,
            System.currentTimeMillis(), user);

        LOG.info("Application with id " + applicationId.getId() +
            " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
            "ClientRMService", applicationId);
      } catch (YarnException e) {
        LOG.info("Exception in submitting application with id " +
            applicationId.getId(), e);
        RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
            e.getMessage(), "ClientRMService",
            "Exception in submitting application", applicationId);
        throw e;
      }

      SubmitApplicationResponse response = recordFactory
          .newRecordInstance(SubmitApplicationResponse.class);
      return response;
    }
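The defaulting and truncation that submitApplication() applies to the queue and application type can be looked at in isolation. The sketch below is only an illustration: SubmissionDefaultsSketch and its constants are hypothetical, and the constant values are placeholders rather than values read from YarnConfiguration.

```java
// Hypothetical sketch of the "fill in defaults, cap the length" step.
// The constants are placeholder values chosen for the example.
final class SubmissionDefaultsSketch {
    static final String DEFAULT_QUEUE = "default";
    static final String DEFAULT_TYPE = "YARN";
    static final int MAX_TYPE_LENGTH = 20;

    // A missing queue falls back to the default queue.
    static String queueOrDefault(String queue) {
        return queue == null ? DEFAULT_QUEUE : queue;
    }

    // A missing type falls back to the default; an overlong one is truncated.
    static String normalizeType(String type) {
        if (type == null) {
            return DEFAULT_TYPE;
        }
        return type.length() > MAX_TYPE_LENGTH
            ? type.substring(0, MAX_TYPE_LENGTH)
            : type;
    }

    public static void main(String[] args) {
        System.out.println(queueOrDefault(null));
        System.out.println(normalizeType(null));
        System.out.println(normalizeType("abcdefghijklmnopqrstuvwxyz"));
    }
}
```

Note that an overlong type is silently shortened rather than rejected, so the submission still goes through.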
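2. RMAppManager.submitApplication() mainly does two things: it starts the state machine, and it takes a different dispatch branch depending on whether security authentication is enabled (a Yarn setting). The code is as follows: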

    protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
      ApplicationId applicationId = submissionContext.getApplicationId();

      // Create the application; the state machine is started in the constructor,
      // and whether a record is saved depends on the configuration
      RMAppImpl application =
          createAndPopulateNewRMApp(submissionContext, submitTime, user);
      ApplicationId appId = submissionContext.getApplicationId();
      // Security authentication is enabled
      if (UserGroupInformation.isSecurityEnabled()) {
        Credentials credentials = null;
        try {
          credentials = parseCredentials(submissionContext);
          this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
              credentials, submissionContext.getCancelTokensWhenComplete());
        } catch (Exception e) {
          LOG.warn("Unable to parse credentials.", e);
          // Sending APP_REJECTED is fine, since we assume that the
          // RMApp is in NEW state and thus we haven't yet informed the
          // scheduler about the existence of the application
          assert application.getState() == RMAppState.NEW;
          this.rmContext.getDispatcher().getEventHandler()
              .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
          throw RPCUtil.getRemoteException(e);
        }
      } else {
        // The dispatcher does not start right away, so the event is put on the queue.
        // This kicks off the RMApp state machine, which then waits for the
        // dispatcher to invoke the matching event handler.
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    }
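3. The ResourceManager uses an AsyncDispatcher to dispatch events; the handlers for the various event types can be found in the ResourceManager class.
Only part of its structure is shown here; the part we care about is ApplicationEventDispatcher.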
    public static final class ApplicationEventDispatcher implements
        EventHandler<RMAppEvent> {

      private final RMContext rmContext;

      public ApplicationEventDispatcher(RMContext rmContext) {
        this.rmContext = rmContext;
      }

      @Override
      public void handle(RMAppEvent event) {
        ApplicationId appID = event.getApplicationId();
        RMApp rmApp = this.rmContext.getRMApps().get(appID);
        if (rmApp != null) {
          try {
            // Invoke the RMApp's state machine
            rmApp.handle(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for application " + appID, t);
          }
        }
      }
    }
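The routing pattern in ApplicationEventDispatcher (look up the application by id, hand it the event, log failures instead of propagating them) can be reduced to a few lines. This is a rough sketch with hypothetical names (AppRoutingSketch, App, dispatch) and plain strings in place of Yarn's event and id types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: route an event to the application it belongs to.
final class AppRoutingSketch {
    interface App {
        void handle(String eventType);
    }

    private final Map<String, App> apps = new HashMap<>();

    void addApp(String appId, App app) {
        apps.put(appId, app);
    }

    // Returns true when a registered application was found for the id.
    boolean dispatch(String appId, String eventType) {
        App app = apps.get(appId);
        if (app == null) {
            return false; // unknown application: the event is dropped
        }
        try {
            app.handle(eventType);
        } catch (RuntimeException e) {
            // Log and carry on so one failing app cannot stop the dispatcher.
            System.err.println("Error handling " + eventType + " for " + appId + ": " + e);
        }
        return true;
    }

    static List<String> runDemo() {
        List<String> seen = new ArrayList<>();
        AppRoutingSketch router = new AppRoutingSketch();
        router.addApp("app_1", type -> seen.add("app_1 got " + type));
        router.dispatch("app_1", "START");
        router.dispatch("app_2", "START"); // not registered, ignored
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```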
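The handle(...) method here is simple enough that it needs no further analysis. Next we focus on how the application's state changes.
3. Application state transitions
1. NEW -> NEW_SAVING
1) In the RMAppImpl.java class: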
    private static final StateMachineFactory<RMAppImpl,
                                             RMAppState,
                                             RMAppEventType,
                                             RMAppEvent> stateMachineFactory
        = new StateMachineFactory<RMAppImpl,
                                  RMAppState,
                                  RMAppEventType,
                                  RMAppEvent>(RMAppState.NEW)

        // Only part of the table is shown here
        // Transitions from NEW state
        .addTransition(RMAppState.NEW, RMAppState.NEW,
            RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
        // Moves the RMApp state from NEW to NEW_SAVING
        .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
            RMAppEventType.START, new RMAppNewlySavingTransition())
        .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
                RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
                RMAppState.KILLED, RMAppState.FINAL_SAVING),
            RMAppEventType.RECOVER, new RMAppRecoveredTransition())
        // .........
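If the addTransition(...) table is unfamiliar, the idea is a lookup from (current state, event type) to the next state. Below is a minimal table-driven sketch, assuming only that idea; StateMachineSketch is a hypothetical class, it omits the transition hook objects, and it is much simpler than Yarn's StateMachineFactory.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical table-driven state machine: (state, event) -> next state.
final class StateMachineSketch {
    enum State { NEW, NEW_SAVING, SUBMITTED }
    enum Event { START, APP_NEW_SAVED }

    private final Map<State, Map<Event, State>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    // Register one arc; returns this so calls can be chained.
    StateMachineSketch addTransition(State from, State to, Event on) {
        table.computeIfAbsent(from, k -> new EnumMap<>(Event.class)).put(on, to);
        return this;
    }

    // Apply an event; an event with no arc from the current state is an error.
    State handle(Event event) {
        Map<Event, State> row = table.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }

    static StateMachineSketch rmAppLike() {
        return new StateMachineSketch()
            .addTransition(State.NEW, State.NEW_SAVING, Event.START)
            .addTransition(State.NEW_SAVING, State.SUBMITTED, Event.APP_NEW_SAVED);
    }

    public static void main(String[] args) {
        StateMachineSketch sm = rmAppLike();
        System.out.println(sm.handle(Event.START));
        System.out.println(sm.handle(Event.APP_NEW_SAVED));
    }
}
```

In the real table each arc also carries a transition object (such as RMAppNewlySavingTransition) whose transition() method runs the side effects for that arc.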
2) Following new RMAppNewlySavingTransition(), its source is:
    private static final class RMAppNewlySavingTransition extends RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {

        // If recovery is enabled then store the application information in a
        // non-blocking call so make sure that RM has stored the information
        // needed to restart the AM after RM restart without further client
        // communication
        LOG.info("Storing application with id " + app.applicationId);
        app.rmContext.getStateStore().storeNewApplication(app);
      }
    }
3) As the comment makes clear, this step mainly saves the application's information. It calls RMStateStore.storeNewApplication(), shown below:
    /**
     * Non-Blocking API
     * ResourceManager services use this to store the application's state
     * This does not block the dispatcher threads
     * RMAppStoredEvent will be sent on completion to notify the RMApp
     */
    @SuppressWarnings("unchecked")
    public synchronized void storeNewApplication(RMApp app) {
      ApplicationSubmissionContext context = app
          .getApplicationSubmissionContext();
      assert context instanceof ApplicationSubmissionContextPBImpl;
      ApplicationState appState =
          new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
              app.getUser());
      dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); // key point, explained below
    }
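4) The last statement of the code above (the one marked as the key point) shows that the store-app action is again handed to a Dispatcher. The relevant part of the RMStateStore class (lines 266~277) is: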
    // Declaration
    AsyncDispatcher dispatcher;

    @Override
    protected void serviceInit(Configuration conf) throws Exception{
      // create async handler
      dispatcher = new AsyncDispatcher();
      dispatcher.init(conf);
      // ForwardingEventHandler is the class that actually performs the action
      dispatcher.register(RMStateStoreEventType.class,
                          new ForwardingEventHandler());
      dispatcher.setDrainEventsOnStop();
      initInternal(conf);
    }
2. NEW_SAVING -> SUBMITTED
1) Inside the inner class ForwardingEventHandler, we find that it mainly overrides handle() and calls RMStateStore.handleStoreEvent(). Following that method, the code is:
    // Too much code to paste in full; see lines 598~699 of the RMStateStore class
    // Dispatcher related code
    protected void handleStoreEvent(RMStateStoreEvent event) {
      if (event.getType().equals(RMStateStoreEventType.STORE_APP)
          || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
        ApplicationState appState = null;
        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
          appState = ((RMStateStoreAppEvent) event).getAppState();
        } else {
          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
          appState = ((RMStateUpdateAppEvent) event).getAppState();
        }

        Exception storedException = null;
        ApplicationStateDataPBImpl appStateData =
            (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
              .newApplicationStateData(appState.getSubmitTime(),
                appState.getStartTime(), appState.getUser(),
                appState.getApplicationSubmissionContext(), appState.getState(),
                appState.getDiagnostics(), appState.getFinishTime());

        ApplicationId appId =
            appState.getApplicationSubmissionContext().getApplicationId();

        LOG.info("Storing info for app: " + appId);
        try {
          if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
            // State can be stored in ZooKeeper, in memory, or in a file system
            // (I have not read this path in detail)
            storeApplicationStateInternal(appId, appStateData);
            // Sends an event of type APP_NEW_SAVED, which drives the RMAppImpl
            // state machine to its next transition
            notifyDoneStoringApplication(appId, storedException);
          } else {
            assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
            updateApplicationStateInternal(appId, appStateData);
            notifyDoneUpdatingApplication(appId, storedException);
          }
        } catch (Exception e) {
          LOG.error("Error storing/updating app: " + appId, e);
          notifyStoreOperationFailed(e);
        }
      } else if{......}
    }
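The point of routing the store through its own dispatcher is that storeNewApplication() returns at once and the write happens later, followed by a notification back to the application. A minimal sketch of that "enqueue now, persist and notify later" shape, with hypothetical names (AsyncStoreSketch, processPending) and an in-memory map standing in for the real store:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch of a non-blocking store that notifies on completion.
final class AsyncStoreSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Map<String, String> stored = new HashMap<>();
    private final Consumer<String> onSaved; // plays the role of APP_NEW_SAVED

    AsyncStoreSketch(Consumer<String> onSaved) {
        this.onSaved = onSaved;
    }

    // Non-blocking: only records the request and returns.
    void storeNewApplication(String appId) {
        pending.add(appId);
    }

    // In a real system this would run on the store's own dispatcher thread.
    void processPending() {
        String appId;
        while ((appId = pending.poll()) != null) {
            stored.put(appId, "saved");
            onSaved.accept(appId); // tell the application its state is stored
        }
    }

    boolean isStored(String appId) {
        return stored.containsKey(appId);
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        AsyncStoreSketch store = new AsyncStoreSketch(notified::add);
        store.storeNewApplication("app_1");
        System.out.println("stored before processing: " + store.isStored("app_1"));
        store.processPending();
        System.out.println("stored after processing: " + store.isStored("app_1"));
        System.out.println("notified: " + notified);
    }
}
```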
2) The key code in the RMAppImpl class (lines 152~156) is:
    // Transitions from NEW_SAVING state
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) // key
3. SUBMITTED -> ACCEPTED
1) Following the code, let's look at the AddApplicationToSchedulerTransition class:
    private static final class AddApplicationToSchedulerTransition extends
        RMAppTransition {
      @Override
      public void transition(RMAppImpl app, RMAppEvent event) {
        if (event instanceof RMAppNewSavedEvent) {
          RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
          // For HA this exception needs to be handled by giving up
          // master status if we got fenced
          if (((RMAppNewSavedEvent) event).getStoredException() != null) {
            LOG.error(
              "Failed to store application: " + storeEvent.getApplicationId(),
              storeEvent.getStoredException());
            ExitUtil.terminate(1, storeEvent.getStoredException());
          }
        }
        // This hands off to Yarn's scheduler; here we analyze CapacityScheduler,
        // which the community currently recommends
        app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
          app.submissionContext.getQueue(), app.user));
      }
    }
2) In CapacityScheduler, lines 882~887:
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      // This mainly runs some checks on the queue and then submits the application to it;
      // an event of type APP_ACCEPTED is then sent, which re-triggers the state machine in RMAppImpl.
      // See lines 511~546 if you are interested.
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser());
    }
3) Back in the RMAppImpl class:
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
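Putting this section together: each step finishes by emitting the event that triggers the next one, so the application moves NEW -> NEW_SAVING -> SUBMITTED -> ACCEPTED as events bounce between the RMApp, the state store, and the scheduler. The sketch below replays that chain over a single queue. The event names are the ones seen in the source above; the class RmAppFlowSketch and the use of plain strings for events are simplifications of mine.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical replay of the RMApp event chain over one queue.
final class RmAppFlowSketch {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }

    static List<State> run() {
        Queue<String> events = new ArrayDeque<>();
        List<State> history = new ArrayList<>();
        State state = State.NEW;
        history.add(state);
        events.add("START");

        String event;
        while ((event = events.poll()) != null) {
            switch (event) {
                case "START":         // RMApp: NEW -> NEW_SAVING, ask the store to persist
                    state = State.NEW_SAVING;
                    events.add("STORE_APP");
                    break;
                case "STORE_APP":     // state store: persist, then notify the app
                    events.add("APP_NEW_SAVED");
                    break;
                case "APP_NEW_SAVED": // RMApp: NEW_SAVING -> SUBMITTED, tell the scheduler
                    state = State.SUBMITTED;
                    events.add("APP_ADDED");
                    break;
                case "APP_ADDED":     // scheduler: queue checks, then accept
                    events.add("APP_ACCEPTED");
                    break;
                case "APP_ACCEPTED":  // RMApp: SUBMITTED -> ACCEPTED
                    state = State.ACCEPTED;
                    break;
                default:
                    throw new IllegalStateException("Unexpected event " + event);
            }
            // Record the state only when it actually changed.
            if (history.get(history.size() - 1) != state) {
                history.add(state);
            }
        }
        return history;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```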
4. RMAppAttempt state transitions
1. NEW -> SUBMITTED
1) Let's follow StartAppAttemptTransition to see what it does. Tracing the code leads to the following (lines 657~679):
    private void
        createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
      // This keeps track of the number of attempts for the same application
      createNewAttempt();
      // The new RMAppStartAttemptEvent carries the event type RMAppAttemptEventType.START
      handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
        transferStateFromPreviousAttempt));
    }
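The "one application, several attempts" relationship can be pictured as a loop that creates a numbered attempt, runs it, and creates the next one only if the previous one failed. This is a loose sketch: AttemptRetrySketch, the attempt id format, and the maxAttempts cap are assumptions of mine added so the loop always terminates, not something taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of "start another attempt until one succeeds".
final class AttemptRetrySketch {
    // Returns the ids of all attempts that were started, in order.
    static List<String> run(String appId, int maxAttempts, IntPredicate attemptSucceeds) {
        List<String> attemptIds = new ArrayList<>();
        for (int n = 1; n <= maxAttempts; n++) {
            attemptIds.add(appId + "_attempt_" + n); // one id per attempt
            if (attemptSucceeds.test(n)) {
                break; // success: no further attempt is needed
            }
        }
        return attemptIds;
    }

    public static void main(String[] args) {
        // Pretend the first two attempts fail and the third succeeds.
        System.out.println(run("app_1", 5, n -> n == 3));
    }
}
```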
2) In the state machine of the RMAppAttemptImpl class, we can see RMAppAttemptState change from NEW to SUBMITTED:
    // Transitions from NEW State
    .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
        RMAppAttemptEventType.START, new AttemptStartedTransition())
2. SUBMITTED -> SCHEDULED
1) As before, we follow the AttemptStartedTransition class and find that it mainly overrides the transition method. The code is:
    private static final class AttemptStartedTransition extends BaseTransition {
      @Override
      public void transition(RMAppAttemptImpl appAttempt,
          RMAppAttemptEvent event) {

        boolean transferStateFromPreviousAttempt = false;
        if (event instanceof RMAppStartAttemptEvent) {
          transferStateFromPreviousAttempt =
              ((RMAppStartAttemptEvent) event)
                .getTransferStateFromPreviousAttempt();
        }
        appAttempt.startTime = System.currentTimeMillis();

        // Register with the ApplicationMasterService
        appAttempt.masterService
            .registerAppAttempt(appAttempt.applicationAttemptId);

        if (UserGroupInformation.isSecurityEnabled()) {
          appAttempt.clientTokenMasterKey =
              appAttempt.rmContext.getClientToAMTokenSecretManager()
                .createMasterKey(appAttempt.applicationAttemptId);
        }

        // create AMRMToken
        AMRMTokenIdentifier id =
            new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
        appAttempt.amrmToken =
            new Token<AMRMTokenIdentifier>(id,
              appAttempt.rmContext.getAMRMTokenSecretManager());

        // Add the applicationAttempt to the scheduler and inform the scheduler
        // whether to transfer the state from previous attempt.
        // Fires the scheduler's APP_ATTEMPT_ADDED event; here we analyze CapacityScheduler
        appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
          appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
      }
    }
2) In CapacityScheduler, the relevant branch is:
    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
    }
    break;

    //--------------------------------------------------
    private synchronized void addApplicationAttempt(
        ApplicationAttemptId applicationAttemptId,
        boolean transferStateFromPreviousAttempt) {
      SchedulerApplication application =
          applications.get(applicationAttemptId.getApplicationId());
      CSQueue queue = (CSQueue) application.getQueue();

      FiCaSchedulerApp attempt =
          new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
            queue, queue.getActiveUsersManager(), rmContext);
      if (transferStateFromPreviousAttempt) {
        attempt.transferStateFromPreviousAttempt(application
          .getCurrentAppAttempt());
      }
      application.setCurrentAppAttempt(attempt);

      queue.submitApplicationAttempt(attempt, application.getUser());
      LOG.info("Added Application Attempt " + applicationAttemptId
          + " to scheduler from user " + application.getUser() + " in queue "
          + queue.getQueueName());
      // This re-triggers the RMAppAttemptImpl state machine;
      // RMAppAttemptState goes from SUBMITTED to SCHEDULED
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
          RMAppAttemptEventType.ATTEMPT_ADDED));
    }
3) Back in the RMAppAttemptImpl class, here is how the state machine handles events of type ATTEMPT_ADDED:
    // Transitions from SUBMITTED state
    .addTransition(RMAppAttemptState.SUBMITTED,
        EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                   RMAppAttemptState.SCHEDULED),
        RMAppAttemptEventType.ATTEMPT_ADDED,
        new ScheduleTransition())
    //---------------------------------------------------

    private static final class ScheduleTransition
        implements
        MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
      @Override
      public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
          RMAppAttemptEvent event) {
        // The AM is not unmanaged, so the RM has to obtain a container for it
        if (!appAttempt.submissionContext.getUnmanagedAM()) {
          // Request a container for the AM.
          // Request one container; its size is the one we configured at submission
          ResourceRequest request =
              BuilderUtils.newResourceRequest(
                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                  .getSubmissionContext().getResource(), 1);

          // SchedulerUtils.validateResourceRequests is not necessary because
          // AM resource has been checked when submission
          // Hands the request to the scheduler. A node with free resources is found
          // from heartbeat information and one container is allocated for the AM;
          // nothing is returned yet at this point (see the assert below).
          // The rest of the logic is straightforward.
          Allocation amContainerAllocation = appAttempt.scheduler.allocate(
              appAttempt.applicationAttemptId,
              Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
          if (amContainerAllocation != null
              && amContainerAllocation.getContainers() != null) {
            assert (amContainerAllocation.getContainers().size() == 0);
          }
          return RMAppAttemptState.SCHEDULED;
        } else {
          // save state and then go to LAUNCHED state
          appAttempt.storeAttempt();
          return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
        }
      }
    }
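ScheduleTransition differs from the earlier transitions in that one event can lead to more than one target state: the set of allowed targets is declared up front, and the transition code picks one at run time. A minimal sketch of that idea follows; MultiArcSketch and onAttemptAdded are hypothetical names, and the only input modelled is the unmanaged-AM flag.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of a transition with several possible target states.
final class MultiArcSketch {
    enum AttemptState { SUBMITTED, SCHEDULED, LAUNCHED_UNMANAGED_SAVING }

    // Target states declared up front, like the EnumSet passed to addTransition.
    static final Set<AttemptState> TARGETS =
        EnumSet.of(AttemptState.SCHEDULED, AttemptState.LAUNCHED_UNMANAGED_SAVING);

    // Picks the target state for an ATTEMPT_ADDED event.
    static AttemptState onAttemptAdded(boolean unmanagedAm) {
        AttemptState next = unmanagedAm
            ? AttemptState.LAUNCHED_UNMANAGED_SAVING // AM runs outside the RM's control
            : AttemptState.SCHEDULED;                // RM requests a container for the AM
        if (!TARGETS.contains(next)) {
            throw new IllegalStateException("Undeclared target state: " + next);
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(onAttemptAdded(false));
        System.out.println(onAttemptAdded(true));
    }
}
```

For a Flink Per-Job submission the AM is launched by Yarn, so the branch of interest is the one that ends in SCHEDULED.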
Ref:
【1】https://www.iteye.com/blog/humingminghz-2326608