上文提到構建平台需要實現一個更好的提交流程,具體目標是將 SQL 任務相關內容,如 SQL 、DDL、UDF、配置內容等信息作為參數,調用提交API就能在目標集群創建任務。
一、分析提交作業流程
首先借由官方文檔中的整體角色流程圖,可以看出左側一部分 Flink Program 其中包括用戶程序代碼和一個 Client,是由該 Client 將用戶代碼生成的作業圖 - JobGraph 提交到遠程的 JobManager 中,但是我們前面說提交任務要自己直接上傳 Jar 包到 JobManager,這種情況怎么解釋呢?
其實,這個 client 是指 flink-clients
模塊中提供的 ClusterClient
抽象類,它有兩個實現 MiniClusterClient
和 RestClusterClient
,分別對應向用戶本地內嵌環境和遠程集群提交任務。當我們使用 Main 方法在本地啟動Flink任務時,模版代碼中 getExecutionEnvironment
會創建 LocalStreamEnvironment
,后面它會使用 MiniClusterClient
向自己程序啟動的 MiniCluster
中提交任務(JobGraph和依賴包)。當任務打成 Fat jar 后,提交到 JobManager 的 web 端或集群中任意實例的命令行入口: bin/flink run -d xxx.jar
,都會通過 callMainMethod(mainClass, args)
去執行 getExecutionEnvironment
進而創建別的 XXXEvironment
,最終使用 RestClusterClient
向真正的 JobManager 提交任務(JobGraph和依賴包)。總體上說 client 是指運行 flink-clients
模塊中的 RestClusterClient
的角色。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(……);
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ……
tableEnv.execute("jobName");
了解到這里,我們就有了第一個實現作業提交的思路,可以主動指定 createRemoteEnvironment(host, port, configs, jarFiles)
,這樣在 execute()
時就會通過上面的 RestClusterClient
向遠程 JobManager 提交任務。
二、復用 SQL Client 的提交過程
雖然我們解決了如何向遠程集群提交任務的問題,但是這只是低層次的提交API,我們依然要考慮如何封裝 SQL 任務,此外還有如何實現調試功能。先不要閉門造車,我們可以先看看 Flink SQL Client 是怎么實現的。
Flink SQL Client 是官方提供的一個命令行界面的針對Flink的SQL客戶端工具,除了命令解析和配置管理外,它封裝了 Table Api 功能和提交任務的流程,對外暴露一系列功能,如 useDatabase,createTable,listTables,listFunctions,explainStatement,executeQuery,executeUpdate 等,深入源碼可以看到每個功能都對應了一系列 Table Api 操作,最終 execute 時會通過 RestClusterClient
向遠程 JobManager 提交任務。
看的這里,我們有了第二個實現方案,就是通過借(改)鑒(造) Flink SQL Client 的 LocalExecutor 和 提交流程,把該部分擴展為 Web 平台的服務層。這里還有個意外之喜是有了調試功能,LocalExecutor 中實現的 executeQueryInternal
就是一個調試功能,即提交 select 語句,結果輸出持續異步返回給用戶。與之對應的 executeUpdateInternal
是包括 insert 的帶有 Sink 的真正的提交執行,也就是 detached mode,只返回任務 ID。
我們一個完成的SQL任務流程為例,入口參數是源表和結果表 DDL、任務 SQL、UDF 定義 和 jar 包地址 (HDFS 或 本地路徑)、並行度配置、Checkpoint配置、啟動 Savepoint 地址 等信息,其中在創建 ExecutionContext 以及 Environment 的 ExecutionConfig 進行UDF和各類參數配置,通過 Table Api 的 sqlUpdate 執行 DDL,最后復用原來的任務提交代碼。
此外,在最新的 Flink 1.10 版本中,任務提交的 API 有所變化,主要是新增加了抽象層,將原來的 StreamGraph 定義為 PipeLine,增加了 PipelineExecutorFactory
,PipelineExecutor
,ClusterClientProvider
等中間層,最終還是去調用 RestClusterClient
進行任務提交,其中設計模式和層次較多我們暫不關心,可以直接復用 SQL Client 中封裝的 ProgramDeployer 進行調用。