Flink MiniCluster Startup Flow


Checkpoint source-code series:

  Flink MiniCluster startup flow

  Flink CheckpointCoordinator startup flow

  Flink Checkpoint flow

For a while now I have been reading the source code around Flink checkpoints. Starting from the creation of the CheckpointCoordinator, I debugged through the MiniCluster startup flow, the creation of the JobMaster/TaskManager, the creation of the CheckpointCoordinator, and the triggering of checkpoints. This post walks through the MiniCluster startup flow; the checkpoint flow will be shared in follow-up posts.

-------------------

Flink jobs can be deployed in three modes:

* Per-Job (e.g. a yarn cluster submission)
* Session (e.g. a yarn session)
* Application

The subject here is the Application mode, also called local mode: starting a Flink job directly inside IDEA uses this mode (hereafter simply "local mode").

The following is from the official documentation:

```
In all the above modes, the application's main() method is executed on the client side. This
process includes downloading the application's dependencies locally, executing the main() to
extract a representation of the application that Flink's runtime can understand (i.e. the
JobGraph), and shipping the dependencies and the JobGraph to the cluster. This makes the client
a heavy resource consumer, as it may need substantial network bandwidth to download the
dependencies and ship binaries to the cluster, and CPU cycles to execute the main(). This
problem can be more pronounced when the client is shared between users.

Building on this observation, the Application Mode creates a cluster per submitted application,
but this time the main() method of the application is executed on the JobManager. Creating a
cluster per application can be seen as creating a session cluster shared only among the jobs of
a particular application, which is torn down when the application finishes. With this
architecture, the Application Mode provides the same resource isolation and load balancing
guarantees as the Per-Job mode, but at the granularity of a whole application. Executing the
main() on the JobManager saves the CPU cycles required, but also the bandwidth needed to
download the dependencies locally. Furthermore, since there is one JobManager per application,
it spreads the network load of downloading the applications' dependencies more evenly across
the cluster.

Compared to the Per-Job mode, the Application Mode allows the submission of applications that
consist of multiple jobs. The order of job execution is not affected by the deployment mode,
but by the call used to launch the job. Using the blocking execute() establishes an order: the
execution of the "next" job is postponed until "this" job finishes. Using the non-blocking
executeAsync() lets the "next" job start before "this" job finishes.
```

Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/#deployment-modes

What the official page does not spell out is that this locally executed mode is the one used for development and debugging.

1. User code calls execute


Local mode starts from the user code's call to StreamExecutionEnvironment.execute.
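For concreteness, here is a minimal runnable job of this shape (a hypothetical example class, assuming Flink 1.11; the KafkaToKafka job in the call stack at the end of this post plays the same role). Running main() in the IDE kicks off exactly the flow traced below:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalModeDemo {  // hypothetical example class, not from the original post
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)   // the source registers itself via addSource
           .map(i -> i * 2)         // each operator registers a Transformation via addOperator
           .print();                // execute() only runs pipelines that end in a sink

        env.execute("local-mode-demo");  // the entry point traced in the rest of this post
    }
}
```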

execute has four overloads (the Scala API signatures):

```scala
def execute()
def execute(jobName: String)
def executeAsync()
def executeAsync(jobName: String)
```

They form a synchronous pair and an asynchronous pair. The difference is also described in the docs quoted above: it mainly matters for the execution order when one program launches several jobs; the sketch below contrasts the two.
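A minimal sketch (assuming Flink 1.11 and a hypothetical demo class, not code from the post): with the blocking execute(), job B is only submitted after job A has finished, while executeAsync() returns a JobClient immediately and job B starts while A may still be running.

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SyncVsAsyncDemo {  // hypothetical example class
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a").print();
        env.execute("job A");  // blocking: returns only after job A finishes

        env.fromElements("b").print();
        JobClient client = env.executeAsync("job B");  // non-blocking: returns a JobClient right away
        // wait for job B explicitly, mirroring what the blocking execute() does internally
        client.getJobExecutionResult(SyncVsAsyncDemo.class.getClassLoader()).get();
    }
}
```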

The synchronous execute simply delegates to javaEnv.execute:

```scala
def execute(jobName: String) = javaEnv.execute(jobName)
```

2. javaEnv.execute generates the StreamGraph

This step builds the job's StreamGraph. Flink has three layers of graph abstraction (the physical execution graph is the structure that actually runs):

StreamGraph ---> JobGraph ---> ExecutionGraph ---> physical execution graph

Further reading on the three graph layers: https://www.cnblogs.com/bethunebtj/p/9168274.html#21-flink%E7%9A%84%E4%B8%89%E5%B1%82%E5%9B%BE%E7%BB%93%E6%9E%84
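To inspect the first of these layers for your own job, the environment can render the StreamGraph as JSON before submission (a small sketch, assuming Flink 1.11; the class name is hypothetical):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShowStreamGraph {  // hypothetical example class
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        // JSON rendering of the StreamGraph; it can be pasted into the Flink plan visualizer
        System.out.println(env.getExecutionPlan());
    }
}
```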

The source of javaEnv.execute:

```java
/**
 * Triggers the program execution. The environment will execute all parts of
 * the program that have resulted in a "sink" operation. Sink operations are
 * for example printing results or forwarding them to a message queue.
 *
 * <p>The program execution will be logged and displayed with the provided name
 *
 * @param jobName
 *         Desired name of the job
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception which occurs during job execution.
 */
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // generate the StreamGraph, then continue with execute(StreamGraph)
    return execute(getStreamGraph(jobName));
}
```

The related code:

```java
public StreamGraph getStreamGraph(String jobName) {
    // clearTransformations = true: clear the transformations stored in the env
    // once the graph has been generated
    return getStreamGraph(jobName, true);
}

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    // generate() is where the StreamGraph is actually built
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    // create the StreamGraphGenerator
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}
```
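A side effect worth noting, which follows directly from the code above: because clearTransformations defaults to true, the environment is empty again after a successful execute(), so calling execute() a second time without adding new operators fails with the IllegalStateException seen in getStreamGraphGenerator. A minimal sketch (hypothetical demo class):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClearTransformationsDemo {  // hypothetical example class
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        env.execute("first run");   // builds the StreamGraph, then clears env.transformations
        env.execute("second run");  // throws IllegalStateException: "No operators defined ..."
    }
}
```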

Execution then continues with the generated StreamGraph in LocalStreamEnvironment's execute method:

```java
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    return super.execute(streamGraph);
}
```

Note: an operator's transformation is added to the environment in the addOperator method; sources are added separately in addSource.

This takes us back to StreamExecutionEnvironment.execute:

```java
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // delegate to the asynchronous variant
    final JobClient jobClient = executeAsync(streamGraph);

    try {
        final JobExecutionResult jobExecutionResult;

        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
        } else {
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

        return jobExecutionResult;
    } catch (Throwable t) {
        jobListeners.forEach(jobListener -> {
            jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
        });
        ExceptionUtils.rethrowException(t);

        // never reached, only make javac happy
        return null;
    }
}
```

And executeAsync:

```java
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
        executorFactory,
        "Cannot find compatible factory for specified execution.target (=%s)",
        configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        // for the local target this resolves to LocalExecutor.execute
        .execute(streamGraph, configuration);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
            String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
            strippedException);
    }
}
```
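How does this end up at LocalExecutor? The executorServiceLoader looks up a PipelineExecutorFactory whose name matches the execution.target option, which the local environment sets to "local" (with attached execution enabled). A hedged sketch of making those settings explicit (option names as in Flink 1.11; the class name is hypothetical):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalTargetDemo {  // hypothetical example class
    public static void main(String[] args) throws Exception {
        // equivalent explicit settings for what the local environment arranges implicitly
        Configuration conf = new Configuration();
        conf.set(DeploymentOptions.TARGET, "local");  // matched by the local executor factory
        conf.set(DeploymentOptions.ATTACHED, true);   // LocalExecutor only supports attached mode

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        env.fromElements(1, 2, 3).print();
        env.execute("explicit-local-target");
    }
}
```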

3. Generating the JobGraph

LocalExecutor.execute:

```java
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    // generate the JobGraph
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
    // submit the job
    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}

// generate the JobGraph
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
    // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
    // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
    // for now.
    if (pipeline instanceof Plan) {
        Plan plan = (Plan) pipeline;
        final int slotsPerTaskManager = configuration.getInteger(
                TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
        final int numTaskManagers = configuration.getInteger(
                ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

        plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
    }

    return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
}
```
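The arithmetic in the Plan branch (which only applies to DataSet pipelines) is straightforward: with taskmanager.numberOfTaskSlots = 4 and local.number-taskmanager = 2, the default parallelism becomes 4 * 2 = 8. The same two options are the knobs for sizing a local setup. A hedged sketch (Flink 1.11 option names; the class name is hypothetical, and whether the per-job MiniCluster honors them is decided in getMiniClusterConfig in the next step):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniClusterSizingDemo {  // hypothetical example class
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);          // 4 slots per TaskManager
        conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);  // 2 local TaskManagers
        // a DataSet plan would get default parallelism 4 * 2 = 8 from the code above
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(8, conf);
        env.fromElements(1, 2, 3).print();
        env.execute("sized-mini-cluster");
    }
}
```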

4. Starting the MiniCluster

PerJobMiniClusterFactory.submitJob:

```java
public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) throws Exception {
    MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    // start the MiniCluster: this brings up a whole set of services (RPC, metrics, ...)
    // and starts the TaskManagers
    miniCluster.start();

    return miniCluster
        // submit the job
        .submitJob(jobGraph)
        .thenApply(result -> new PerJobMiniClusterJobClient(result.getJobID(), miniCluster))
        .whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // We failed to create the JobClient and must shutdown to ensure cleanup.
                shutDownCluster(miniCluster);
            }
        })
        .thenApply(Function.identity());
}
```
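For reference, here is a minimal hand-written sketch of the same sequence, driving MiniCluster directly (Flink 1.11 API; the class is hypothetical and error handling is omitted):

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class ManualMiniCluster {  // hypothetical example class
    public static void run(JobGraph jobGraph) throws Exception {
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(1)          // startTaskManagers() will launch this many
            .setNumSlotsPerTaskManager(4)
            .build();
        // MiniCluster is auto-closeable, so try-with-resources tears it down
        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
            miniCluster.start();            // brings up RPC, dispatcher, TaskManagers, ...
            miniCluster.submitJob(jobGraph).get();             // JobSubmissionResult
            miniCluster.requestJobResult(jobGraph.getJobID())  // wait for job completion
                       .get();
        }
    }
}
```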

At this point the local MiniCluster is up and the job has been submitted for execution.

 

The call stack is as follows:

```java
KafkaToKafka.execute                            // user code (the example job)

StreamExecutionEnvironment.execute              // Scala API wrapper

StreamExecutionEnvironment.execute(jobName)     // builds the StreamGraph

LocalStreamEnvironment.execute

StreamExecutionEnvironment.execute(streamGraph)

StreamExecutionEnvironment.executeAsync

LocalExecutor.execute                           // the Pipeline is turned into the JobGraph

PerJobMiniClusterFactory.submitJob

MiniCluster.submitJob
// miniCluster.start() has been called by this point, which also ran startTaskManagers
// at this point the MiniCluster for local execution is up
```

You are welcome to follow the Flink菜鳥 WeChat public account, which publishes occasional posts on Flink development.

 

