Flink源碼閱讀(一)——Flink on Yarn的Per-job模式源碼簡析


 一、前言

  個人感覺學習Flink其實最不應該錯過的博文是Flink社區的博文系列,里面的文章是不會讓人失望的。強烈安利:https://ververica.cn/developers-resources/。  

  本文是自己第一次嘗試寫源碼閱讀的文章,會努力將原理和源碼實現流程結合起來。文中有幾個點目前也是沒有弄清楚,若是寫在一篇博客里,時間跨度太大,但又怕后期遺忘,所以先記下來,后期進一步閱讀源碼后再添上,若是看到不完整版博文的看官,對不住!

  文中若是寫的不准確的地方歡迎留言指出

  源碼系列基於Flink 1.9

二、Per-job提交任務原理

  Flink on Yarn模式下提交任務整體流程圖如下(圖源自Flink社區,鏈接見Ref  [1]

  圖1 Flink Runtime層架構圖

     2.1. Runtime層架構簡介

  Flink采取的是經典的master-salve模式,圖中的AM(ApplicationMater)為master,TaskManager是salve。

  AM中的Dispatcher用於接收client提交的任務和啟動相應的JobManager ;JobManager用於任務的接收,task的分配、管理task manager等;ResourceManager主要用於資源的申請和分配。

  這里有點需要注意:Flink本身也是具有ResourceManager和TaskManager的,這里雖然是on Yarn模式,但Flink本身也是擁有一套資源管理架構,雖然各個組件的名字一樣,但這里yarn只是一個資源的提供者,若是standalone模式,資源的提供者就是物理機或者虛擬機了。 

  2.2. Flink on Yarn 的Per-job模式提交任務的整體流程:

  1)執行Flink程序,就類似client,主要是將代碼進行優化形成JobGraph,向yarn的ResourceManager中的ApplicationManager申請資源啟動AM(ApplicationMater),AM所在節點是Yarn上的NodeManager上;

  2)當AM起來之后會啟動Dispatcher、ResourceManager,其中Dispatcher會啟動JobManager,ResourceManager會啟動slotManager用於slot的管理和分配;

  3)JobManager向ResourceManager(RM)申請資源用於任務的執行,最初TaskManager還沒有啟動,此時,RM會向yarn去申請資源,獲得資源后,會在資源中啟動TaskManager,相應啟動的slot會向slotManager中注冊,然后slotManager會將slot分配給只需資源的task,即向JobManager注冊信息,然后JobManager就會將任務提交到對應的slot中執行。其實Flink on yarn的session模式和Per-job模式最大的區別是,提交任務時RM已向Yarn申請了固定大小的資源,其TaskManager是已經啟動的。

  資源分配如詳細過程圖下:

 圖2 slot管理圖,源自Ref[1]

  更詳細的過程解析,強烈推薦Ref [2],是阿里Flink大牛寫的,本博客在后期的源碼分析過程也多依據此博客。 

三、源碼簡析

  提交任務語句

./flink run -m yarn-cluster ./flinkExample.jar

    1、Client端提交任務階段分析

  flink腳本的入口類是org.apache.flink.client.cli.CliFrontend。

  1)在CliFronted類的main()方法中,會加載flnk以及一些全局的配置項之后,根據命令行參數run,調用run()->runProgram()->deployJobCluster(),具體的代碼如下:

private <T> void runProgram(
            CustomCommandLine<T> customCommandLine,
            CommandLine commandLine,
            RunOptions runOptions,
            PackagedProgram program) throws ProgramInvocationException, FlinkException {
        final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

        try {
            final T clusterId = customCommandLine.getClusterId(commandLine);

            final ClusterClient<T> client;

            // directly deploy the job if the cluster is started in job mode and detached
            if (clusterId == null && runOptions.getDetachedMode()) {
                int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
          //構建JobGraph
                final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

                final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
          //將任務提交到yarn上 client
= clusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, runOptions.getDetachedMode()); logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); ...................... } else{........}

  2)提交任務會調用YarnClusterDescriptor 類中deployJobCluster()->AbstractYarnClusterDescriptor類中deployInteral(),該方法會一直阻塞直到ApplicationMaster/JobManager在yarn上部署成功,其中最關鍵的調用是對startAppMaster()方法的調用,代碼如下:

 1 protected ClusterClient<ApplicationId>     deployInternal(
 2             ClusterSpecification clusterSpecification,
 3             String applicationName,
 4             String yarnClusterEntrypoint,
 5             @Nullable JobGraph jobGraph,
 6             boolean detached) throws Exception {
 7 
 8         //1、驗證集群是否可以訪問
 9         //2、若用戶組是否開啟安全認證
10         //3、檢查配置以及vcore是否滿足flink集群申請的需求
11         //4、指定的對列是否存在
12         //5、檢查內存是否滿足flink JobManager、NodeManager所需
13         //....................................
14 
15         //Entry
16         ApplicationReport report = startAppMaster(
17                 flinkConfiguration,
18                 applicationName,
19                 yarnClusterEntrypoint,
20                 jobGraph,
21                 yarnClient,
22                 yarnApplication,
23                 validClusterSpecification);
24 
25         //6、獲取flink集群端口、地址信息
26         //..........................................
27     }

  3)方法AbstractYarnClutserDescriptor.startAppMaster()主要是將配置文件和相關文件上傳至分布式存儲如HDFS,以及向Yarn上提交任務等,源碼分析如下:

 1 public ApplicationReport startAppMaster(
 2             Configuration configuration,
 3             String applicationName,
 4             String yarnClusterEntrypoint,
 5             JobGraph jobGraph,
 6             YarnClient yarnClient,
 7             YarnClientApplication yarnApplication,
 8             ClusterSpecification clusterSpecification) throws Exception {
 9 
10         // .......................
11 
12         //1、上傳conf目錄下logback.xml、log4j.properties
13 
14         //2、上傳環境變量中FLINK_PLUGINS_DIR ,FLINK_LIB_DIR包含的jar
15         addEnvironmentFoldersToShipFiles(systemShipFiles);
16         //...........
17         //3、設置applications的高可用的方案,通過設置AM重啟次數,默認為1
18         //4、上傳ship files、user jars、
19         //5、為TaskManager設置slots、heap memory
20         //6、上傳flink-conf.yaml
21         //7、序列化JobGraph后上傳
22         //8、登錄權限檢查
23 
24         //.................
25 
26         //獲得啟動AM container的Java命令
27         final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
28                 yarnClusterEntrypoint,
29                 hasLogback,
30                 hasLog4j,
31                 hasKrb5,
32                 clusterSpecification.getMasterMemoryMB());
33 
34         //9、為aAM啟動綁定環境參數以及classpath和環境變量
35 
36         //..........................
37 
38         final String customApplicationName = customName != null ? customName : applicationName;
39         //10、應用名稱、應用類型、用戶提交的應用ContainerLaunchContext
40         appContext.setApplicationName(customApplicationName);
41         appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
42         appContext.setAMContainerSpec(amContainer);
43         appContext.setResource(capability);
44 
45         if (yarnQueue != null) {
46             appContext.setQueue(yarnQueue);
47         }
48 
49         setApplicationNodeLabel(appContext);
50 
51         setApplicationTags(appContext);
52 
53         //11、部署失敗刪除yarnFilesDir
54         // add a hook to clean up in case deployment fails
55         Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
56         Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
57 
58         LOG.info("Submitting application master " + appId);
59         
60         //Entry
61         yarnClient.submitApplication(appContext);
62 
63         LOG.info("Waiting for the  cluster to be allocated");
64         final long startTime = System.currentTimeMillis();
65         ApplicationReport report;
66         YarnApplicationState lastAppState = YarnApplicationState.NEW;
67         //12、阻塞等待直到running
68         loop: while (true) {
69             //...................
70             //每隔250ms通過YarnClient獲取應用報告
71             Thread.sleep(250);
72         }
73         //...........................
74         //13、部署成功刪除shutdown回調
75         // since deployment was successful, remove the hook
76         ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
77         return report;
78     }

   4)應用提交的Entry是YarnClientImpl.submitApplication(),該方法在於調用了ApplicationClientProtocolPBClientImpl.submitApplication(),其具體代碼如下:

 1 public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
 2 //取出報文
 3         SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();
 4 
 5         try {
 6 //將報文發送發送到服務端,並將返回結果構成response
 7             return new SubmitApplicationResponsePBImpl(this.proxy.submitApplication((RpcController)null, requestProto));
 8         } catch (ServiceException var4) {
 9             RPCUtil.unwrapAndThrowException(var4);
10             return null;
11         }
12     }
View Code

   報文就會通過RPC到達服務端,服務端處理報文的方法是ApplicationClientProtocolPBServiceImpl.submitApplication(),方法中會重新構建報文,然后通過ClientRMService.submitApplication()將應用請求提交到Yarn上的RMAppManager去提交任務(在Yarn的分配過后面會專門寫一系列的博客去說明)。

  至此,client端的流程就走完了,應用請求已提交到Yarn的ResourceManager上了,下面着重分析Flink Cluster啟動流程。

  2、Flink Cluster啟動流程分析

  1)在ClientRMService類的submitApplication()方法中,會先檢查任務是否已經提交(通過applicationID)、Yarn的queue是否為空等,然后將請求提交到RMAppManager(ARN RM內部管理應用生命周期的組件),若提交成功會輸出Application with id  {applicationId.getId()}  submitted by user  {user}的信息,具體分析如下:

 1 public SubmitApplicationResponse submitApplication(
 2             SubmitApplicationRequest request) throws YarnException {
 3         ApplicationSubmissionContext submissionContext = request
 4                 .getApplicationSubmissionContext();
 5         ApplicationId applicationId = submissionContext.getApplicationId();
 6 
 7         // ApplicationSubmissionContext needs to be validated for safety - only
 8         // those fields that are independent of the RM's configuration will be
 9         // checked here, those that are dependent on RM configuration are validated
10         // in RMAppManager.
11         //這里僅驗證不屬於RM的配置,屬於RM的配置將在RMAppManager驗證
12 
13         //1、檢查application是否已提交
14         //2、檢查提交的queue是否為null,是,則設置為默認queue(default)
15         //3、檢查是否設置application名,否,則為默認(N/A)
16         //4、檢查是否設置application類型,否,則為默認(YARN);是,若名字長度大於給定的長度(20),則會截斷
17         //.............................
18         
19         try {
20             // call RMAppManager to submit application directly
21             //直接submit任務
22             rmAppManager.submitApplication(submissionContext,
23                     System.currentTimeMillis(), user);
24 
25             //submit成功
26             LOG.info("Application with id " + applicationId.getId() +
27                     " submitted by user " + user);
28             RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
29                     "ClientRMService", applicationId);
30         } catch (YarnException e) {
31             //失敗會拋出異常
32         }
33         //..................
34     }

   2)RMAppManager類的submitApplication()方法主要是創建RMApp和向ResourceScheduler申請AM container,該部分直到在NodeManager上啟動AM container都是Yarn本身所為,其中具體過程在這里不詳細分析,詳細過程后期會分析,這里僅給出入口,代碼如下:

 1 protected void submitApplication(
 2             ApplicationSubmissionContext submissionContext, long submitTime,
 3             String user) throws YarnException {
 4         ApplicationId applicationId = submissionContext.getApplicationId();
 5 
 6         //1、創建RMApp,若具有相同的applicationId會拋出異常
 7         RMAppImpl application =
 8                 createAndPopulateNewRMApp(submissionContext, submitTime, user);
 9         ApplicationId appId = submissionContext.getApplicationId();
10 
11         //security模式有simple和kerberos,在配置文件中配置
12         //開始kerberos
13         if (UserGroupInformation.isSecurityEnabled()) {
14             //..................
15         } else {
16             //simple模式
17             // Dispatcher is not yet started at this time, so these START events
18             // enqueued should be guaranteed to be first processed when dispatcher
19             // gets started.
20             //2、向ResourceScheduler(可插拔的資源調度器)提交任務??????????
21             this.rmContext.getDispatcher().getEventHandler()
22                     .handle(new RMAppEvent(applicationId, RMAppEventType.START));
23         }
24     }

   3)Flink在Per-job模式下,AM container加載運行的入口是YarnJobClusterEntryPoint中的main()方法,源碼分析如下:

 1 public static void main(String[] args) {
 2         // startup checks and logging
 3         //1、輸出環境信息如用戶、環境變量、Java版本等,以及JVM參數
 4         EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
 5         //2、注冊處理各種SIGNAL的handler:記錄到日志
 6         SignalHandler.register(LOG);
 7         //3、注冊JVM關閉保障的shutdown hook:避免JVM退出時被其他shutdown hook阻塞
 8         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 9 
10         Map<String, String> env = System.getenv();
11 
12         final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
13         Preconditions.checkArgument(
14                 workingDirectory != null,
15                 "Working directory variable (%s) not set",
16                 ApplicationConstants.Environment.PWD.key());
17 
18         try {
19             //4、輸出Yarn運行的用戶信息
20             YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
21         } catch (IOException e) {
22             LOG.warn("Could not log YARN environment information.", e);
23         }
24         //5、加載flink的配置
25         Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
26 
27         YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
28                 configuration,
29                 workingDirectory);
30         //6、Entry  創建並啟動各類內部服務
31         ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
32     }

  4)后續的調用過程:ClusterEntrypoint類中runClusterEntrypoint()->startCluster()->runCluster(),該過程比較簡單,這里着實分析runCluster()方法,如下:

 1 //#ClusterEntrypint.java
 2     private void runCluster(Configuration configuration) throws Exception {
 3         synchronized (lock) {
 4             initializeServices(configuration);
 5 
 6             // write host information into configuration
 7             configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 8             configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 9             //1、創建dispatcherResour、esourceManager對象,其中有從本地重新創建JobGraph的過程
10             final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
11             //2、Entry 啟動RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等
12             clusterComponent = dispatcherResourceManagerComponentFactory.create(
13                     configuration,
14                     commonRpcService,
15                     haServices,
16                     blobServer,
17                     heartbeatServices,
18                     metricRegistry,
19                     archivedExecutionGraphStore,
20                     new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
21                     this);
22 
23             //............
24         }
25     }

   4)在create()方法中,會啟動Flink的諸多組件,其中與提交任務強相關的是Dispatcher、ResourceManager,具體代碼如下:

  1 public DispatcherResourceManagerComponent<T> create(
  2             Configuration configuration,
  3             RpcService rpcService,
  4             HighAvailabilityServices highAvailabilityServices,
  5             BlobServer blobServer,
  6             HeartbeatServices heartbeatServices,
  7             MetricRegistry metricRegistry,
  8             ArchivedExecutionGraphStore archivedExecutionGraphStore,
  9             MetricQueryServiceRetriever metricQueryServiceRetriever,
 10             FatalErrorHandler fatalErrorHandler) throws Exception {
 11 
 12         LeaderRetrievalService dispatcherLeaderRetrievalService = null;
 13         LeaderRetrievalService resourceManagerRetrievalService = null;
 14         WebMonitorEndpoint<U> webMonitorEndpoint = null;
 15         ResourceManager<?> resourceManager = null;
 16         JobManagerMetricGroup jobManagerMetricGroup = null;
 17         T dispatcher = null;
 18 
 19         try {
 20             dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 21 
 22             resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
 23 
 24             final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 25                     rpcService,
 26                     DispatcherGateway.class,
 27                     DispatcherId::fromUuid,
 28                     10,
 29                     Time.milliseconds(50L));
 30 
 31             final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 32                     rpcService,
 33                     ResourceManagerGateway.class,
 34                     ResourceManagerId::fromUuid,
 35                     10,
 36                     Time.milliseconds(50L));
 37 
 38             final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
 39                     configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
 40                     configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
 41                     "DispatcherRestEndpoint");
 42 
 43             final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
 44             final MetricFetcher metricFetcher = updateInterval == 0
 45                     ? VoidMetricFetcher.INSTANCE
 46                     : MetricFetcherImpl.fromConfiguration(
 47                     configuration,
 48                     metricQueryServiceRetriever,
 49                     dispatcherGatewayRetriever,
 50                     executor);
 51 
 52             webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
 53                     configuration,
 54                     dispatcherGatewayRetriever,
 55                     resourceManagerGatewayRetriever,
 56                     blobServer,
 57                     executor,
 58                     metricFetcher,
 59                     highAvailabilityServices.getWebMonitorLeaderElectionService(),
 60                     fatalErrorHandler);
 61 
 62             log.debug("Starting Dispatcher REST endpoint.");
 63             webMonitorEndpoint.start();
 64 
 65             final String hostname = getHostname(rpcService);
 66 
 67             jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
 68                     metricRegistry,
 69                     hostname,
 70                     ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
 71             //1、返回的是new YarnResourceManager
 72             /*調度過程:AbstractDispatcherResourceManagerComponentFactory
 73                         ->ActiveResourceManagerFactory
 74                         ->YarnResourceManagerFactory
 75              */
 76             ResourceManager<?> resourceManager1 = resourceManagerFactory.createResourceManager(
 77                     configuration,
 78                     ResourceID.generate(),
 79                     rpcService,
 80                     highAvailabilityServices,
 81                     heartbeatServices,
 82                     metricRegistry,
 83                     fatalErrorHandler,
 84                     new ClusterInformation(hostname, blobServer.getPort()),
 85                     webMonitorEndpoint.getRestBaseUrl(),
 86                     jobManagerMetricGroup);
 87             resourceManager = resourceManager1;
 88 
 89             final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
 90             //2、在此反序列化獲取JobGraph實例;返回new MiniDispatcher
 91             dispatcher = dispatcherFactory.createDispatcher(
 92                     configuration,
 93                     rpcService,
 94                     highAvailabilityServices,
 95                     resourceManagerGatewayRetriever,
 96                     blobServer,
 97                     heartbeatServices,
 98                     jobManagerMetricGroup,
 99                     metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
100                     archivedExecutionGraphStore,
101                     fatalErrorHandler,
102                     historyServerArchivist);
103 
104             log.debug("Starting ResourceManager.");
105             //啟動resourceManager,此過程中會經歷以下階段
106             //leader選舉->(ResourceManager.java中)
107             // ->grantLeadership(...)
108             // ->tryAcceptLeadership(...)
109             // ->slotManager的啟動
110             resourceManager.start();
111             resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
112 
113             log.debug("Starting Dispatcher.");
114 
115             //啟動Dispatcher,經歷以下階段:
116             //leader選舉->(Dispatcher.java中)grantLeadership->tryAcceptLeadershipAndRunJobs
117             // ->createJobManagerRunner->startJobManagerRunner->jobManagerRunner.start()
118             //
119             //->(JobManagerRunner.java中)start()->leaderElectionService.start(...)
120             //->grantLeadership(...)->verifyJobSchedulingStatusAndStartJobManager(...)
121             //->startJobMaster(leaderSessionId)這里的startJobmaster應該是啟動的JobManager
122             //
123             //->(JobManagerRunner.java中)jobMasterService.start(...)
124             //->(JobMaster.java)startJobExecution(...)
125             // ->{startJobMasterServices()在該方法中會啟動slotPool->resourceManagerLeaderRetriever.start(...)}
126             //->startJobExecution(...)->
127             dispatcher.start();
128             dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
129 
130             return createDispatcherResourceManagerComponent(
131                     dispatcher,
132                     resourceManager,
133                     dispatcherLeaderRetrievalService,
134                     resourceManagerRetrievalService,
135                     webMonitorEndpoint,
136                     jobManagerMetricGroup);
137 
138         } catch (Exception exception) {
139             // clean up all started components
140             //失敗會清除已啟動的組件
141             //..............
142         }
143     }

  5)此后,JobManager中的slotPool會向SlotManager申請資源,而SlotManager則向Yarn的ResourceManager申請,申請到后會啟動TaskManager,然后將slot信息注冊到slotManager和slotPool中,詳細過程在此就不展開分析了,留作后面分析。 

四、總結

  該博客中還有諸多不完善的地方,需要自己后進一步的閱讀源碼、弄清設計架構后等一系列之后才能有更好的完善,此外,后期也會對照着Flink 的Per-job模式下任務提交的詳細日志進一步驗證。

   若是文中有描述不清的,非常建議參考以下博文;若是存在不對的地方,非常歡迎大伙留言指出,謝謝了!

Ref

[1]https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf

[2]https://yq.aliyun.com/articles/719262?spm=a2c4e.11153940.0.0.3ea9469ei7H3Wx#

[3]https://www.jianshu.com/p/52da8b2e4ccc

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM