Flink基於用戶程序生成JobGraph,提交到集群進行分布式部署運行。本篇從源碼角度講解一下Flink Jar包是如何被提交到集群的。(本文源碼基於Flink 1.11.3)
1 Flink run 提交Jar包流程分析
首先分析run腳本可以找到入口類CliFrontend,這個類在main方法中解析參數,基於第二個參數定位到run方法:
try { // do action switch (action) { case ACTION_RUN: run(params); return 0; case ACTION_RUN_APPLICATION: runApplication(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; case "-h": case "--help": ... return 0; case "-v": case "--version": ... default: ... return 1; } }
在run方法中,根據classpath、用戶指定的jar、main函數等信息創建PackagedProgram。在Flink中通過Jar方式提交的任務都封裝成了PackagedProgram對象。
protected void run(String[] args) throws Exception { ... final ProgramOptions programOptions = ProgramOptions.create(commandLine); final PackagedProgram program = getPackagedProgram(programOptions); // 把用戶的jar配置到config里面 final List<URL> jobJars = program.getJobJarAndDependencies(); final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars); try { executeProgram(effectiveConfiguration, program); } finally { program.deleteExtractedLibraries(); } }
創建PackagedProgram后,有個非常關鍵的步驟就是這個effectiveConfig,這里面會把相關的Jar都放入pipeline.jars這個屬性里,后面pipeline提交作業時,這些jar也會一起提交到集群。
其中比較關鍵的是Flink的類加載機制,為了避免用戶自己的jar內與其他用戶沖突,采用了逆轉類加載順序的機制。
private PackagedProgram( @Nullable File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, Configuration configuration, SavepointRestoreSettings savepointRestoreSettings, String... args) throws ProgramInvocationException { // 依賴的資源 this.classpaths = checkNotNull(classpaths); // 保存點配置 this.savepointSettings = checkNotNull(savepointRestoreSettings); // 參數配置 this.args = checkNotNull(args); // 用戶jar this.jarFile = loadJarFile(jarFile); // 自定義類加載 this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader( getJobJarAndDependencies(), classpaths, getClass().getClassLoader(), configuration); // 加載main函數 this.mainClass = loadMainClass( entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile), userCodeClassLoader); }
在類加載器工具類中根據參數classloader.resolve-order決定是父類優先還是子類優先,默認是使用子類優先模式。
executeProgram方法內部是啟動任務的核心,在完成一系列的環境初始化后(主要是類加載以及一些輸出信息),會調用packagedProgram的invokeInteractiveModeForExecution的,在這個方法里通過反射調用用戶的main方法。
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException { ... Method mainMethod = entryClass.getMethod("main", String[].class); mainMethod.invoke(null, (Object) args); ... }
執行用戶的main方法后,就是flink的標准流程了。創建env、構建StreamDAG、生成Pipeline、提交到集群、阻塞運行。當main程序執行完畢,整個run腳本程序也就退出了。
總結來說,Flink提交Jar任務的流程是:
1 腳本入口程序根據參數決定做什么操作
2 創建PackagedProgram,准備相關jar和類加載器
3 通過反射調用用戶Main方法
4 構建Pipeline,提交到集群
2 通過PackagedProgram獲取Pipeline
有的時候不想通過阻塞的方式卡任務執行狀態,需要通過類似JobClient的客戶端異步查詢程序狀態,並提供停止退出的能力。
要了解這個流程,首先要了解Pipeline是什么。用戶編寫的Flink程序,無論是DataStream API還是SQL,最終編譯出的都是Pipeline。只是DataStream API編譯出的是StreamGraph,而SQL編譯出的Plan。Pipeline會在env.execute()中進行編譯並提交到集群。
既然這樣,此時可以思考一個問題:Jar包任務是獨立的Main方法,如何能抽取其中的用戶程序獲得Pipeline呢?
通過瀏覽源碼的單元測試,發現了一個很好用的工具類:PackagedProgramUtils。
public static Pipeline getPipelineFromProgram( PackagedProgram program, Configuration configuration, int parallelism, boolean suppressOutput) throws CompilerException, ProgramInvocationException { // 切換classloader final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); // 創建env OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment( configuration, program.getUserCodeClassLoader(), parallelism); benv.setAsContext(); StreamPlanEnvironment senv = new StreamPlanEnvironment( configuration, program.getUserCodeClassLoader(), parallelism); senv.setAsContext(); try { // 執行用戶main方法 program.invokeInteractiveModeForExecution(); } catch (Throwable t) { if (benv.getPipeline() != null) { return benv.getPipeline(); } if (senv.getPipeline() != null) { return senv.getPipeline(); } ... } finally { // 重置classloader } }
這個工具類首先在線程內創建了一個env,這個env通過threadload保存到當前線程中。當通過反射調用用戶代碼main方法時,內部的getEnv函數直接從threadlocal中獲取到這個env。
ThreadLocal<StreamExecutionEnvironmentFactory> factory = new ThreadLocal<>(); public static StreamExecutionEnvironment getExecutionEnvironment() { return Utils.resolveFactory(factory , contextEnvironmentFactory) .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment) .orElseGet(StreamExecutionEnvironment::createLocalEnvironment); }
再回頭看看env有什么特殊的。
public class StreamPlanEnvironment extends StreamExecutionEnvironment { private Pipeline pipeline; public Pipeline getPipeline() { return pipeline; } @Override public JobClient executeAsync(StreamGraph streamGraph) { pipeline = streamGraph; // do not go on with anything now! throw new ProgramAbortException(); } }
原來是重寫了executeAysnc方法,當用戶執行env.execute時,觸發異常,從而在PackagedProgramUtils里面攔截異常,獲取到用戶到pipeline。
總結起來流程如下:
3 編程實戰
通過閱讀上述源碼,可以學習到:
1 classloader類加載的父類優先和子類優先問題
2 threadlocal線程級本地變量的使用
3 PackagedProgramUtils 利用枚舉作為工具類
4 PackagedProgramUtils 利用重寫env,攔截異常獲取pipeline。
關於pipeline如何提交到集群、如何運行,就后文再談了。