Checkpoint source code walkthrough:
Flink CheckpointCoordinator startup flow
Let's open with a diagram: the anatomy of a Flink cluster, taken from the official documentation.
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html#anatomy-of-a-flink-cluster
As for the CheckpointCoordinator, here is the comment from its source code:
```txt
The checkpoint coordinator coordinates the distributed snapshots of operators and state. It triggers the checkpoint by sending the messages to the relevant tasks and collects the checkpoint acknowledgements. It also collects and maintains the overview of the state handles reported by the tasks that acknowledge the checkpoint.
```
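To make the trigger/acknowledge bookkeeping described in that comment concrete, here is a minimal toy sketch. It is not Flink's implementation: `ToyCheckpointCoordinator` and all of its methods are hypothetical names, and it assumes a checkpoint is complete once every task has acknowledged it.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of trigger/acknowledge bookkeeping; NOT Flink's CheckpointCoordinator. */
class ToyCheckpointCoordinator {
    private final Set<String> tasksToWaitFor;
    private final Set<String> pendingAcks = new HashSet<>();
    private long currentCheckpointId = 0;
    private long lastCompletedCheckpointId = -1;

    ToyCheckpointCoordinator(Set<String> tasksToWaitFor) {
        this.tasksToWaitFor = new HashSet<>(tasksToWaitFor);
    }

    /** Starts a new checkpoint and expects one acknowledgement from every task. */
    long triggerCheckpoint() {
        currentCheckpointId++;
        pendingAcks.clear();
        pendingAcks.addAll(tasksToWaitFor);
        // A real coordinator would now send trigger messages to the relevant tasks.
        return currentCheckpointId;
    }

    /** Records one acknowledgement; returns true when this ack completes the checkpoint. */
    boolean acknowledge(String task, long checkpointId) {
        // Ignore acks for stale, unknown, or already completed checkpoints.
        if (checkpointId != currentCheckpointId || checkpointId == lastCompletedCheckpointId) {
            return false;
        }
        pendingAcks.remove(task);
        if (pendingAcks.isEmpty()) {
            lastCompletedCheckpointId = checkpointId;
            return true;
        }
        return false;
    }

    long getLastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}
```

With three tasks, `acknowledge` returns `true` only for the third distinct acknowledgement of the current checkpoint id.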
Set a breakpoint in the CheckpointCoordinator constructor and start a job; you will see the call stack described below.
First, the Dispatcher creates a JobManagerRunnerImpl. This happens in Dispatcher.createJobManagerRunner:
```java
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();
    // Dispatcher thread
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                // Runs asynchronously on a separate thread and creates the JobManagerRunner
                return jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler);
            } catch (Exception e) {
                throw new CompletionException(
                    new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
            }
        },
        rpcService.getExecutor());
}
```
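The asynchronous creation pattern in createJobManagerRunner (run a factory on an executor, rethrow checked exceptions as a `CompletionException`) can be tried in isolation. The sketch below uses only the JDK; `AsyncCreationSketch`, `RunnerFactory`, and `createRunnerAsync` are hypothetical stand-ins, and a plain `String` takes the place of the runner object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCreationSketch {
    /** Hypothetical factory whose create call may throw a checked exception. */
    interface RunnerFactory {
        String create(String jobName) throws Exception;
    }

    /** Runs the factory on the given executor and wraps any failure in a CompletionException. */
    static CompletableFuture<String> createRunnerAsync(
            RunnerFactory factory, String jobName, Executor executor) {
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // Executed on a thread of the executor, not on the caller's thread.
                    return factory.create(jobName);
                } catch (Exception e) {
                    throw new CompletionException(
                        new IllegalStateException("Could not instantiate runner for " + jobName, e));
                }
            },
            executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String runner = createRunnerAsync(name -> "runner-for-" + name, "wordcount", executor).join();
            System.out.println(runner);
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the supplier can only throw unchecked exceptions, wrapping the checked exception is what lets the failure travel through the future; a caller that uses `join()` sees the `CompletionException` with the original cause attached.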
The JobManagerRunnerImpl constructor calls createJobMasterService, which creates a JobMaster. The JobMaster constructor in turn calls createScheduler(jobManagerJobMetricGroup):
```java
private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
    return schedulerNGFactory.createInstance(
        log,
        jobGraph,
        backPressureStatsTracker,
        scheduledExecutorService,
        jobMasterConfiguration.getConfiguration(),
        scheduler,
        scheduledExecutorService,
        userCodeLoader,
        highAvailabilityServices.getCheckpointRecoveryFactory(),
        rpcTimeout,
        blobWriter,
        jobManagerJobMetricGroup,
        jobMasterConfiguration.getSlotRequestTimeout(),
        shuffleMaster,
        partitionTracker);
}
```
createScheduler calls schedulerNGFactory.createInstance, which at runtime resolves to DefaultSchedulerFactory.createInstance.
DefaultSchedulerFactory.createInstance calls new DefaultScheduler. Before doing so, it uses the jobGraph and the restartStrategy to build a restartBackoffTimeStrategy, which is passed into the DefaultScheduler.
The DefaultScheduler constructor starts by calling the parent class constructor:
```java
super(
    log,
    jobGraph,
    backPressureStatsTracker,
    ioExecutor,
    jobMasterConfiguration,
    new ThrowingSlotProvider(), // this is not used any more in the new scheduler
    futureExecutor,
    userCodeLoader,
    checkpointRecoveryFactory,
    rpcTimeout,
    new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
    blobWriter,
    jobManagerJobMetricGroup,
    Time.seconds(0), // this is not used any more in the new scheduler
    shuffleMaster,
    partitionTracker,
    executionVertexVersioner,
    false);
```
The DefaultScheduler constructor also wraps the restartBackoffTimeStrategy in an ExecutionFailureHandler. (DefaultScheduler has handleTaskFailure and handleGlobalFailure methods, which appear to be invoked when a task fails.)
```java
this.executionFailureHandler = new ExecutionFailureHandler(
    getSchedulingTopology(),
    failoverStrategy,
    restartBackoffTimeStrategy);

@Override
public void handleGlobalFailure(final Throwable error) {
    setGlobalFailureCause(error);

    log.info("Trying to recover from a global failure.", error);
    final FailureHandlingResult failureHandlingResult =
        executionFailureHandler.getGlobalFailureHandlingResult(error);
    maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}
```
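The restart-or-fail decision in maybeRestartTasks can be illustrated with a small standalone sketch. Everything here is hypothetical (`RestartDecisionSketch`, `FixedDelayBackoff`, `FailureResult`); it assumes a fixed-delay strategy that permits a limited number of restarts, and it is only loosely modeled on the excerpt, not a copy of Flink's classes.

```java
class RestartDecisionSketch {
    /** Hypothetical fixed-delay strategy that allows at most maxAttempts restarts. */
    static final class FixedDelayBackoff {
        private final int maxAttempts;
        private final long delayMs;
        private int attempts = 0;

        FixedDelayBackoff(int maxAttempts, long delayMs) {
            this.maxAttempts = maxAttempts;
            this.delayMs = delayMs;
        }

        void notifyFailure() { attempts++; }

        boolean canRestart() { return attempts <= maxAttempts; }

        long getBackoffTime() { return delayMs; }
    }

    /** Outcome of handling one failure: restart after a delay, or give up with an error. */
    static final class FailureResult {
        final boolean canRestart;
        final long restartDelayMs;
        final Throwable error;

        FailureResult(boolean canRestart, long restartDelayMs, Throwable error) {
            this.canRestart = canRestart;
            this.restartDelayMs = restartDelayMs;
            this.error = error;
        }
    }

    /** Consults the strategy and turns the failure into a restart-or-fail result. */
    static FailureResult handleFailure(FixedDelayBackoff strategy, Throwable cause) {
        strategy.notifyFailure();
        return strategy.canRestart()
            ? new FailureResult(true, strategy.getBackoffTime(), cause)
            : new FailureResult(false, 0L, cause);
    }

    /** Same branching as in the excerpt: restart with delay, otherwise fail the job. */
    static String maybeRestartTasks(FailureResult result) {
        return result.canRestart
            ? "restart tasks in " + result.restartDelayMs + " ms"
            : "fail job: " + result.error.getMessage();
    }
}
```

With `new FixedDelayBackoff(2, 100L)`, the first two failures lead to a delayed restart and the third fails the job.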
Back to the main flow: DefaultScheduler calls the constructor of its parent class, SchedulerBase.
The SchedulerBase constructor calls createAndRestoreExecutionGraph, which is where the ExecutionGraph gets built:
```java
this.executionGraph = createAndRestoreExecutionGraph(
    jobManagerJobMetricGroup,
    checkNotNull(shuffleMaster),
    checkNotNull(partitionTracker));

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker) throws Exception {

    ExecutionGraph newExecutionGraph =
        createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
                new HashSet<>(newExecutionGraph.getAllVertices().values()),
                false)) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
```
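The restore order in createAndRestoreExecutionGraph (latest checkpoint first, savepoint only as a fallback, nothing when there is no coordinator) can be summarized in a tiny sketch. `RestoreOrderSketch` and `chooseRestoreSource` are hypothetical names, and the `Optional` parameters merely stand in for "a checkpoint was found" and "a savepoint path was configured".

```java
import java.util.Optional;

class RestoreOrderSketch {
    /**
     * Decides where state would be restored from, in the order the excerpt checks:
     * latest checkpoint first, then savepoint, otherwise nothing to restore.
     */
    static String chooseRestoreSource(
            boolean hasCoordinator,
            Optional<String> latestCheckpoint,
            Optional<String> savepointPath) {
        if (!hasCoordinator) {
            // Mirrors the null check: without a coordinator no restore is attempted.
            return "no restore (checkpointing disabled)";
        }
        if (latestCheckpoint.isPresent()) {
            return "checkpoint " + latestCheckpoint.get();
        }
        // Only reached when no checkpoint was found.
        return savepointPath
            .map(path -> "savepoint " + path)
            .orElse("no restore (fresh start)");
    }
}
```

Note that a configured savepoint is ignored whenever a checkpoint is available, which matches the `if (!...restoreLatestCheckpointedStateToAll(...))` guard in the excerpt.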
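createAndRestoreExecutionGraph calls createExecutionGraph, which uses ExecutionGraphBuilder.buildGraph to build the ExecutionGraph. (At this point all three layers of the graph abstraction exist: StreamGraph, JobGraph, and ExecutionGraph.)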
```java
private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {

    final FailoverStrategy.Factory failoverStrategy = legacyScheduling
        ? FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log)
        : new NoOpFailoverStrategy.Factory();

    return ExecutionGraphBuilder.buildGraph(
        null,
        jobGraph,
        jobMasterConfiguration,
        futureExecutor,
        ioExecutor,
        slotProvider,
        userCodeLoader,
        checkpointRecoveryFactory,
        rpcTimeout,
        restartStrategy,
        currentJobManagerJobMetricGroup,
        blobWriter,
        slotRequestTimeout,
        log,
        shuffleMaster,
        partitionTracker,
        failoverStrategy);
}
```
The buildGraph method creates the ExecutionGraph:
```java
final ExecutionGraph executionGraph;
try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            jobInformation,
            futureExecutor,
            ioExecutor,
            rpcTimeout,
            restartStrategy,
            maxPriorAttemptsHistoryLength,
            failoverStrategyFactory,
            slotProvider,
            classLoader,
            blobWriter,
            allocationTimeout,
            partitionReleaseStrategyFactory,
            shuffleMaster,
            partitionTracker,
            jobGraph.getScheduleMode());
} catch (IOException e) {
    throw new JobException("Could not create the ExecutionGraph.", e);
}
```
In addition, if the job's checkpoint configuration (snapshotSettings) is not null, buildGraph calls executionGraph.enableCheckpointing:
```java
if (snapshotSettings != null) {
    executionGraph.enableCheckpointing(
        chkConfig,
        triggerVertices,
        ackVertices,
        confirmVertices,
        hooks,
        checkpointIdCounter,
        completedCheckpoints,
        rootBackend,
        checkpointStatsTracker);
}
```
enableCheckpointing then instantiates a CheckpointCoordinator:
```java
checkpointCoordinator = new CheckpointCoordinator(
    jobInformation.getJobId(),
    chkConfig,
    tasksToTrigger,
    tasksToWaitFor,
    tasksToCommitTo,
    operatorCoordinators,
    checkpointIDCounter,
    checkpointStore,
    checkpointStateBackend,
    ioExecutor,
    new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
    SharedStateRegistry.DEFAULT_FACTORY,
    failureManager);
```
This is the point where the CheckpointCoordinator is created.
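Everything in the call stack above belongs to the JobMaster inside the JobManager. The JobManager consists of three components: ResourceManager, Dispatcher, and JobMaster (the very first frame of the call stack above is in the Dispatcher).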
Condensed, the call chain looks like this:
```txt
Dispatcher.createJobManagerRunner
DefaultJobManagerRunnerFactory.createJobManagerRunner
    new JobManagerRunnerImpl
JobManagerRunnerImpl constructor
DefaultJobMasterServiceFactory.createJobMasterService
    new JobMaster
JobMaster constructor
    ----> createScheduler
JobMaster.createScheduler
    ----> schedulerNGFactory.createInstance
DefaultSchedulerFactory.createInstance
    ----> FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration)
DefaultScheduler constructor
    ----> super()
SchedulerBase constructor
    ----> createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
SchedulerBase.createAndRestoreExecutionGraph
    ----> createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
SchedulerBase.createExecutionGraph
    ----> ExecutionGraphBuilder.buildGraph
ExecutionGraphBuilder.buildGraph
    ----> executionGraph.enableCheckpointing
ExecutionGraph.enableCheckpointing
    ----> new CheckpointCoordinator
```
At this point the CheckpointCoordinator has been created.
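One detail of this chain is easy to miss: because the ExecutionGraph is built inside the SchedulerBase constructor, the coordinator already exists before the body of the DefaultScheduler constructor runs. The toy sketch below reproduces only that ordering. All class names (`ToyDefaultScheduler`, `ToySchedulerBase`, `ToyExecutionGraph`, `ToyCoordinator`) are hypothetical, and the `trace` list is just a way to record the order of calls.

```java
import java.util.ArrayList;
import java.util.List;

class ConstructorChainSketch {
    static class ToyCoordinator {
        ToyCoordinator(List<String> trace) {
            trace.add("new ToyCoordinator");
        }
    }

    static class ToyExecutionGraph {
        private ToyCoordinator coordinator; // stays null unless checkpointing is enabled

        void enableCheckpointing(List<String> trace) {
            trace.add("enableCheckpointing");
            coordinator = new ToyCoordinator(trace);
        }

        ToyCoordinator getCoordinator() {
            return coordinator;
        }
    }

    static class ToySchedulerBase {
        protected final ToyExecutionGraph graph;

        ToySchedulerBase(List<String> trace, boolean checkpointingConfigured) {
            trace.add("ToySchedulerBase constructor");
            this.graph = buildGraph(trace, checkpointingConfigured);
        }

        private static ToyExecutionGraph buildGraph(List<String> trace, boolean checkpointingConfigured) {
            trace.add("buildGraph");
            ToyExecutionGraph g = new ToyExecutionGraph();
            if (checkpointingConfigured) {
                g.enableCheckpointing(trace);
            }
            return g;
        }
    }

    static class ToyDefaultScheduler extends ToySchedulerBase {
        ToyDefaultScheduler(List<String> trace, boolean checkpointingConfigured) {
            super(trace, checkpointingConfigured); // the graph and coordinator are built here
            trace.add("ToyDefaultScheduler constructor body");
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        new ToyDefaultScheduler(trace, true);
        trace.forEach(System.out::println);
    }
}
```

When checkpointing is not configured, `getCoordinator()` returns `null`, which is the case the `checkpointCoordinator != null` guard in createAndRestoreExecutionGraph protects against.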
Here is the upstream call stack, picking up from the previous post, "Flink MiniCluster startup flow".
Starting from MiniCluster.start():
```txt
PerJobMiniClusterFactory.submitJob
    ----> miniCluster.start(); miniCluster.submitJob(jobGraph)
MiniCluster.submitJob
    ----> asynchronous call to dispatcherGateway.submitJob(jobGraph, rpcTimeout) (from here on, execution continues on another thread)
Dispatcher.submitJob
    ----> internalSubmitJob
Dispatcher.internalSubmitJob
    ----> waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob), which calls persistAndRunJob
Dispatcher.persistAndRunJob
    ----> runJob
Dispatcher.runJob
    ----> createJobManagerRunner
Dispatcher.createJobManagerRunner
    ----> asynchronous call to jobManagerRunnerFactory.createJobManagerRunner, which is where the first call chain begins
```
With that, the two call chains are connected end to end.