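I. Preface
In my experience, the posts you should not miss when learning Flink are the Flink community's own blog series; the articles there do not disappoint. Highly recommended: https://ververica.cn/developers-resources/.
This is my first attempt at writing a source-code reading article, and I will try to tie the principles to the actual implementation flow. There are still a few points I have not figured out. Covering everything in a single post would take too long, but I am also afraid of forgetting things later, so I am writing down what I have now and will fill in the rest after reading more of the source. If you happen to read the incomplete version, my apologies!
If anything in this post is inaccurate, please leave a comment.
This source-code series is based on Flink 1.9.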
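II. How Per-job Submission Works
The overall flow for submitting a job in Flink on YARN mode is shown in the figure below (figure from the Flink community; link in Ref [1]).
Figure 1: Flink runtime architecture
2.1. Overview of the runtime architecture
Flink uses the classic master-slave model: in the figure, the AM (ApplicationMaster) is the master and the TaskManagers are the slaves.
Inside the AM, the Dispatcher receives jobs submitted by the client and starts the corresponding JobManager; the JobManager accepts the job, assigns tasks, and manages TaskManagers; the ResourceManager is mainly responsible for requesting and allocating resources.
One point to note: Flink has its own ResourceManager and TaskManager. Even in on-YARN mode, Flink keeps its own resource management architecture. Although some component names are the same as YARN's, YARN is only the resource provider here; in standalone mode the resource provider would be physical or virtual machines.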
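2.2. Overall flow of submitting a job in Flink on YARN Per-job mode:
1) Running the Flink program plays the role of the client. It optimizes the code into a JobGraph and asks the ApplicationsManager inside YARN's ResourceManager for resources to start the AM (ApplicationMaster); the AM runs in a container on one of YARN's NodeManagers.
2) Once the AM is up, it starts the Dispatcher and the ResourceManager. The Dispatcher starts the JobManager, and the ResourceManager starts the SlotManager, which manages and allocates slots.
3) The JobManager asks the ResourceManager (RM) for resources to run the job. Initially no TaskManager is running, so the RM requests resources from YARN and, once they are granted, starts a TaskManager in them. The slots of the new TaskManager register with the SlotManager, which assigns them to the tasks that need resources, that is, it registers the slot information with the JobManager; the JobManager then submits the tasks to the corresponding slots for execution. The biggest difference between session mode and Per-job mode on YARN is that in session mode, by the time a job is submitted, the RM has already obtained a fixed amount of resources from YARN and its TaskManagers are already running.
The detailed resource allocation process is shown in the figure below:
Figure 2: Slot management, from Ref [1]
For a more detailed walkthrough I strongly recommend Ref [2], written by a Flink expert at Alibaba; the source analysis later in this post also relies heavily on it.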
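The request/registration handshake in step 3) can be sketched as a toy model. This is only an illustration under simplifying assumptions, not Flink's real SlotManager: the class ToySlotManager, its method names, and the "one container request per pending task" rule are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the slot request flow; NOT Flink's real SlotManager. */
final class ToySlotManager {
    private final Queue<String> pendingRequests = new ArrayDeque<>(); // tasks waiting for a slot
    private final Queue<String> freeSlots = new ArrayDeque<>();       // registered but unassigned slots
    private final List<String> assignments = new ArrayList<>();       // "task -> slot" records
    private int containersRequested = 0;                              // requests sent to the (pretend) YARN RM

    /** The JobManager asks for a slot; if none is free, a container is requested from the provider. */
    void requestSlot(String task) {
        String slot = freeSlots.poll();
        if (slot != null) {
            assignments.add(task + " -> " + slot);
        } else {
            pendingRequests.add(task);
            containersRequested++; // simplification: one container request per pending task
        }
    }

    /** A freshly started TaskManager registers its slots; pending requests are served first. */
    void registerTaskManager(String taskManagerId, int numSlots) {
        for (int i = 0; i < numSlots; i++) {
            String slot = taskManagerId + "/slot-" + i;
            String task = pendingRequests.poll();
            if (task != null) {
                assignments.add(task + " -> " + slot);
            } else {
                freeSlots.add(slot);
            }
        }
    }

    List<String> assignments() { return assignments; }
    int containersRequested() { return containersRequested; }
    int freeSlotCount() { return freeSlots.size(); }

    public static void main(String[] args) {
        ToySlotManager slotManager = new ToySlotManager();
        slotManager.requestSlot("source");          // no TaskManager yet: request is queued
        slotManager.requestSlot("sink");
        slotManager.registerTaskManager("tm-1", 3); // slots arrive and are handed to queued tasks
        System.out.println(slotManager.assignments());
        System.out.println("free slots: " + slotManager.freeSlotCount());
    }
}
```

In session mode the same model would simply start with registerTaskManager() already called, so requestSlot() finds a free slot immediately.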
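III. Source Code Walkthrough
Command used to submit the job:
./flink run -m yarn-cluster ./flinkExample.jar
1. Client-side submission phase
The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.
1) The main() method of CliFrontend loads Flink's configuration and some global settings and then, based on the command-line argument run, calls run() -> runProgram() -> deployJobCluster(). As the comment in the code indicates, this direct deployJobCluster() path is taken when no cluster ID is given and the job is submitted in detached mode. The code is as follows: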
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {
    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

    try {
        final T clusterId = customCommandLine.getClusterId(commandLine);

        final ClusterClient<T> client;

        // directly deploy the job if the cluster is started in job mode and detached
        if (clusterId == null && runOptions.getDetachedMode()) {
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

            // Build the JobGraph
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            // Submit the job to YARN
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());

            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
            // ......................
        } else {
            // ........
        }
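The two decisions made at the top of runProgram() can be isolated into a small, testable sketch. The class and method names below (DeployDecision, effectiveParallelism, deploysJobClusterDirectly) are hypothetical helpers for illustration; only the conditions themselves are taken from the code above.

```java
/** Sketch of the two decisions in runProgram(); helper names are hypothetical. */
final class DeployDecision {
    /** Sentinel used in the code above when no parallelism was given on the command line. */
    static final int UNSET_PARALLELISM = -1;

    /** Falls back to the default parallelism when none was requested. */
    static int effectiveParallelism(int requested, int defaultParallelism) {
        return requested == UNSET_PARALLELISM ? defaultParallelism : requested;
    }

    /** The job cluster is deployed directly only without a cluster ID and in detached mode. */
    static boolean deploysJobClusterDirectly(String clusterId, boolean detached) {
        return clusterId == null && detached;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(UNSET_PARALLELISM, 4));
        System.out.println(deploysJobClusterDirectly(null, true));
        System.out.println(deploysJobClusterDirectly(null, false));
    }
}
```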
2) Submission goes through deployJobCluster() in YarnClusterDescriptor and then deployInternal() in AbstractYarnClusterDescriptor. This method blocks until the ApplicationMaster/JobManager has been deployed on YARN. The key call inside it is startAppMaster(); the code is as follows:
protected ClusterClient<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // 1. Verify that the cluster is reachable
    // 2. If security is enabled, check the user's credentials
    // 3. Check that the configuration and vcores satisfy what the Flink cluster requests
    // 4. Check that the specified queue exists
    // 5. Check that memory is sufficient for the Flink JobManager and TaskManager
    // ....................................

    // Entry
    ApplicationReport report = startAppMaster(
        flinkConfiguration,
        applicationName,
        yarnClusterEntrypoint,
        jobGraph,
        yarnClient,
        yarnApplication,
        validClusterSpecification);

    // 6. Retrieve the port and address of the Flink cluster
    // ..........................................
}
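3) AbstractYarnClusterDescriptor.startAppMaster() mainly uploads the configuration and related files to distributed storage such as HDFS and submits the application to YARN. The annotated source is as follows: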
public ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    // .......................

    // 1. Upload logback.xml and log4j.properties from the conf directory

    // 2. Upload the jars in the directories given by the FLINK_PLUGINS_DIR and FLINK_LIB_DIR environment variables
    addEnvironmentFoldersToShipFiles(systemShipFiles);
    // ...........
    // 3. Configure application high availability by setting the number of AM attempts (default 1)
    // 4. Upload ship files and user jars
    // 5. Set slots and heap memory for the TaskManager
    // 6. Upload flink-conf.yaml
    // 7. Serialize the JobGraph and upload it
    // 8. Login/credential checks

    // .................

    // Build the Java command that launches the AM container
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
        yarnClusterEntrypoint,
        hasLogback,
        hasLog4j,
        hasKrb5,
        clusterSpecification.getMasterMemoryMB());

    // 9. Bind the launch parameters, classpath and environment variables for the AM

    // ..........................

    final String customApplicationName = customName != null ? customName : applicationName;
    // 10. Application name, application type, and the ContainerLaunchContext of the submitted application
    appContext.setApplicationName(customApplicationName);
    appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);

    if (yarnQueue != null) {
        appContext.setQueue(yarnQueue);
    }

    setApplicationNodeLabel(appContext);

    setApplicationTags(appContext);

    // 11. Delete yarnFilesDir if deployment fails
    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);

    // Entry
    yarnClient.submitApplication(appContext);

    LOG.info("Waiting for the cluster to be allocated");
    final long startTime = System.currentTimeMillis();
    ApplicationReport report;
    YarnApplicationState lastAppState = YarnApplicationState.NEW;
    // 12. Block until the application is RUNNING
    loop: while (true) {
        // ...................
        // Fetch the application report through YarnClient every 250 ms
        Thread.sleep(250);
    }
    // ...........................
    // 13. Deployment succeeded: remove the shutdown hook
    // since deployment was successful, remove the hook
    ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
    return report;
}
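Steps 11 to 13 above follow a common pattern: register a cleanup hook, poll the application state at a fixed interval, and remove the hook once deployment succeeds. Below is a minimal sketch of that pattern using only the JDK. The DeploymentPoller class, its AppState enum, and the choice to run the cleanup directly on FAILED/KILLED are assumptions made for illustration, not Flink's or YARN's actual code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Sketch of "poll until RUNNING, clean up on failure"; names and behavior are illustrative. */
final class DeploymentPoller {
    /** Simplified stand-in for YARN application states. */
    enum AppState { NEW, SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    static AppState waitUntilRunning(Supplier<AppState> stateSource, long pollMillis, Runnable cleanup) {
        // The hook covers the case where the JVM exits while we are still waiting.
        Thread failureHook = new Thread(cleanup, "deployment-failure-hook");
        Runtime.getRuntime().addShutdownHook(failureHook);
        try {
            while (true) {
                AppState state = stateSource.get();
                if (state == AppState.RUNNING) {
                    return state;
                }
                if (state == AppState.FAILED || state == AppState.KILLED) {
                    cleanup.run(); // assumption: clean up immediately instead of waiting for JVM exit
                    throw new IllegalStateException("Deployment ended in state " + state);
                }
                Thread.sleep(pollMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for deployment", e);
        } finally {
            // Whatever the outcome, the hook is no longer needed once this method returns.
            Runtime.getRuntime().removeShutdownHook(failureHook);
        }
    }

    public static void main(String[] args) {
        Iterator<AppState> states =
            Arrays.asList(AppState.NEW, AppState.ACCEPTED, AppState.RUNNING).iterator();
        AppState result = waitUntilRunning(states::next, 250L, () -> System.out.println("cleaning up"));
        System.out.println("final state: " + result);
    }
}
```

Passing the state source as a Supplier keeps the loop testable without a real cluster; in the real code the state comes from the application report fetched through YarnClient.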
4) The entry point for application submission is YarnClientImpl.submitApplication(), which in turn calls ApplicationClientProtocolPBClientImpl.submitApplication(); its code is as follows:

public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException, IOException {
    // Extract the protobuf message
    SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto();

    try {
        // Send the message to the server and wrap the returned result in a response
        return new SubmitApplicationResponsePBImpl(this.proxy.submitApplication((RpcController)null, requestProto));
    } catch (ServiceException var4) {
        RPCUtil.unwrapAndThrowException(var4);
        return null;
    }
}
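The message then travels over RPC to the server. On the server side it is handled by ApplicationClientProtocolPBServiceImpl.submitApplication(), which rebuilds the request and then, via ClientRMService.submitApplication(), hands the application request to YARN's RMAppManager for submission (YARN's allocation process will be covered in a separate series of posts).
At this point the client-side flow is complete and the application request has reached YARN's ResourceManager. Next we focus on how the Flink cluster starts.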
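2. Flink cluster startup
1) ClientRMService.submitApplication() first checks whether the application has already been submitted (by applicationId), whether the YARN queue is null, and so on, and then passes the request to RMAppManager (the component inside the YARN RM that manages application lifecycles). On success it logs Application with id {applicationId.getId()} submitted by user {user}. The details are as follows: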
public SubmitApplicationResponse submitApplication(
        SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.

    // 1. Check whether the application has already been submitted
    // 2. Check whether the queue is null; if so, use the default queue (default)
    // 3. Check whether an application name is set; if not, use the default (N/A)
    // 4. Check whether an application type is set; if not, use the default (YARN);
    //    if it is set and longer than the allowed length (20), it is truncated
    // .............................

    try {
        // call RMAppManager to submit application directly
        rmAppManager.submitApplication(submissionContext,
            System.currentTimeMillis(), user);

        // Submission succeeded
        LOG.info("Application with id " + applicationId.getId() +
            " submitted by user " + user);
        RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
            "ClientRMService", applicationId);
    } catch (YarnException e) {
        // An exception is thrown on failure
    }
    // ..................
}
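Checks 2 to 4 in the comments above amount to filling in defaults and truncating an overlong type. A minimal sketch follows; the SubmissionDefaults class and its method names are hypothetical, while the default values and the length limit of 20 are the ones quoted in the comments.

```java
/** Sketch of the defaulting rules described above; class and method names are hypothetical. */
final class SubmissionDefaults {
    static final String DEFAULT_QUEUE = "default";
    static final String DEFAULT_NAME = "N/A";
    static final String DEFAULT_TYPE = "YARN";
    static final int MAX_TYPE_LENGTH = 20;

    /** A missing queue falls back to the default queue. */
    static String queueOrDefault(String queue) {
        return queue == null ? DEFAULT_QUEUE : queue;
    }

    /** A missing application name falls back to "N/A". */
    static String nameOrDefault(String name) {
        return name == null ? DEFAULT_NAME : name;
    }

    /** A missing type falls back to "YARN"; an overlong type is cut to the maximum length. */
    static String normalizeType(String type) {
        if (type == null) {
            return DEFAULT_TYPE;
        }
        return type.length() > MAX_TYPE_LENGTH ? type.substring(0, MAX_TYPE_LENGTH) : type;
    }

    public static void main(String[] args) {
        System.out.println(queueOrDefault(null));
        System.out.println(nameOrDefault(null));
        System.out.println(normalizeType("Apache Flink"));
    }
}
```

The type "Apache Flink" that the Flink client sets by default is shorter than 20 characters, so it passes through unchanged.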
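2) RMAppManager.submitApplication() mainly creates the RMApp and requests the AM container from the ResourceScheduler. Everything from here until the AM container is started on a NodeManager is done by YARN itself. I will not go into the details here (they will be analyzed in a later post) and only give the entry point; the code is as follows: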
protected void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    // 1. Create the RMApp; an exception is thrown if the applicationId already exists
    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user);
    ApplicationId appId = submissionContext.getApplicationId();

    // The security mode is either simple or kerberos, set in the configuration file
    // Kerberos enabled
    if (UserGroupInformation.isSecurityEnabled()) {
        // ..................
    } else {
        // simple mode
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // 2. Submit the application to the ResourceScheduler (the pluggable resource scheduler)? (I am not sure about this yet)
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
}
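The English comment in the else branch states that START events enqueued before the dispatcher is running are the first ones processed once it starts. The idea can be sketched with a toy single-threaded queue. ToyDispatcher and its methods are hypothetical and are not YARN's dispatcher; events are plain strings for simplicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy single-threaded event dispatcher; NOT YARN's dispatcher implementation. */
final class ToyDispatcher {
    private final Queue<String> queue = new ArrayDeque<>();  // events waiting to be processed
    private final List<String> processed = new ArrayList<>(); // events in processing order
    private boolean started = false;

    /** Enqueues an event; it is processed right away only if the dispatcher is running. */
    void handle(String event) {
        queue.add(event);
        if (started) {
            drain();
        }
    }

    /** Starts the dispatcher and processes everything enqueued so far, in FIFO order. */
    void start() {
        started = true;
        drain();
    }

    private void drain() {
        String event;
        while ((event = queue.poll()) != null) {
            processed.add(event);
        }
    }

    List<String> processed() { return processed; }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.handle("app_1 START"); // enqueued before start, nothing processed yet
        dispatcher.start();
        dispatcher.handle("app_2 START");
        System.out.println(dispatcher.processed());
    }
}
```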
3) In Per-job mode, the entry point that the AM container loads and runs is the main() method of YarnJobClusterEntrypoint. The annotated source is as follows:
public static void main(String[] args) {
    // startup checks and logging
    // 1. Log environment information such as user, environment variables, Java version, and JVM options
    EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    // 2. Register handlers for the various signals: they are written to the log
    SignalHandler.register(LOG);
    // 3. Install the JVM shutdown safeguard hook so that JVM exit is not blocked by other shutdown hooks
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();

    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
        workingDirectory != null,
        "Working directory variable (%s) not set",
        ApplicationConstants.Environment.PWD.key());

    try {
        // 4. Log information about the user YARN runs as
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }
    // 5. Load the Flink configuration
    Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
        configuration,
        workingDirectory);
    // 6. Entry: create and start the internal services
    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
4) The subsequent call chain is runClusterEntrypoint() -> startCluster() -> runCluster() in ClusterEntrypoint. It is fairly simple, so here we look only at runCluster():
// ClusterEntrypoint.java
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        initializeServices(configuration);

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        // 1. Create the factory for the Dispatcher and ResourceManager components;
        //    this includes recreating the JobGraph from the local file
        final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
        // 2. Entry: create and start the components, passing in the RpcService, HA services, BlobServer,
        //    HeartbeatServices, MetricRegistry, ExecutionGraphStore, etc.
        clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);

        // ............
    }
}
5) create() starts many of Flink's components; the ones most relevant to job submission are the Dispatcher and the ResourceManager. The code is as follows:
public DispatcherResourceManagerComponent<T> create(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<U> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    JobManagerMetricGroup jobManagerMetricGroup = null;
    T dispatcher = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
            "DispatcherRestEndpoint");

        final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
        final MetricFetcher metricFetcher = updateInterval == 0
            ? VoidMetricFetcher.INSTANCE
            : MetricFetcherImpl.fromConfiguration(
                configuration,
                metricQueryServiceRetriever,
                dispatcherGatewayRetriever,
                executor);

        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        final String hostname = getHostname(rpcService);

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
            metricRegistry,
            hostname,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        // 1. What is returned here is a new YarnResourceManager
        /* Call chain: AbstractDispatcherResourceManagerComponentFactory
                    -> ActiveResourceManagerFactory
                    -> YarnResourceManagerFactory
         */
        ResourceManager<?> resourceManager1 = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);
        resourceManager = resourceManager1;

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
        // 2. The JobGraph instance is obtained here by deserialization; a new MiniDispatcher is returned
        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        // Start the ResourceManager; this goes through the following stages:
        // leader election -> (in ResourceManager.java)
        //   -> grantLeadership(...)
        //   -> tryAcceptLeadership(...)
        //   -> start of the slotManager
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");

        // Start the Dispatcher; this goes through the following stages:
        // leader election -> (in Dispatcher.java) grantLeadership -> tryAcceptLeadershipAndRunJobs
        //   -> createJobManagerRunner -> startJobManagerRunner -> jobManagerRunner.start()
        //
        // -> (in JobManagerRunner.java) start() -> leaderElectionService.start(...)
        // -> grantLeadership(...) -> verifyJobSchedulingStatusAndStartJobManager(...)
        // -> startJobMaster(leaderSessionId); startJobMaster here presumably starts the JobManager
        //
        // -> (in JobManagerRunner.java) jobMasterService.start(...)
        // -> (JobMaster.java) startJobExecution(...)
        //   -> {startJobMasterServices(), which starts the slotPool -> resourceManagerLeaderRetriever.start(...)}
        // -> startJobExecution(...) ->
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);

    } catch (Exception exception) {
        // clean up all started components
        // ..............
    }
}
6) After that, the SlotPool in the JobManager requests resources from the SlotManager, which in turn requests them from YARN's ResourceManager. Once they are granted, a TaskManager is started and its slot information is registered with the SlotManager and the SlotPool. The details are left for a later post.
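Both startup chains noted in the comments of create() share one shape: start() registers the component with a leader election service, and the real work (starting the SlotManager, running the job) happens only in the grantLeadership callback. The sketch below illustrates that callback pattern with hypothetical types. LeaderStartupSketch, SingleNodeElection, and ToyResourceManager are invented for this example, and the election here grants leadership immediately, which a real HA setup would not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Sketch of the "start -> leader election -> grantLeadership -> start services" pattern. */
final class LeaderStartupSketch {
    /** Callback interface: the election service calls this once leadership is granted. */
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Assumption: a single contender that always wins, immediately and synchronously. */
    static final class SingleNodeElection {
        void start(LeaderContender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Toy component that starts its slot manager only after becoming leader. */
    static final class ToyResourceManager implements LeaderContender {
        private final List<String> steps = new ArrayList<>(); // records the order of startup steps
        private UUID leaderSessionId;

        void start(SingleNodeElection election) {
            steps.add("start");
            election.start(this); // work continues in grantLeadership()
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            steps.add("grantLeadership");
            this.leaderSessionId = leaderSessionId;
            steps.add("startSlotManager");
        }

        List<String> steps() { return steps; }
        boolean isLeader() { return leaderSessionId != null; }
    }

    public static void main(String[] args) {
        ToyResourceManager resourceManager = new ToyResourceManager();
        resourceManager.start(new SingleNodeElection());
        System.out.println(resourceManager.steps());
    }
}
```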
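IV. Summary
This post is still incomplete in many places. I need to read more of the source and understand the design before I can improve it properly. Later I will also verify the flow against the detailed logs of a job submission in Flink Per-job mode.
If anything here is unclear, I strongly recommend the posts listed below; if anything is wrong, please point it out in the comments. Thanks!
Ref
[1] https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf
[2] https://yq.aliyun.com/articles/719262?spm=a2c4e.11153940.0.0.3ea9469ei7H3Wx#
[3] https://www.jianshu.com/p/52da8b2e4ccc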