Flink源碼剖析:Jar包任務提交流程


 

Flink基於用戶程序生成JobGraph,提交到集群進行分布式部署運行。本篇從源碼角度講解一下Flink Jar包是如何被提交到集群的。(本文源碼基於Flink 1.11.3)

1 Flink run 提交Jar包流程分析

首先分析run腳本可以找到入口類CliFrontend,這個類在main方法中解析參數,基於第二個參數定位到run方法:

try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION:
            runApplication(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case "-h":
        case "--help":
            ...
            return 0;
        case "-v":
        case "--version":
            ...
        default:
            ...
            return 1;
    }
}

在run方法中,根據classpath、用戶指定的jar、main函數等信息創建PackagedProgram。在Flink中通過Jar方式提交的任務都封裝成了PackagedProgram對象。

protected void run(String[] args) throws Exception {
    ...
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    final PackagedProgram program = getPackagedProgram(programOptions);

    // 把用戶的jar配置到config里面
    final List<URL> jobJars = program.getJobJarAndDependencies();
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
            activeCommandLine, commandLine, programOptions, jobJars);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}

創建PackagedProgram后,有個非常關鍵的步驟就是這個effectiveConfig,這里面會把相關的Jar都放入pipeline.jars這個屬性里,后面pipeline提交作業時,這些jar也會一起提交到集群。

其中比較關鍵的是Flink的類加載機制,為了避免用戶自己的jar內與其他用戶沖突,采用了逆轉類加載順序的機制。

private PackagedProgram(
        @Nullable File jarFile,
        List<URL> classpaths,
        @Nullable String entryPointClassName,
        Configuration configuration,
        SavepointRestoreSettings savepointRestoreSettings,
        String... args) throws ProgramInvocationException {

    // 依賴的資源
    this.classpaths = checkNotNull(classpaths);

    // 保存點配置
    this.savepointSettings = checkNotNull(savepointRestoreSettings);

    // 參數配置
    this.args = checkNotNull(args);

    // 用戶jar
    this.jarFile = loadJarFile(jarFile);

    // 自定義類加載
    this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
        getJobJarAndDependencies(),
        classpaths,
        getClass().getClassLoader(),
        configuration);

    // 加載main函數
    this.mainClass = loadMainClass(
        entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
        userCodeClassLoader);
}

在類加載器工具類中根據參數classloader.resolve-order決定是父類優先還是子類優先,默認是使用子類優先模式。

executeProgram方法內部是啟動任務的核心,在完成一系列的環境初始化后(主要是類加載以及一些輸出信息),會調用packagedProgram的invokeInteractiveModeForExecution的,在這個方法里通過反射調用用戶的main方法。

private static void callMainMethod(Class<?> entryClass, String[] args) 
    throws ProgramInvocationException {
    ...
    Method mainMethod = entryClass.getMethod("main", String[].class);
    mainMethod.invoke(null, (Object) args);
    ...
}

執行用戶的main方法后,就是flink的標准流程了。創建env、構建StreamDAG、生成Pipeline、提交到集群、阻塞運行。當main程序執行完畢,整個run腳本程序也就退出了。

總結來說,Flink提交Jar任務的流程是:
1 腳本入口程序根據參數決定做什么操作
2 創建PackagedProgram,准備相關jar和類加載器
3 通過反射調用用戶Main方法
4 構建Pipeline,提交到集群

2 通過PackagedProgram獲取Pipeline

有的時候不想通過阻塞的方式卡任務執行狀態,需要通過類似JobClient的客戶端異步查詢程序狀態,並提供停止退出的能力。

要了解這個流程,首先要了解Pipeline是什么。用戶編寫的Flink程序,無論是DataStream API還是SQL,最終編譯出的都是Pipeline。只是DataStream API編譯出的是StreamGraph,而SQL編譯出的Plan。Pipeline會在env.execute()中進行編譯並提交到集群。

既然這樣,此時可以思考一個問題:Jar包任務是獨立的Main方法,如何能抽取其中的用戶程序獲得Pipeline呢?

通過瀏覽源碼的單元測試,發現了一個很好用的工具類:PackagedProgramUtils。

public static Pipeline getPipelineFromProgram(
        PackagedProgram program,
        Configuration configuration,
        int parallelism,
        boolean suppressOutput) throws CompilerException, ProgramInvocationException {

    // 切換classloader
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

    // 創建env
    OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    benv.setAsContext();
    StreamPlanEnvironment senv = new StreamPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    senv.setAsContext();

    try {
        // 執行用戶main方法
        program.invokeInteractiveModeForExecution();
    } catch (Throwable t) {
        if (benv.getPipeline() != null) {
            return benv.getPipeline();
        }

        if (senv.getPipeline() != null) {
            return senv.getPipeline();
        }

        ...
    } finally {
        // 重置classloader
    }
}

這個工具類首先在線程內創建了一個env,這個env通過threadload保存到當前線程中。當通過反射調用用戶代碼main方法時,內部的getEnv函數直接從threadlocal中獲取到這個env。

ThreadLocal<StreamExecutionEnvironmentFactory> factory = new ThreadLocal<>();

public static StreamExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(factory , contextEnvironmentFactory)
            .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
            .orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
    }

再回頭看看env有什么特殊的。

public class StreamPlanEnvironment extends StreamExecutionEnvironment {

    private Pipeline pipeline;

    public Pipeline getPipeline() {
        return pipeline;
    }

    @Override
    public JobClient executeAsync(StreamGraph streamGraph) {
        pipeline = streamGraph;

        // do not go on with anything now!
        throw new ProgramAbortException();
    }
}

原來是重寫了executeAysnc方法,當用戶執行env.execute時,觸發異常,從而在PackagedProgramUtils里面攔截異常,獲取到用戶到pipeline。

總結起來流程如下:

3 編程實戰

通過閱讀上述源碼,可以學習到:

1 classloader類加載的父類優先和子類優先問題
2 threadlocal線程級本地變量的使用
3 PackagedProgramUtils 利用枚舉作為工具類
4 PackagedProgramUtils 利用重寫env,攔截異常獲取pipeline。

關於pipeline如何提交到集群、如何運行,就后文再談了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM