Flink 1.14 Python Job Submission Flow Analysis


Official introduction to the job submission flow:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/overview

Submitting a job from the command line:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/cli/#submitting-pyflink-jobs

We use the following basic command as the entry point:

./bin/flink run --python examples/python/table/word_count.py

Looking at the file flink-dist/src/main/bin/flink, the last line of this shell script is:

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

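The shell script launches a JVM process whose entry point is the CliFrontend class, and the arguments given on the command line are passed through to its main method.

From the code, the main method of CliFrontend mainly does two things:

  1. Initializes the global configuration (this is not the args passed in)
  2. Constructs a CliFrontend instance and calls parseAndRun()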

        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            System.exit(retCode);
        }

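The subsequent parseAndRun method validates the arguments and splits off the first one. The first argument is assumed to be the command (the action), and a switch statement dispatches to the corresponding operation.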

        // check for action
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        // get action
        String action = args[0];

        // remove action from parameters
        // e.g. for: ./flink run --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
        // the first argument ("run") is stripped, leaving:
        // params[] = --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
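
The dispatch logic can be reduced to a small, self-contained sketch. This is only a simplified stand-in and not Flink's code: the class name ActionDispatchSketch, the helper stripAction, the printed messages, and the handling of unknown actions are assumptions made for illustration.

```java
import java.util.Arrays;

/** Simplified stand-in for the action dispatch in parseAndRun(); not Flink's actual code. */
final class ActionDispatchSketch {

    /** Drops the first element (the action) and returns the remaining parameters. */
    static String[] stripAction(String[] args) {
        return Arrays.copyOfRange(args, 1, args.length);
    }

    /** Returns 0 for a recognized action and 1 otherwise (assumed exit-code convention). */
    static int dispatch(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        String[] params = stripAction(args);
        switch (action) {
            case "run":
                // A real client would build ProgramOptions from params here.
                System.out.println("run with " + Arrays.toString(params));
                return 0;
            case "list":
                System.out.println("list with " + Arrays.toString(params));
                return 0;
            default:
                System.out.println("Unknown action: " + action);
                return 1;
        }
    }

    public static void main(String[] args) {
        int code = dispatch(
                new String[] {"run", "--python", "examples/python/table/word_count.py"});
        System.out.println("exit code: " + code);
    }
}
```

The point of the sketch is that the action is consumed exactly once, so every handler only ever sees the parameters that follow it.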

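The ACTION_RUN constant is the string "run", so our command invokes the corresponding run() method. Within run(), the step to focus on is the creation of ProgramOptions, which converts the command-line input into a ProgramOptions object.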

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

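The create method decides whether the current job is a Java job or a Python job. The decision is based on whether the command-line arguments contain Python-related options such as py, python, pym, pyModule, and so on; a job whose entry class is org.apache.flink.client.python.PythonGatewayServer is also treated as a Python job.

For a Python job, the ProgramOptions is created through the createPythonProgramOptions method, whereas for a Java job a ProgramOptions is instantiated directly with new.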

 public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            return new ProgramOptions(line);
        }
    }
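
To make the detection rule concrete, here is a minimal sketch that works on a raw argument array instead of a parsed CommandLine. It is not how Flink implements isPythonEntryPoint or containsPythonDependencyOptions: the class name PythonJobDetectionSketch, the method looksLikePythonJob, and the exact flag spellings (including -c/--class for the entry class) are assumptions, and the Python dependency options are left out entirely.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the Python-job check; works on raw arguments, not a CommandLine. */
final class PythonJobDetectionSketch {

    // Assumption: flag spellings derived from the option names listed in the article.
    private static final List<String> PYTHON_FLAGS =
            Arrays.asList("-py", "--python", "-pym", "--pyModule");

    private static final String GATEWAY_CLASS =
            "org.apache.flink.client.python.PythonGatewayServer";

    /** Returns true if a Python flag is present or the entry class is the gateway server. */
    static boolean looksLikePythonJob(String[] params) {
        for (int i = 0; i < params.length; i++) {
            if (PYTHON_FLAGS.contains(params[i])) {
                return true;
            }
            // Assumption: the entry class is given with -c or --class.
            boolean isClassFlag = params[i].equals("-c") || params[i].equals("--class");
            if (isClassFlag && i + 1 < params.length && GATEWAY_CLASS.equals(params[i + 1])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] pythonJob = {"--python", "examples/python/table/word_count.py"};
        String[] javaJob = {"-c", "com.example.Main", "job.jar"};
        System.out.println("python job: " + looksLikePythonJob(pythonJob));
        System.out.println("java job:   " + looksLikePythonJob(javaJob));
    }
}
```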

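createPythonProgramOptions first initializes a class loader used to load the engine's Python-related extension packages, then creates a PythonProgramOptions instance via reflection. PythonProgramOptions extends ProgramOptions.

Note that when PythonProgramOptions is created reflectively, its constructor initializes the fields corresponding to several configuration options and also sets the entryPointClass property to org.apache.flink.client.python.PythonDriver.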

public static ProgramOptions createPythonProgramOptions(CommandLine line)
            throws CliArgsException {
        try {
            ClassLoader classLoader = getPythonClassLoader();
            Class<?> pythonProgramOptionsClazz =
                    Class.forName(
                            "org.apache.flink.client.cli.PythonProgramOptions", false, classLoader);
            Constructor<?> constructor =
                    pythonProgramOptionsClazz.getConstructor(CommandLine.class);
            return (ProgramOptions) constructor.newInstance(line);
        } catch (InstantiationException
                | InvocationTargetException
                | NoSuchMethodException
                | IllegalAccessException
                | ClassNotFoundException e) {
            throw new CliArgsException(
                    "Python command line option detected but the flink-python module seems to be missing "
                            + "or not working as expected.",
                    e);
        }
    }
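
The reflective pattern used here (load a class by name through a specific class loader, look up a one-argument constructor, instantiate it) can be tried in isolation. The sketch below uses java.lang.StringBuilder as a stand-in target so that it runs without Flink on the classpath; ReflectiveCreateSketch and its create method are hypothetical names, and wrapping failures in IllegalStateException is an assumption that only loosely mirrors the CliArgsException above.

```java
import java.lang.reflect.Constructor;

/** Stand-alone illustration of class-loader-based reflective construction. */
final class ReflectiveCreateSketch {

    /** Loads className via loader and calls its public constructor taking one argType. */
    static Object create(String className, ClassLoader loader, Class<?> argType, Object arg) {
        try {
            // initialize=false: load the class without running its static initializers yet.
            Class<?> clazz = Class.forName(className, false, loader);
            Constructor<?> constructor = clazz.getConstructor(argType);
            return constructor.newInstance(arg);
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFoundException, NoSuchMethodException, InstantiationException,
            // IllegalAccessException and InvocationTargetException.
            throw new IllegalStateException("Could not create " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        Object built = create("java.lang.StringBuilder", loader, String.class, "flink-python");
        System.out.println(built.getClass().getName() + ": " + built);
    }
}
```

Loading by name keeps the caller free of a compile-time dependency on the target class, which is presumably why the client can still start when the flink-python module is absent and only fails once a Python option is detected.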

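Back in the run method, we continue with the logic after ProgramOptions is created. The getJobJarAndDependencies method obtains the list of dependency jars; it also uses the value of entryPointClass to decide whether this is a Python job, and if so it takes the jars under FLINK_OPT_DIR whose names start with flink-python as the dependencies. The configuration is then merged and processed to produce effectiveConfiguration.

Once preparation is complete, a PackagedProgram is created from the ProgramOptions (for a Python job, its subclass PythonProgramOptions) and effectiveConfiguration, and executeProgram() is then called to execute it.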

     final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            executeProgram(effectiveConfiguration, program);
        }
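
The jar selection rule for Python jobs (file names starting with flink-python) can be sketched as a plain filter over file names. This is an illustration only: PythonJarLookupSketch, selectPythonJars, and the sample file names are hypothetical, and the real lookup works on the contents of FLINK_OPT_DIR rather than on an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrates the "starts with flink-python" selection rule on a list of file names. */
final class PythonJarLookupSketch {

    // Prefix taken from the article's description of the lookup.
    private static final String PYTHON_JAR_PREFIX = "flink-python";

    /** Keeps only the file names that begin with the flink-python prefix. */
    static List<String> selectPythonJars(List<String> fileNames) {
        List<String> selected = new ArrayList<>();
        for (String name : fileNames) {
            if (name.startsWith(PYTHON_JAR_PREFIX)) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical directory listing; real names depend on the distribution.
        List<String> optDir =
                Arrays.asList("flink-python_2.11-1.14.0.jar", "flink-table_2.11-1.14.0.jar");
        System.out.println(selectPythonJars(optDir));
    }
}
```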

executeProgram() in turn delegates to ClientUtils.executeProgram():

protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }

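ClientUtils.executeProgram() sets the thread's context class loader to the one created earlier (the user-code class loader) while saving the current context class loader, so that it can be switched back once submission completes. It then calls the program's invokeInteractiveModeForExecution() method.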

public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info(
                    "Starting program (detached: {})",
                    !configuration.getBoolean(DeploymentOptions.ATTACHED));

            ContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            StreamContextEnvironment.setAsContext(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout);

            try {
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }
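
The save, swap, and restore handling of the context class loader is a general pattern that can be shown on its own. The following is a minimal sketch under the assumption that the work to run is passed in as a Supplier; ContextClassLoaderSketch and runWithContextClassLoader are hypothetical names, and an empty URLClassLoader stands in for the user-code class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Runs an action with a temporary context class loader and always restores the old one. */
final class ContextClassLoaderSketch {

    static <T> T runWithContextClassLoader(ClassLoader userCodeClassLoader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        // Remember the loader that was active before the swap.
        ClassLoader original = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userCodeClassLoader);
            return action.get();
        } finally {
            // Restore even if the action throws.
            current.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the user-code class loader; it has no URLs of its own.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());
        ClassLoader seenInside =
                runWithContextClassLoader(
                        userLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("inside uses user loader: " + (seenInside == userLoader));
        System.out.println(
                "restored afterwards: "
                        + (Thread.currentThread().getContextClassLoader() != userLoader));
    }
}
```

Putting the restore in a finally block matters because user code may throw, and the client thread should not be left with the user-code class loader installed.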

invokeInteractiveModeForExecution in turn calls callMainMethod(mainClass, args):

 public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            callMainMethod(mainClass, args);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

callMainMethod() invokes the main method of the entry class via reflection. Leaving out some reflection checks, the key line is this one:

 mainMethod.invoke(null, (Object) args);
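
For readers who want to try the reflective call on its own, here is a minimal sketch. It is not the body of callMainMethod: CallMainSketch, callMain, and the nested DemoEntryPoint class are hypothetical, and only one of the checks (that main is static) is reproduced.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Looks up public static void main(String[]) on a class and invokes it reflectively. */
final class CallMainSketch {

    /** Hypothetical entry class standing in for a user's main class. */
    public static final class DemoEntryPoint {
        static String[] lastArgs;

        public static void main(String[] args) {
            lastArgs = args;
            System.out.println("entry point received " + args.length + " argument(s)");
        }
    }

    static void callMain(Class<?> entryClass, String[] args) {
        try {
            // getMethod only finds public methods, so a non-public main is rejected here.
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException("main method must be static");
            }
            // The cast to Object passes the whole array as the single String[] parameter
            // instead of spreading it across the varargs of invoke().
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main of " + entryClass, e);
        }
    }

    public static void main(String[] args) {
        callMain(DemoEntryPoint.class, new String[] {"--python", "word_count.py"});
    }
}
```

The (Object) cast is the detail worth noticing: without it, the String[] would be treated as the varargs array of invoke() and the call would fail with an argument-count mismatch.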

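The main method executed here is, for a Java job, the main method of the user's jar program. For a Python job it is actually the main method of org.apache.flink.client.python.PythonDriver, which was assigned by default earlier (org.apache.flink.client.python.PythonGatewayServer comes into play later).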

 

 
