Official introduction to the job submission flow:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/python/overview
Submitting a job from the command line:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/cli/#submitting-pyflink-jobs
We take the following basic command as the entry point:
./bin/flink run --python examples/python/table/word_count.py
Looking at the file flink-dist/src/main/bin/flink, the last line of this shell script is:
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
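In other words, the shell script starts a JVM process whose entry point is the CliFrontend class, and the arguments given on the command line are passed through to its main method.
The code shows that CliFrontend's main method mainly does two things:
- Initialize the global configuration (this is not the args passed in)
- Construct a CliFrontend instance and call parseAndRun()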
try {
    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
    final Throwable strippedThrowable =
            ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    LOG.error("Fatal error while running command line interface.", strippedThrowable);
    strippedThrowable.printStackTrace();
} finally {
    System.exit(retCode);
}
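The subsequent parseAndRun
validates the arguments and splits off the first one. The first argument is treated as the command (action) by default, and a switch statement dispatches to the matching operation.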
// check for action
if (args.length < 1) {
    CliFrontendParser.printHelp(customCommandLines);
    System.out.println("Please specify an action.");
    return 1;
}

// get action
String action = args[0];

// remove action from parameters
// ./flink run --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
// strip the first argument
// params[] = --python /home/jrdw/peizhouyu/flink-python/app.py --jobmanager 11.95.88.158:6017
final String[] params = Arrays.copyOfRange(args, 1, args.length);

try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION:
            runApplication(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case "-h":
        case "--help":
            CliFrontendParser.printHelp(customCommandLines);
            return 0;
        case "-v":
        case "--version":
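The dispatch pattern above can be tried in isolation. The following is only a minimal sketch, not Flink code: the class name ActionDispatchSketch, the dispatch method, and the string results it returns are made up for illustration, and the value of ACTION_LIST is an assumption (the text only confirms that ACTION_RUN is "run").

```java
import java.util.Arrays;

public class ActionDispatchSketch {
    // Stand-ins for the ACTION_* constants; only "run" is confirmed by the text above.
    static final String ACTION_RUN = "run";
    static final String ACTION_LIST = "list"; // assumed value, for illustration only

    /** Returns a description of what would be dispatched instead of doing real work. */
    static String dispatch(String[] args) {
        // No action given: the real CLI prints help and returns 1.
        if (args.length < 1) {
            return "help";
        }
        // The first argument is the action; the rest are that action's parameters.
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case ACTION_RUN:
                return "run:" + String.join(" ", params);
            case ACTION_LIST:
                return "list:" + String.join(" ", params);
            default:
                return "unknown:" + action;
        }
    }

    public static void main(String[] args) {
        String[] cli = {"run", "--python", "examples/python/table/word_count.py"};
        System.out.println(dispatch(cli));
    }
}
```

The point to notice is that the action handler never sees the word "run" itself, only the remaining parameters.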
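The ACTION_RUN constant is simply "run", so this command ends up calling the run() method. Inside run(), the part to focus on is the creation of ProgramOptions, which turns the command-line input into a ProgramOptions object.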
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
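create() decides whether the job is a Java job or a Python job. The decision is based on whether the command-line arguments contain Python-related options such as py, python, pym, pyModule and so on; a job whose entry class is org.apache.flink.client.python.PythonGatewayServer is also treated as a Python job.
For a Python job, ProgramOptions is created through the createPythonProgramOptions method, whereas for a Java job it is created directly with new ProgramOptions.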
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
        return createPythonProgramOptions(line);
    } else {
        return new ProgramOptions(line);
    }
}
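To make the detection rule concrete, here is a rough sketch that works on raw argument strings. It is a simplification under stated assumptions: the real code inspects a parsed CommandLine object, the class and method names below are hypothetical, the -c / --class spelling of the entry-class option is assumed, and the Python dependency options checked by containsPythonDependencyOptions are left out.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PythonJobDetectionSketch {
    // Option names taken from the text above; the list there is not exhaustive.
    static final Set<String> PYTHON_ENTRY_OPTIONS =
            new HashSet<>(Arrays.asList("py", "python", "pym", "pyModule"));

    static final String GATEWAY_SERVER = "org.apache.flink.client.python.PythonGatewayServer";

    /** Hypothetical helper: true if the raw arguments look like a Python job. */
    static boolean isPythonJob(List<String> args) {
        for (int i = 0; i < args.size(); i++) {
            String arg = args.get(i);
            // Strip one or two leading dashes to get the bare option name.
            String name = arg.replaceFirst("^-{1,2}", "");
            if (arg.startsWith("-") && PYTHON_ENTRY_OPTIONS.contains(name)) {
                return true;
            }
            // Assumed spelling of the entry-class option: -c / --class.
            boolean isClassOption = arg.equals("-c") || arg.equals("--class");
            if (isClassOption && i + 1 < args.size() && GATEWAY_SERVER.equals(args.get(i + 1))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isPythonJob(Arrays.asList("--python", "word_count.py")));
        System.out.println(isPythonJob(Arrays.asList("job.jar")));
    }
}
```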
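createPythonProgramOptions first initializes a class loader used to load the engine's Python-related extension packages, then creates a PythonProgramOptions instance through reflection. PythonProgramOptions extends ProgramOptions.
Note that when PythonProgramOptions is created reflectively, its constructor initializes the fields that correspond to various configuration options and also sets the entryPointClass property to: org.apache.flink.client.python.PythonDriver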
public static ProgramOptions createPythonProgramOptions(CommandLine line)
        throws CliArgsException {
    try {
        ClassLoader classLoader = getPythonClassLoader();
        Class<?> pythonProgramOptionsClazz =
                Class.forName(
                        "org.apache.flink.client.cli.PythonProgramOptions", false, classLoader);
        Constructor<?> constructor =
                pythonProgramOptionsClazz.getConstructor(CommandLine.class);
        return (ProgramOptions) constructor.newInstance(line);
    } catch (InstantiationException
            | InvocationTargetException
            | NoSuchMethodException
            | IllegalAccessException
            | ClassNotFoundException e) {
        throw new CliArgsException(
                "Python command line option detected but the flink-python module seems to be missing "
                        + "or not working as expected.",
                e);
    }
}
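The same load-by-name, look-up-constructor, instantiate sequence can be reproduced with JDK classes only. The sketch below is an illustration, not Flink code: java.lang.StringBuilder stands in for PythonProgramOptions, a String stands in for the CommandLine argument, and the class and method names are hypothetical.

```java
import java.lang.reflect.Constructor;

public class ReflectiveCreateSketch {
    /**
     * Loads a class by name through a class loader, finds its (String) constructor,
     * and instantiates it. The caller only sees the common supertype.
     */
    static CharSequence create(String className, String arg) {
        try {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            // initialize=false defers static initialization until the class is first used.
            Class<?> clazz = Class.forName(className, false, classLoader);
            Constructor<?> constructor = clazz.getConstructor(String.class);
            return (CharSequence) constructor.newInstance(arg);
        } catch (ReflectiveOperationException e) {
            // Mirrors the idea of wrapping every reflection failure in one error.
            throw new IllegalStateException(
                    "Class " + className + " seems to be missing or not working as expected.", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create("java.lang.StringBuilder", "created reflectively"));
    }
}
```

Going through reflection means the calling module needs no compile-time dependency on the class being created, which fits the error message above about the flink-python module possibly being missing.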
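Back in run(), we continue with the logic after ProgramOptions is created. The getJobJarAndDependencies method collects the list of jars the job depends on. It also uses the value of entryPointClass to decide whether this is a Python job; if so, it picks up the jars under FLINK_OPT_DIR whose names start with flink-python as the dependency jars. The configurations are then merged and processed to produce effectiveConfiguration.
Once the preparation is complete, a PackagedProgram is created from the ProgramOptions (for a Python job, its subclass PythonProgramOptions) and effectiveConfiguration, and executeProgram() is called to execute that PackagedProgram.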
final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final List<URL> jobJars = getJobJarAndDependencies(programOptions);

final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
    executeProgram(effectiveConfiguration, program);
}
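The jar lookup described above (files under FLINK_OPT_DIR whose names start with flink-python) amounts to a prefix match on a directory listing. The sketch below only mirrors that description and is not the Flink implementation: the class and method names and the jar file names are hypothetical, and a temporary directory stands in for FLINK_OPT_DIR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PythonJarLookupSketch {
    /** Lists the names of jar files in optDir that start with "flink-python". */
    static List<String> findPythonJars(Path optDir) {
        try (Stream<Path> files = Files.list(optDir)) {
            return files
                    .map(p -> p.getFileName().toString())
                    .filter(n -> n.startsWith("flink-python") && n.endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a throwaway directory with made-up jar names and runs the lookup on it. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("hypothetical-opt-dir");
            Files.createFile(dir.resolve("flink-python-hypothetical.jar"));
            Files.createFile(dir.resolve("flink-table-hypothetical.jar"));
            return findPythonJars(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```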
executeProgram() in turn delegates to ClientUtils.executeProgram():
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
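ClientUtils.executeProgram() sets the thread's context class loader to the user-code class loader created earlier, while keeping a reference to the current context class loader so that it can be switched back once submission is complete. It then calls the program's invokeInteractiveModeForExecution() method.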
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
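The save, swap, and restore handling of the context class loader is a general pattern that can be shown without any Flink classes. The following is a minimal sketch under that assumption; the class name, the callWith helper, and the empty URLClassLoader standing in for the user-code class loader are all hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSwapSketch {
    /** Runs body with the given loader as context class loader, then restores the old one. */
    static <T> T callWith(ClassLoader userCodeClassLoader, Supplier<T> body) {
        final Thread thread = Thread.currentThread();
        // Remember the current loader so it can be put back afterwards.
        final ClassLoader contextClassLoader = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userCodeClassLoader);
            return body.get();
        } finally {
            // Restored even if body throws.
            thread.setContextClassLoader(contextClassLoader);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code class loader.
        ClassLoader userLoader = new URLClassLoader(new URL[0]);
        ClassLoader seenInside =
                callWith(userLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("inside body, user loader active: " + (seenInside == userLoader));
        System.out.println(
                "after body, user loader active: "
                        + (Thread.currentThread().getContextClassLoader() == userLoader));
    }
}
```

Putting the restore in a finally block matters because user code may throw, and the calling thread should not be left with the user-code class loader installed.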
invokeInteractiveModeForExecution in turn calls callMainMethod(mainClass, args);
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}
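callMainMethod() is where the entry class's main method is invoked through reflection. Leaving out some reflection checks, the key part is this single line: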
mainMethod.invoke(null, (Object) args);
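For a Java job, the main method executed here is the main method of the user's jar program. For a Python job, it is actually the main method of org.apache.flink.client.python.PythonDriver, which was assigned by default earlier (org.apache.flink.client.python.PythonGatewayServer also comes into play later).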
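The reflective call to main can be tried on its own. This is a sketch, not the Flink callMainMethod: the nested HypotheticalEntryPoint class stands in for the entry class (a user main class or PythonDriver), the checks shown are a simplified guess at the kind of checks the text says it skipped, and all names are made up for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class CallMainMethodSketch {
    /** Hypothetical entry class; it records its arguments so the call can be observed. */
    public static class HypotheticalEntryPoint {
        public static volatile String[] lastArgs;

        public static void main(String[] args) {
            lastArgs = args;
        }
    }

    /** Looks up public static main(String[]) on entryClass and invokes it. */
    static void callMain(Class<?> entryClass, String[] args) {
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException("main method must be static");
            }
            // null receiver because main is static; the cast keeps args as a single
            // String[] argument instead of being spread as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Could not invoke main on " + entryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        callMain(HypotheticalEntryPoint.class, new String[] {"--python", "word_count.py"});
        System.out.println(String.join(" ", HypotheticalEntryPoint.lastArgs));
    }
}
```

Without the (Object) cast, the String[] would be treated as the varargs array of invoke itself, and the call would fail for any argument count other than one.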