Flink Source Code Reading (1): Job Scheduling on YARN in Per-Job Mode (Part 1)


1. Preface

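  Once a Flink job has been submitted to YARN, the steps that follow (getting the ApplicationMaster (AM) started and processing the job) are handled mostly by YARN and have little to do with Flink itself. Still, to get an overall picture of the Per-Job mode of Flink on YARN, I have filed this series under the Flink source-code reading series. The series is planned to have three posts.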

  This post focuses on what happens after submitApplication: how YARN's ResourceManager gets a container for the job's ApplicationMaster.

  Note: the source code in this post was reached by navigating from Flink 1.9; it mainly involves hadoop-yarn-server-resourcemanager-2.4.1.jar and flink-shaded-hadoop-2-2.4.1-7.0.jar.

  I am still learning, so comments and discussion are very welcome.

Key concepts [1]:

  1) RMApp: every application is an RMApp object holding all of the application's information; the implementation class is RMAppImpl.

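  2) RMAppAttempt: an RMApp can have several app attempts, each one an RMAppAttempt object representing one try at running the application. Which attempt is current depends on whether the previous RMAppAttempt succeeded: if it failed, another attempt is started, until one succeeds or the maximum number of attempts (yarn.resourcemanager.am.max-attempts) is reached.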

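  3) Dispatcher: the central event dispatcher. The event handlers of the individual state machines are registered with it. It maintains an event queue, keeps taking events off that queue, checks each event's type, and hands the event to the handler registered for that type. The implementations are AsyncDispatcher and MultiThreadedDispatcher; the latter keeps a list of AsyncDispatchers.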
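  The queue-plus-handlers behavior described in 3) can be illustrated with a toy dispatcher. This is a minimal sketch, not Hadoop's code: MiniAsyncDispatcher, Event, AppEventType and StoreEventType are hypothetical names, and the real org.apache.hadoop.yarn.event.AsyncDispatcher differs in many details (error handling, shutdown and draining). It shows the two properties the rest of this post relies on: callers only enqueue events, and events queued before the dispatcher starts are handled first, in order, once it runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, heavily simplified stand-in for YARN's AsyncDispatcher (not its real API):
// one FIFO event queue, one worker thread, one handler per event-type enum class.
class MiniAsyncDispatcher {
  interface Event { Enum<?> type(); }

  enum AppEventType { START, APP_NEW_SAVED }
  enum StoreEventType { STORE_APP }

  private static final Event STOP = () -> null; // sentinel that ends the worker loop
  private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
  private final Thread worker = new Thread(this::loop, "mini-dispatcher");

  void register(Class<? extends Enum<?>> typeClass, Consumer<Event> handler) {
    handlers.put(typeClass, handler);
  }

  /** Non-blocking for the caller: the event is only appended to the queue. */
  void dispatch(Event event) {
    queue.add(event);
  }

  void start() {
    worker.start();
  }

  /** Lets the events already queued finish, then ends the worker thread. */
  void stop() {
    queue.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private void loop() {
    try {
      while (true) {
        Event event = queue.take();
        if (event == STOP) {
          return;
        }
        // Route by the enum class that the event's type constant belongs to.
        Consumer<Event> handler = handlers.get(event.type().getDeclaringClass());
        if (handler != null) {
          handler.accept(event);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Queues events before and after start() and returns what the handlers saw, in order. */
  static List<String> demo() {
    List<String> seen = Collections.synchronizedList(new ArrayList<>());
    MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
    dispatcher.register(AppEventType.class, e -> seen.add("app:" + e.type()));
    dispatcher.register(StoreEventType.class, e -> seen.add("store:" + e.type()));
    dispatcher.dispatch(() -> AppEventType.START); // queued before the worker runs
    dispatcher.dispatch(() -> StoreEventType.STORE_APP);
    dispatcher.start();
    dispatcher.dispatch(() -> AppEventType.APP_NEW_SAVED);
    dispatcher.stop();
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [app:START, store:STORE_APP, app:APP_NEW_SAVED]
  }
}
```

  Handlers are looked up by the enum class of the event type, which is how a single dispatcher can serve RMApp events, RMAppAttempt events, scheduler events and so on.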

2. From Event Submission to Dispatching

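  1. As mentioned in the earlier post analyzing the source of Flink on YARN's Per-Job mode, the request sent by the client is wrapped into a SubmitApplicationRequest and handled by ClientRMService.submitApplication(). The process is as follows: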

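  1) The method first validates the fields of the submission context that do not depend on the RM's configuration: it returns early if the applicationId has already been submitted, and fills in defaults for the target queue, the application name and the application type if they are missing;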

  2) It calls RMAppManager.submitApplication() to submit the application.

  The code is as follows:

  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.

    String user = null;
    try {
      // Safety
      user = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException ie) {
      LOG.warn("Unable to get the current user.", ie);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          ie.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw RPCUtil.getRemoteException(ie);
    }
    // Start of the checks: is the applicationId already known, is a queue set, etc.
    // Check whether app has already been put into rmContext,
    // If it is, simply return the response
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

    if (submissionContext.getQueue() == null) {
      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    if (submissionContext.getApplicationName() == null) {
      submissionContext.setApplicationName(
          YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }
    if (submissionContext.getApplicationType() == null) {
      submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
    } else {
      if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
        submissionContext.setApplicationType(submissionContext
          .getApplicationType().substring(0,
            YarnConfiguration.APPLICATION_TYPE_LENGTH));
      }
    }

    try {
      // Submit the application
      // call RMAppManager to submit application directly
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), user);

      LOG.info("Application with id " + applicationId.getId() + 
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (YarnException e) {
      LOG.info("Exception in submitting application with id " +
          applicationId.getId(), e);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw e;
    }

    SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
  }
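  The early-return and defaulting logic in the method above is easy to isolate. The following minimal sketch assumes a hypothetical Ctx class in place of ApplicationSubmissionContext and a plain set in place of rmContext.getRMApps(); the default values are the YarnConfiguration constants as I recall them for Hadoop 2.x, so check them against the version you run.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the checks in ClientRMService.submitApplication().
// The constants mirror YarnConfiguration's defaults as recalled for Hadoop 2.x;
// verify them against your Hadoop version.
class SubmissionDefaultsSketch {
  static final String DEFAULT_QUEUE_NAME = "default";
  static final String DEFAULT_APPLICATION_NAME = "N/A";
  static final String DEFAULT_APPLICATION_TYPE = "YARN";
  static final int APPLICATION_TYPE_LENGTH = 20;

  /** Minimal, hypothetical stand-in for ApplicationSubmissionContext. */
  static final class Ctx {
    String queue;
    String name;
    String type;

    Ctx(String queue, String name, String type) {
      this.queue = queue;
      this.name = name;
      this.type = type;
    }
  }

  private final Set<String> knownAppIds = new HashSet<>(); // stands in for rmContext.getRMApps()

  /** Returns false for an already-known id (the RM then just returns an empty response). */
  boolean submit(String appId, Ctx ctx) {
    if (knownAppIds.contains(appId)) {
      return false;
    }
    applyDefaults(ctx);
    knownAppIds.add(appId); // in YARN, RMAppManager is what registers the app
    return true;
  }

  static Ctx applyDefaults(Ctx ctx) {
    if (ctx.queue == null) {
      ctx.queue = DEFAULT_QUEUE_NAME;
    }
    if (ctx.name == null) {
      ctx.name = DEFAULT_APPLICATION_NAME;
    }
    if (ctx.type == null) {
      ctx.type = DEFAULT_APPLICATION_TYPE;
    } else if (ctx.type.length() > APPLICATION_TYPE_LENGTH) {
      ctx.type = ctx.type.substring(0, APPLICATION_TYPE_LENGTH); // silently truncated
    }
    return ctx;
  }

  public static void main(String[] args) {
    SubmissionDefaultsSketch rm = new SubmissionDefaultsSketch();
    Ctx ctx = new Ctx(null, null, "a-very-long-application-type");
    System.out.println(rm.submit("application_1_0001", ctx)); // true
    System.out.println(ctx.queue + " " + ctx.name + " " + ctx.type); // default N/A a-very-long-applicat
    System.out.println(rm.submit("application_1_0001", ctx)); // false
  }
}
```

  Note that an over-long application type is truncated rather than rejected, so a client never gets an error for it.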

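  2. RMAppManager.submitApplication() does two main things: it creates the RMApp together with its state machine, and, depending on whether security (Kerberos authentication, a cluster-side setting) is enabled, it takes one of two branches to get the application started. The code is as follows: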

  protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    // Create the application. The RMAppImpl constructor builds its state
    // machine (initial state NEW); the new app is also registered in the RM context.
    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();
    // Security (Kerberos) is enabled
    if (UserGroupInformation.isSecurityEnabled()) {
      Credentials credentials = null;
      try {
        credentials = parseCredentials(submissionContext);
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
          credentials, submissionContext.getCancelTokensWhenComplete());
      } catch (Exception e) {
        LOG.warn("Unable to parse credentials.", e);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // scheduler about the existence of the application
        assert application.getState() == RMAppState.NEW;
        this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
        throw RPCUtil.getRemoteException(e);
      }
    } else {
      // The dispatcher does not run the event right away: the START event is
      // queued, and the dispatcher later hands it to the RMApp's state machine.
      // Dispatcher is not yet started at this time, so these START events
      // enqueued should be guaranteed to be first processed when dispatcher
      // gets started.
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
  }
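  The two branches can be summarized in a small sketch. It records events as strings instead of sending them to a dispatcher, and its inputs (securityEnabled, credentialsOk) and the TOKENS_RENEWED marker are hypothetical. As far as I can tell from the Hadoop 2.4 sources, in the secure branch the START event is sent by DelegationTokenRenewer once it has set up the application's tokens, not by RMAppManager itself; treat that as an assumption to verify.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two branches of RMAppManager.submitApplication().
// Events are recorded as strings instead of being sent to a real dispatcher.
class SubmitBranchSketch {
  final List<String> events = new ArrayList<>();

  /**
   * securityEnabled stands in for UserGroupInformation.isSecurityEnabled(),
   * credentialsOk for parseCredentials() succeeding (both hypothetical inputs).
   */
  void submit(String appId, boolean securityEnabled, boolean credentialsOk) {
    if (securityEnabled) {
      if (!credentialsOk) {
        // The app is still NEW and unknown to the scheduler, so rejecting it is safe.
        events.add(appId + ":APP_REJECTED");
        throw new IllegalStateException("Unable to parse credentials for " + appId);
      }
      // Tokens go to the renewer first; START is sent only once that is done.
      renewTokensThenStart(appId);
    } else {
      events.add(appId + ":START"); // only queued; the dispatcher delivers it later
    }
  }

  // Simplified: runs inline here, whereas YARN does this asynchronously.
  // TOKENS_RENEWED is a made-up marker, not a real YARN event type.
  private void renewTokensThenStart(String appId) {
    events.add(appId + ":TOKENS_RENEWED");
    events.add(appId + ":START");
  }

  public static void main(String[] args) {
    SubmitBranchSketch rm = new SubmitBranchSketch();
    rm.submit("app_1", false, true);
    rm.submit("app_2", true, true);
    System.out.println(rm.events); // [app_1:START, app_2:TOKENS_RENEWED, app_2:START]
  }
}
```

  Either way the client's RPC returns before the application reaches the scheduler; only the order in which START is produced differs.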

  3. The ResourceManager uses an AsyncDispatcher to dispatch events; the handler registered for each event type can be found in the ResourceManager class.

  Only part of that structure is shown here; we mainly care about ApplicationEventDispatcher.

  public static final class ApplicationEventDispatcher implements
      EventHandler<RMAppEvent> {

    private final RMContext rmContext;

    public ApplicationEventDispatcher(RMContext rmContext) {
      this.rmContext = rmContext;
    }

    @Override
    public void handle(RMAppEvent event) {
      ApplicationId appID = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      if (rmApp != null) {
        try {
          // Drive the RMApp's state machine
          rmApp.handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for application " + appID, t);
        }
      }
    }
  }

  The handle(...) method here is simple, so its source is not analyzed further. Next we focus on how the application's state changes.

3. Application State Changes

  1. From NEW to NEW_SAVING

  1) In RMAppImpl.java:

  private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)

     // Only part of the transition table is shown here
     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    // START moves the RMApp from NEW to NEW_SAVING
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    // ... (remaining transitions omitted)
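  The builder chain above is essentially a lookup table from (current state, event type) to the next state. A minimal sketch of that idea, restricted to the RMApp arcs discussed in this post: the class and enum names are hypothetical, the per-arc hook objects such as RMAppNewlySavingTransition are left out, and an event without a matching arc is rejected here with an IllegalStateException (YARN's state machine throws InvalidStateTransitonException instead, which RMAppImpl logs).

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified version of the StateMachineFactory idea: a table keyed by
// (current state, event type) yields the next state. Only the RMApp arcs discussed in
// this post are included, and the per-arc hooks are omitted.
class RMAppStateTableSketch {
  enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
  enum EventType { NODE_UPDATE, START, APP_NEW_SAVED, APP_ACCEPTED }

  private static final Map<State, Map<EventType, State>> TABLE = new EnumMap<>(State.class);

  private static void addTransition(State from, State to, EventType on) {
    TABLE.computeIfAbsent(from, s -> new EnumMap<>(EventType.class)).put(on, to);
  }

  static {
    addTransition(State.NEW, State.NEW, EventType.NODE_UPDATE);
    addTransition(State.NEW, State.NEW_SAVING, EventType.START);
    addTransition(State.NEW_SAVING, State.NEW_SAVING, EventType.NODE_UPDATE);
    addTransition(State.NEW_SAVING, State.SUBMITTED, EventType.APP_NEW_SAVED);
    addTransition(State.SUBMITTED, State.ACCEPTED, EventType.APP_ACCEPTED);
  }

  private State current = State.NEW; // like StateMachineFactory<...>(RMAppState.NEW)

  State getState() {
    return current;
  }

  /** Applies one event; an event with no arc from the current state is rejected. */
  State handle(EventType type) {
    Map<EventType, State> arcs = TABLE.get(current);
    State next = (arcs == null) ? null : arcs.get(type);
    if (next == null) {
      throw new IllegalStateException("Invalid event " + type + " at " + current);
    }
    current = next;
    return current;
  }

  public static void main(String[] args) {
    RMAppStateTableSketch app = new RMAppStateTableSketch();
    app.handle(EventType.START);
    app.handle(EventType.APP_NEW_SAVED);
    System.out.println(app.handle(EventType.APP_ACCEPTED)); // ACCEPTED
  }
}
```

  Building the table once in a static initializer and sharing it across instances mirrors why stateMachineFactory is a static field: every RMAppImpl gets its own current state but the same transition table.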

  2) Following new RMAppNewlySavingTransition(), its source is:

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

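  3) As its comment makes clear, this transition mainly stores the application's information, by calling RMStateStore.storeNewApplication(). (When RM recovery is disabled the configured store is a NullRMStateStore, which persists nothing, but the same event flow still runs.) The code is: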

  /**
   * Non-Blocking API
   * ResourceManager services use this to store the application's state
   * This does not block the dispatcher threads
   * RMAppStoredEvent will be sent on completion to notify the RMApp
   */
  @SuppressWarnings("unchecked")
  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); // explained below
  }

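  4) The last statement of storeNewApplication() above, dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)), shows that storing the app is again handed over to a dispatcher. RMStateStore sets up its own AsyncDispatcher in the following part of the class (lines 266~277 of RMStateStore):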

  // RMStateStore's own dispatcher
  AsyncDispatcher dispatcher;

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    // create async handler
    dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    // ForwardingEventHandler is the class that actually handles the events
    dispatcher.register(RMStateStoreEventType.class, 
                        new ForwardingEventHandler());
    dispatcher.setDrainEventsOnStop();
    initInternal(conf);
  }

  2. From NEW_SAVING to SUBMITTED

  1) Looking into the inner class ForwardingEventHandler, we find that it mainly overrides handle() to call RMStateStore.handleStoreEvent(). Following that method:

  // Abridged; see lines 598~699 of RMStateStore for the full method
  // Dispatcher related code
  protected void handleStoreEvent(RMStateStoreEvent event) {
    if (event.getType().equals(RMStateStoreEventType.STORE_APP)
        || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
      ApplicationState appState = null;
      if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
        appState = ((RMStateStoreAppEvent) event).getAppState();
      } else {
        assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
        appState = ((RMStateUpdateAppEvent) event).getAppState();
      }

      Exception storedException = null;
      ApplicationStateDataPBImpl appStateData =
          (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
            .newApplicationStateData(appState.getSubmitTime(),
              appState.getStartTime(), appState.getUser(),
              appState.getApplicationSubmissionContext(), appState.getState(),
              appState.getDiagnostics(), appState.getFinishTime());

      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();

      LOG.info("Storing info for app: " + appId);
      try {
        if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
          // The state can be kept in ZooKeeper, in memory or in a file system,
          // depending on the configured store (not examined in detail here)
          storeApplicationStateInternal(appId, appStateData);
          // Send an RMAppNewSavedEvent (type APP_NEW_SAVED) to the RMApp,
          // whose state machine then takes the next transition
          notifyDoneStoringApplication(appId, storedException);
        } else {
          assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
          updateApplicationStateInternal(appId, appStateData);
          notifyDoneUpdatingApplication(appId, storedException);
        }
      } catch (Exception e) {
        LOG.error("Error storing/updating app: " + appId, e);
        notifyStoreOperationFailed(e);
      }
    } else {
      // ... (branches for the other event types omitted)
    }
  }
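  The store path above (queue a STORE_APP event, persist it on the store's own dispatcher, then report APP_NEW_SAVED to the RMApp) can be sketched as follows. Everything here is hypothetical: Backend stands in for the storeApplicationStateInternal() implementations (memory, file system, ZooKeeper), drain() stands in for the store dispatcher's thread, and STORE_FAILED is a made-up marker for the notifyStoreOperationFailed() path.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of the RMStateStore flow (not its real API): storeNewApplication()
// only queues a STORE_APP event; the store's own dispatcher later persists the state via a
// pluggable backend and then reports back to the RMApp.
class StateStoreSketch {
  /** Pluggable persistence, standing in for storeApplicationStateInternal(). */
  interface Backend {
    void store(String appId, String appState) throws Exception;
  }

  /** In-memory backend, in the spirit of MemoryRMStateStore. */
  static final class MemoryBackend implements Backend {
    final Map<String, String> apps = new HashMap<>();

    @Override
    public void store(String appId, String appState) {
      apps.put(appId, appState);
    }
  }

  private final Queue<String> pendingStoreEvents = new ArrayDeque<>();
  private final Backend backend;
  final List<String> sentToRMApp = new ArrayList<>(); // what the RMApp would receive

  StateStoreSketch(Backend backend) {
    this.backend = backend;
  }

  /** Non-blocking: returns right after queueing, like storeNewApplication(). */
  void storeNewApplication(String appId) {
    pendingStoreEvents.add(appId);
  }

  /** Processes queued events, standing in for the store dispatcher's thread. */
  void drain() {
    String appId;
    while ((appId = pendingStoreEvents.poll()) != null) {
      try {
        backend.store(appId, "submission context of " + appId);
        sentToRMApp.add(appId + ":APP_NEW_SAVED"); // cf. notifyDoneStoringApplication()
      } catch (Exception e) {
        // cf. notifyStoreOperationFailed(); STORE_FAILED is a made-up marker
        sentToRMApp.add(appId + ":STORE_FAILED");
      }
    }
  }

  public static void main(String[] args) {
    StateStoreSketch store = new StateStoreSketch(new MemoryBackend());
    store.storeNewApplication("app_1");
    System.out.println(store.sentToRMApp); // prints [] : nothing happens until the dispatcher runs
    store.drain();
    System.out.println(store.sentToRMApp); // [app_1:APP_NEW_SAVED]
  }
}
```

  Keeping the persistence behind a small interface is what lets the RM swap memory, file-system or ZooKeeper storage without touching the RMApp state machine.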

  2) The key part of RMAppImpl (lines 152~156) is:

    // Transitions from NEW_SAVING state
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())  // key transition

  3. From SUBMITTED to ACCEPTED

  1) Following the code, let's look at the AddApplicationToSchedulerTransition class:

  private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      if (event instanceof RMAppNewSavedEvent) {
        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
        // For HA this exception needs to be handled by giving up
        // master status if we got fenced
        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
          LOG.error(
            "Failed to store application: " + storeEvent.getApplicationId(),
            storeEvent.getStoredException());
          ExitUtil.terminate(1, storeEvent.getStoredException());
        }
      }
      // Hand the app to YARN's scheduler; here we follow the CapacityScheduler,
      // which the community currently recommends
      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user));
    }
  }

  2) In CapacityScheduler (lines 882~887):

    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      // Mainly sanity-checks the queue and submits the app to it, then sends
      // an APP_ACCEPTED event, which drives RMAppImpl's state machine again
      // (see lines 511~546 of CapacityScheduler if you are interested)
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser());
    }
    break;
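  The queue check and the reply to the RMApp can be sketched as below. As far as I can tell from Hadoop 2.4's CapacityScheduler.addApplication(), an unknown queue or a non-leaf queue leads to an RMAppRejectedEvent, and a successful add ends by sending APP_ACCEPTED. The class is hypothetical and leaves out the ACL and per-queue limit checks that queue.submitApplication() performs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of CapacityScheduler.addApplication() handling APP_ADDED:
// sanity-check the target queue, record the app, and answer the RMApp with either
// APP_ACCEPTED or APP_REJECTED. ACL and per-queue limit checks are left out.
class SchedulerAddAppSketch {
  private final Set<String> leafQueues;
  final Map<String, String> appToQueue = new HashMap<>();

  SchedulerAddAppSketch(Set<String> leafQueues) {
    this.leafQueues = leafQueues;
  }

  /** Returns the RMAppEventType name that would be sent back to the RMApp. */
  String addApplication(String appId, String queueName) {
    if (!leafQueues.contains(queueName)) {
      // Unknown queue, or a parent queue, which cannot hold applications.
      return "APP_REJECTED";
    }
    appToQueue.put(appId, queueName);
    return "APP_ACCEPTED"; // moves the RMApp from SUBMITTED to ACCEPTED
  }

  public static void main(String[] args) {
    SchedulerAddAppSketch cs =
        new SchedulerAddAppSketch(new HashSet<>(Arrays.asList("default", "flink")));
    System.out.println(cs.addApplication("app_1", "flink")); // APP_ACCEPTED
    System.out.println(cs.addApplication("app_2", "root"));  // APP_REJECTED
  }
}
```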

  3) Back in RMAppImpl:

    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

4. RMAppAttempt State Changes

  1. From NEW to SUBMITTED

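  1) Let's step into StartAppAttemptTransition to see what it does. Its transition() calls createAndStartNewAttempt(false); tracing into that method gives the following code (lines 657~679 of RMAppImpl):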

  private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    // Creates a new RMAppAttemptImpl; its attempt number is based on how many
    // attempts this application already has
    createNewAttempt();
    // RMAppStartAttemptEvent is an RMAppAttemptEvent of type
    // RMAppAttemptEventType.START, so this fires START on the new attempt
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }
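  The attempt-numbering idea behind createNewAttempt() can be sketched as follows. As far as I can tell, Hadoop 2.4 builds the new ApplicationAttemptId from attempts.size() + 1 and tells the attempt whether it is the last one allowed by maxAppAttempts (capped by yarn.resourcemanager.am.max-attempts); the class and field names below are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of RMAppImpl.createNewAttempt()/createAndStartNewAttempt():
// the attempt number comes from how many attempts the app already has, and the attempt
// learns whether it is the last one that maxAppAttempts allows.
class NewAttemptSketch {
  static final class Attempt {
    final int attemptNumber;
    final boolean isLastAttempt;

    Attempt(int attemptNumber, boolean isLastAttempt) {
      this.attemptNumber = attemptNumber;
      this.isLastAttempt = isLastAttempt;
    }
  }

  private final int maxAppAttempts;
  private final Map<Integer, Attempt> attempts = new LinkedHashMap<>();
  Attempt currentAttempt;

  NewAttemptSketch(int maxAppAttempts) {
    this.maxAppAttempts = maxAppAttempts;
  }

  /** Creates the next attempt and returns a description of the START event sent to it. */
  String createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    int next = attempts.size() + 1;
    Attempt attempt = new Attempt(next, next == maxAppAttempts);
    attempts.put(next, attempt);
    currentAttempt = attempt;
    return "attempt " + next + ": START (transferState=" + transferStateFromPreviousAttempt + ")";
  }

  public static void main(String[] args) {
    NewAttemptSketch app = new NewAttemptSketch(2);
    System.out.println(app.createAndStartNewAttempt(false)); // attempt 1: START (transferState=false)
    System.out.println(app.currentAttempt.isLastAttempt);    // false
    app.createAndStartNewAttempt(false);
    System.out.println(app.currentAttempt.isLastAttempt);    // true
  }
}
```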

  2) In RMAppAttemptImpl's state machine, the START event moves the RMAppAttemptState from NEW to SUBMITTED:

      // Transitions from NEW State
      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

  2. From SUBMITTED to SCHEDULED

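  1) Following the same pattern, we step into AttemptStartedTransition; it mainly overrides transition(). It registers the attempt with the ApplicationMasterService, creates the AMRMToken, and then notifies the scheduler: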

  private static final class AttemptStartedTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {

      boolean transferStateFromPreviousAttempt = false;
      if (event instanceof RMAppStartAttemptEvent) {
        transferStateFromPreviousAttempt =
            ((RMAppStartAttemptEvent) event)
              .getTransferStateFromPreviousAttempt();
      }
      appAttempt.startTime = System.currentTimeMillis();

      // Register with the ApplicationMasterService
      appAttempt.masterService
          .registerAppAttempt(appAttempt.applicationAttemptId);

      if (UserGroupInformation.isSecurityEnabled()) {
        appAttempt.clientTokenMasterKey =
            appAttempt.rmContext.getClientToAMTokenSecretManager()
              .createMasterKey(appAttempt.applicationAttemptId);
      }

      // create AMRMToken
      AMRMTokenIdentifier id =
          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
      appAttempt.amrmToken =
          new Token<AMRMTokenIdentifier>(id,
            appAttempt.rmContext.getAMRMTokenSecretManager());

      // Add the applicationAttempt to the scheduler and inform the scheduler
      // whether to transfer the state from previous attempt.
      // This fires the scheduler's APP_ATTEMPT_ADDED event (CapacityScheduler is followed here)
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
  }

  2) In CapacityScheduler, the corresponding branch is:

    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
    }
    break;

  //--------------------------------------------------
  private synchronized void addApplicationAttempt(
      ApplicationAttemptId applicationAttemptId,
      boolean transferStateFromPreviousAttempt) {
    SchedulerApplication application =
        applications.get(applicationAttemptId.getApplicationId());
    CSQueue queue = (CSQueue) application.getQueue();

    FiCaSchedulerApp attempt =
        new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
          queue, queue.getActiveUsersManager(), rmContext);
    if (transferStateFromPreviousAttempt) {
      attempt.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
    application.setCurrentAppAttempt(attempt);

    queue.submitApplicationAttempt(attempt, application.getUser());
    LOG.info("Added Application Attempt " + applicationAttemptId
        + " to scheduler from user " + application.getUser() + " in queue "
        + queue.getQueueName());
    // Drive RMAppAttemptImpl's state machine again: ATTEMPT_ADDED moves the
    // attempt out of SUBMITTED (to SCHEDULED for a normal, managed AM)
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(applicationAttemptId,
          RMAppAttemptEventType.ATTEMPT_ADDED));
  }
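  The bookkeeping in addApplicationAttempt() (one current attempt per application, optionally inheriting state from the previous attempt) can be sketched like this. It is a hypothetical simplification: the real FiCaSchedulerApp carries much more state, and which state is transferred (for example live containers, as far as I can tell) is only hinted at here with a list of container ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the scheduler-side bookkeeping in addApplicationAttempt():
// each application has one current attempt; a new attempt replaces it and, when asked,
// first inherits state from the previous one (here only a list of live container ids).
class SchedulerAttemptSketch {
  static final class SchedulerAttempt {
    final String attemptId;
    final List<String> liveContainers = new ArrayList<>();

    SchedulerAttempt(String attemptId) {
      this.attemptId = attemptId;
    }
  }

  private final Map<String, SchedulerAttempt> currentAttemptByApp = new HashMap<>();

  /** Returns the event sent back to the RMAppAttempt (SUBMITTED -> SCHEDULED there). */
  String addApplicationAttempt(String appId, String attemptId, boolean transferState) {
    SchedulerAttempt attempt = new SchedulerAttempt(attemptId);
    SchedulerAttempt previous = currentAttemptByApp.get(appId);
    if (transferState && previous != null) {
      attempt.liveContainers.addAll(previous.liveContainers);
    }
    currentAttemptByApp.put(appId, attempt);
    return attemptId + ":ATTEMPT_ADDED";
  }

  SchedulerAttempt currentAttempt(String appId) {
    return currentAttemptByApp.get(appId);
  }

  public static void main(String[] args) {
    SchedulerAttemptSketch cs = new SchedulerAttemptSketch();
    cs.addApplicationAttempt("app_1", "attempt_1", false);
    cs.currentAttempt("app_1").liveContainers.add("container_2");
    System.out.println(cs.addApplicationAttempt("app_1", "attempt_2", true)); // attempt_2:ATTEMPT_ADDED
    System.out.println(cs.currentAttempt("app_1").liveContainers);           // [container_2]
  }
}
```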

  3) Back in RMAppAttemptImpl, here is how its state machine handles an ATTEMPT_ADDED event:

      // Transitions from SUBMITTED state
      .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
  //---------------------------------------------------

  private static final class ScheduleTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      // Managed AM (the usual case): the RM itself must obtain and launch
      // the AM container
      if (!appAttempt.submissionContext.getUnmanagedAM()) {
        // Request a container for the AM.
        // One container, sized by the AM resource the client put into the
        // submission context
        ResourceRequest request =
            BuilderUtils.newResourceRequest(
                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
                    .getSubmissionContext().getResource(), 1);

        // SchedulerUtils.validateResourceRequests is not necessary because
        // AM resource has been checked when submission
        // allocate() only registers the request with the scheduler; the
        // container is assigned later, when a NodeManager heartbeat reaches
        // the scheduler, so no containers are expected back at this point
        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
            appAttempt.applicationAttemptId,
            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
      } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
      }
    }
  }
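  ScheduleTransition is a "multiple-arc" transition: its target state is decided at run time rather than fixed in the table. A minimal sketch of that idea follows. ANY = "*" matches ResourceRequest.ANY; the AmRequest class, its fields and the priority value 0 are assumptions made for this sketch (0 is what I believe AM_CONTAINER_PRIORITY is set to, but treat it as unverified).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a "multiple-arc" transition like ScheduleTransition: the target
// state is chosen at run time instead of being fixed in the transition table.
class ScheduleTransitionSketch {
  enum AttemptState { SUBMITTED, SCHEDULED, LAUNCHED_UNMANAGED_SAVING }

  static final String ANY = "*"; // same value as ResourceRequest.ANY

  /** Simplified, hypothetical stand-in for a ResourceRequest. */
  static final class AmRequest {
    final int priority;
    final String resourceName;
    final int memoryMb;
    final int numContainers;

    AmRequest(int priority, String resourceName, int memoryMb, int numContainers) {
      this.priority = priority;
      this.resourceName = resourceName;
      this.memoryMb = memoryMb;
      this.numContainers = numContainers;
    }
  }

  /** Requests the scheduler has recorded but not yet satisfied. */
  final List<AmRequest> pendingRequests = new ArrayList<>();

  AttemptState transition(boolean unmanagedAM, int amMemoryMb) {
    if (!unmanagedAM) {
      // Managed AM: ask for exactly one container of the AM size, on any node.
      pendingRequests.add(new AmRequest(0, ANY, amMemoryMb, 1));
      // Nothing is allocated yet; a later NodeManager heartbeat gets the container assigned.
      return AttemptState.SCHEDULED;
    }
    // Unmanaged AM: the client runs the AM itself, so the attempt is only saved.
    return AttemptState.LAUNCHED_UNMANAGED_SAVING;
  }

  public static void main(String[] args) {
    ScheduleTransitionSketch attempt = new ScheduleTransitionSketch();
    System.out.println(attempt.transition(false, 1024)); // SCHEDULED
    System.out.println(attempt.pendingRequests.size());  // 1
  }
}
```

  The SCHEDULED state therefore means "the AM request is queued at the scheduler", not "the AM container exists yet".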

References:

[1] https://www.iteye.com/blog/humingminghz-2326608

 

