Flink CheckpointCoordinator Startup Flow


Checkpoint source code walkthrough series:

  Flink MiniCluster Startup Flow

  Flink CheckpointCoordinator Startup Flow

  Flink Checkpoint Flow

 

Let's start with a picture: the Flink cluster anatomy diagram from the official documentation.

 

 

Source: https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html#anatomy-of-a-flink-cluster

As for the CheckpointCoordinator itself, here is the class comment quoted from the source code:
```txt
The checkpoint coordinator coordinates the distributed snapshots of operators and state. It triggers the checkpoint by sending the messages to the relevant tasks and collects the checkpoint acknowledgements. It also collects and maintains the overview of the state handles reported by the tasks that acknowledge the checkpoint.
```
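To make the comment above more concrete, here is a minimal sketch in plain Java of the trigger/acknowledge cycle it describes. This is not Flink code: `MiniCoordinator`, `trigger`, `acknowledge`, and `handlesOf` are hypothetical names invented for illustration, and the "messages" are just method calls.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the trigger/acknowledge cycle; not Flink's API.
class MiniCoordinator {
    private final List<String> tasks;
    private long nextCheckpointId = 1;
    // checkpoint id -> tasks that have not acknowledged yet
    private final Map<Long, Set<String>> pending = new HashMap<>();
    // checkpoint id -> (task -> state handle reported with the acknowledgement)
    private final Map<Long, Map<String, String>> stateHandles = new HashMap<>();

    MiniCoordinator(List<String> tasks) {
        this.tasks = tasks;
    }

    // Registers a new pending checkpoint that waits for every task to acknowledge.
    long trigger() {
        long id = nextCheckpointId++;
        pending.put(id, new HashSet<>(tasks));
        stateHandles.put(id, new HashMap<>());
        return id;
    }

    // Returns true only for the acknowledgement that completes the checkpoint.
    boolean acknowledge(long checkpointId, String task, String stateHandle) {
        Set<String> waiting = pending.get(checkpointId);
        if (waiting == null || !waiting.remove(task)) {
            return false; // unknown checkpoint, unknown task, or duplicate acknowledgement
        }
        stateHandles.get(checkpointId).put(task, stateHandle);
        if (waiting.isEmpty()) {
            pending.remove(checkpointId);
            return true;
        }
        return false;
    }

    // Overview of the state handles collected so far for one checkpoint.
    Map<String, String> handlesOf(long checkpointId) {
        return stateHandles.get(checkpointId);
    }

    public static void main(String[] args) {
        MiniCoordinator coordinator = new MiniCoordinator(Arrays.asList("source", "map"));
        long id = coordinator.trigger();
        System.out.println(coordinator.acknowledge(id, "source", "handle-1"));
        System.out.println(coordinator.acknowledge(id, "map", "handle-2"));
        System.out.println(coordinator.handlesOf(id));
    }
}
```

The real coordinator additionally deals with timeouts, declined checkpoints, and persistence, none of which this sketch attempts to model.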

Set a breakpoint in the CheckpointCoordinator constructor and start a job to see the call stack that leads to it. The steps are as follows.

 

First, the Dispatcher creates a JobManagerRunnerImpl. This happens asynchronously in Dispatcher.createJobManagerRunner, through the jobManagerRunnerFactory:

private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();
    // up to here we are on the Dispatcher thread
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                // the code below runs asynchronously on a separate thread and creates the JobManagerRunner
                return jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler);
            } catch (Exception e) {
                throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
            }
        },
        rpcService.getExecutor());
}
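The pattern used here (run a factory call on another executor and wrap any checked exception in a CompletionException) can be tried out in isolation. The following is a minimal sketch using only the JDK; `AsyncCreateSketch`, `createRunner`, and `createRunnerAsync` are hypothetical stand-ins, and a plain String takes the place of the JobManagerRunner.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the asynchronous creation step; not Flink code.
class AsyncCreateSketch {

    // Simulates a factory call that may throw a checked exception.
    static String createRunner(String jobId) throws Exception {
        if (jobId.isEmpty()) {
            throw new Exception("empty job id");
        }
        return "runner-for-" + jobId;
    }

    // Runs the creation on the given executor. Lambdas passed to supplyAsync
    // cannot throw checked exceptions, so they are wrapped in CompletionException.
    static CompletableFuture<String> createRunnerAsync(String jobId, ExecutorService executor) {
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    return createRunner(jobId);
                } catch (Exception e) {
                    throw new CompletionException("Could not instantiate runner.", e);
                }
            },
            executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(createRunnerAsync("job-1", executor).join());
            try {
                createRunnerAsync("", executor).join();
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller only sees the failure when it joins or chains on the future, which is why the original code attaches the job ID to the wrapped exception.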

The JobManagerRunnerImpl constructor calls createJobMasterService, which creates a JobMaster. The JobMaster constructor in turn calls createScheduler(jobManagerJobMetricGroup):

private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
    return schedulerNGFactory.createInstance(
        log,
        jobGraph,
        backPressureStatsTracker,
        scheduledExecutorService,
        jobMasterConfiguration.getConfiguration(),
        scheduler,
        scheduledExecutorService,
        userCodeLoader,
        highAvailabilityServices.getCheckpointRecoveryFactory(),
        rpcTimeout,
        blobWriter,
        jobManagerJobMetricGroup,
        jobMasterConfiguration.getSlotRequestTimeout(),
        shuffleMaster,
        partitionTracker);
}

createScheduler calls schedulerNGFactory.createInstance, which at runtime resolves to DefaultSchedulerFactory.createInstance.

DefaultSchedulerFactory.createInstance creates the scheduler with new DefaultScheduler. Before doing so, it uses the jobGraph and the restartStrategy to build a restartBackoffTimeStrategy, which is then passed to the DefaultScheduler.

The DefaultScheduler constructor directly calls its superclass constructor:

super(
    log,
    jobGraph,
    backPressureStatsTracker,
    ioExecutor,
    jobMasterConfiguration,
    new ThrowingSlotProvider(), // this is not used any more in the new scheduler
    futureExecutor,
    userCodeLoader,
    checkpointRecoveryFactory,
    rpcTimeout,
    new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
    blobWriter,
    jobManagerJobMetricGroup,
    Time.seconds(0), // this is not used any more in the new scheduler
    shuffleMaster,
    partitionTracker,
    executionVertexVersioner,
    false);
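A Java constructor always runs its superclass constructor before its own body, so any work done in the parent constructor appears deeper in the call stack and finishes first. The sketch below illustrates just that ordering; `ConstructorOrderSketch`, `BaseScheduler`, and `ConcreteScheduler` are hypothetical names, and the log messages are placeholders rather than real Flink behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of constructor ordering; not Flink code.
class ConstructorOrderSketch {
    // Records the order in which constructor bodies run.
    static final List<String> log = new ArrayList<>();

    static class BaseScheduler {
        BaseScheduler() {
            log.add("base constructor");
        }
    }

    static class ConcreteScheduler extends BaseScheduler {
        ConcreteScheduler() {
            super(); // must be the first statement; runs before the line below
            log.add("concrete constructor");
        }
    }

    public static void main(String[] args) {
        new ConcreteScheduler();
        System.out.println(log);
    }
}
```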

The DefaultScheduler constructor also uses the restartBackoffTimeStrategy to build an ExecutionFailureHandler. (DefaultScheduler has handleTaskFailure and handleGlobalFailure methods, which appear to be the ones invoked when a task fails.)

this.executionFailureHandler = new ExecutionFailureHandler(
        getSchedulingTopology(),
        failoverStrategy,
        restartBackoffTimeStrategy);

@Override
public void handleGlobalFailure(final Throwable error) {
    setGlobalFailureCause(error);

    log.info("Trying to recover from a global failure.", error);
    final FailureHandlingResult failureHandlingResult = executionFailureHandler.getGlobalFailureHandlingResult(error);
    maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
    if (failureHandlingResult.canRestart()) {
        restartTasksWithDelay(failureHandlingResult);
    } else {
        failJob(failureHandlingResult.getError());
    }
}
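The decision in maybeRestartTasks boils down to "restart with a delay if the strategy allows it, otherwise fail the job". Here is a minimal sketch of that shape in plain Java. Everything in it is hypothetical: `FixedAttempts` is an invented stand-in for a restart strategy with a fixed number of attempts, and the scheduler only records what it would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the restart decision; not Flink's classes.
class FailureHandlingSketch {

    // Stand-in for a restart strategy that allows a fixed number of restarts.
    static class FixedAttempts {
        private int remaining;
        private final long delayMs;

        FixedAttempts(int attempts, long delayMs) {
            this.remaining = attempts;
            this.delayMs = delayMs;
        }

        boolean canRestart() {
            return remaining > 0;
        }

        // Consumes one attempt and returns the delay before the restart.
        long nextDelayMs() {
            remaining--;
            return delayMs;
        }
    }

    // Records the decisions so they are easy to inspect.
    final List<String> actions = new ArrayList<>();
    private final FixedAttempts strategy;

    FailureHandlingSketch(FixedAttempts strategy) {
        this.strategy = strategy;
    }

    // Same shape as maybeRestartTasks: restart if allowed, otherwise fail the job.
    void handleFailure(Throwable error) {
        if (strategy.canRestart()) {
            actions.add("restart after " + strategy.nextDelayMs() + " ms");
        } else {
            actions.add("fail job: " + error.getMessage());
        }
    }

    public static void main(String[] args) {
        FailureHandlingSketch scheduler = new FailureHandlingSketch(new FixedAttempts(1, 100L));
        scheduler.handleFailure(new RuntimeException("first failure"));
        scheduler.handleFailure(new RuntimeException("second failure"));
        System.out.println(scheduler.actions);
    }
}
```

With one allowed attempt, the first failure leads to a restart and the second one fails the job.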

Back to the main flow: DefaultScheduler calls the constructor of its parent class, SchedulerBase.

The SchedulerBase constructor calls createAndRestoreExecutionGraph, and this is where the ExecutionGraph gets built:

this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));


private ExecutionGraph createAndRestoreExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker) throws Exception {

    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
            new HashSet<>(newExecutionGraph.getAllVertices().values()),
            false)) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
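The restore order in createAndRestoreExecutionGraph is: try the latest checkpoint first, and only if none is found, try the savepoint. A minimal sketch of that fallback follows. `RestoreSketch` and `restore` are hypothetical names, Optional values stand in for "a checkpoint was found" and "a savepoint was configured", and the final "started without state" branch is an assumption about what happens when neither exists.

```java
import java.util.Optional;

// Hypothetical, simplified model of the restore order; not Flink's API.
class RestoreSketch {

    // Prefers the latest checkpoint, falls back to the savepoint, otherwise starts empty.
    static String restore(Optional<String> latestCheckpoint, Optional<String> savepointPath) {
        if (latestCheckpoint.isPresent()) {
            return "restored from checkpoint " + latestCheckpoint.get();
        }
        if (savepointPath.isPresent()) {
            return "restored from savepoint " + savepointPath.get();
        }
        return "started without state"; // assumption: nothing to restore
    }

    public static void main(String[] args) {
        System.out.println(restore(Optional.of("chk-7"), Optional.of("/tmp/sp-1")));
        System.out.println(restore(Optional.empty(), Optional.of("/tmp/sp-1")));
        System.out.println(restore(Optional.empty(), Optional.empty()));
    }
}
```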

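createAndRestoreExecutionGraph calls createExecutionGraph, which in turn calls ExecutionGraphBuilder.buildGraph to build the ExecutionGraph (at this point all three layers of the graph abstraction, StreamGraph, JobGraph, and ExecutionGraph, have been generated):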

private ExecutionGraph createExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    ShuffleMaster<?> shuffleMaster,
    final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {

    final FailoverStrategy.Factory failoverStrategy = legacyScheduling ?
        FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log) :
        new NoOpFailoverStrategy.Factory();

    return ExecutionGraphBuilder.buildGraph(
        null,
        jobGraph,
        jobMasterConfiguration,
        futureExecutor,
        ioExecutor,
        slotProvider,
        userCodeLoader,
        checkpointRecoveryFactory,
        rpcTimeout,
        restartStrategy,
        currentJobManagerJobMetricGroup,
        blobWriter,
        slotRequestTimeout,
        log,
        shuffleMaster,
        partitionTracker,
        failoverStrategy);
}

The buildGraph method creates the ExecutionGraph:

final ExecutionGraph executionGraph;
try {
    executionGraph = (prior != null) ? prior :
        new ExecutionGraph(
            jobInformation,
            futureExecutor,
            ioExecutor,
            rpcTimeout,
            restartStrategy,
            maxPriorAttemptsHistoryLength,
            failoverStrategyFactory,
            slotProvider,
            classLoader,
            blobWriter,
            allocationTimeout,
            partitionReleaseStrategyFactory,
            shuffleMaster,
            partitionTracker,
            jobGraph.getScheduleMode());
} catch (IOException e) {
    throw new JobException("Could not create the ExecutionGraph.", e);
}

In addition, if the checkpoint configuration (snapshotSettings) is not null, buildGraph calls executionGraph.enableCheckpointing:

if (snapshotSettings != null) {
    // ... (preparation of the arguments omitted)
    executionGraph.enableCheckpointing(
        chkConfig,
        triggerVertices,
        ackVertices,
        confirmVertices,
        hooks,
        checkpointIdCounter,
        completedCheckpoints,
        rootBackend,
        checkpointStatsTracker);
}

enableCheckpointing then creates a new CheckpointCoordinator:

checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            operatorCoordinators,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);

This is the point where the CheckpointCoordinator is created.
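Put together, the coordinator only exists when checkpointing is configured, which is why the caller has to null-check getCheckpointCoordinator() before restoring state. The sketch below models that relationship in plain Java. `GraphSketch`, `Coordinator`, and `build` are hypothetical names, and a nullable interval stands in for the snapshot settings.

```java
// Hypothetical, simplified model of conditional coordinator creation; not Flink code.
class GraphSketch {

    static class Coordinator {
        final long intervalMs;

        Coordinator(long intervalMs) {
            this.intervalMs = intervalMs;
        }
    }

    // Stays null unless enableCheckpointing is called.
    private Coordinator coordinator;

    void enableCheckpointing(long intervalMs) {
        this.coordinator = new Coordinator(intervalMs);
    }

    Coordinator getCoordinator() {
        return coordinator;
    }

    // A null interval plays the role of "no snapshot settings configured".
    static GraphSketch build(Long checkpointIntervalMs) {
        GraphSketch graph = new GraphSketch();
        if (checkpointIntervalMs != null) {
            graph.enableCheckpointing(checkpointIntervalMs);
        }
        return graph;
    }

    public static void main(String[] args) {
        System.out.println(build(null).getCoordinator() == null);
        System.out.println(build(5000L).getCoordinator().intervalMs);
    }
}
```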

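Everything in the call stack above happens inside the JobManager, mostly within the JobMaster. The JobManager consists of three components: ResourceManager, Dispatcher, and JobMaster (the call stack above starts in the Dispatcher).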

 

Simplified, the flow looks like this:

Dispatcher.createJobManagerRunner

DefaultJobManagerRunnerFactory.createJobManagerRunner  ---->  new JobManagerRunnerImpl

JobManagerRunnerImpl.JobManagerRunnerImpl (constructor)

DefaultJobMasterServiceFactory.createJobMasterService  ---->  new JobMaster

JobMaster.JobMaster (constructor)  ---->  createScheduler

JobMaster.createScheduler  ---->  schedulerNGFactory.createInstance

DefaultSchedulerFactory.createInstance  ---->  FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration)

DefaultScheduler.DefaultScheduler (constructor)  ---->  super()

SchedulerBase.SchedulerBase (constructor)  ---->  createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));

SchedulerBase.createAndRestoreExecutionGraph  ---->  createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

SchedulerBase.createExecutionGraph  ---->  ExecutionGraphBuilder.buildGraph

ExecutionGraphBuilder.buildGraph  ---->  executionGraph.enableCheckpointing

ExecutionGraph.enableCheckpointing  ---->  new CheckpointCoordinator   (at this point the CheckpointCoordinator has been created)

 

Here is the upstream call stack, continuing from the previous post, Flink MiniCluster Startup Flow.

Starting from MiniCluster.start():

PerJobMiniClusterFactory.submitJob  ---->  miniCluster.start();   miniCluster.submitJob(jobGraph)

MiniCluster.submitJob  ---->  asynchronous call to dispatcherGateway.submitJob(jobGraph, rpcTimeout)  (from here on, execution continues on another thread)

Dispatcher.submitJob  ---->  internalSubmitJob

Dispatcher.internalSubmitJob  ---->  waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob), which calls persistAndRunJob

Dispatcher.persistAndRunJob  ---->  runJob

Dispatcher.runJob  ---->  createJobManagerRunner

Dispatcher.createJobManagerRunner  ---->  asynchronous call to jobManagerRunnerFactory.createJobManagerRunner, which is where the first call stack begins

With that, the two call stacks are connected.

 


