參考前文:解決 Flink 1.11.0 sql 不能指定 jobName 的問題
從 FLink 1.11 改版 sql 的執行流程后,就不能和 Stream Api 一樣使用 env.execute("JobName") 來指定任務名
看了源碼后發現,在 sql 任務中,直接使用了 "insert-into" 拼接 catelog/database/sink table 做為 sql 任務的 job name
String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
使用體驗當然是不好的,在 JIRA 上有個 改進的嚴重 issues: https://issues.apache.org/jira/browse/FLINK-18545 討論這個問題,
最后決定在 PipelineOptions 中添加 "pipeline.name" 參數做為 job name
public class PipelineOptions { /** * The job name used for printing and logging. */ public static final ConfigOption<String> NAME = key("pipeline.name") .stringType() .noDefaultValue() .withDescription("The job name used for printing and logging.");
這個 issues 在 Flink 1.12.0 終於 merge 進去了,所以升級到 Flink 1.12.0 就不再需要修改源碼,直接在 TableConfig 中添加 "pipeline.name" 參數即可
由於之前為了指定 JobName 之前修改過源碼,所以升級到 Flink 1.12.0 的第一件事情就是去掉之前修改的源碼,使用 “pipeline.name” 配置參數指定 JobName
其他代碼都和以前一樣,只需要在 TableConfig 添加參數即可
val tabConf = tableEnv.getConfig
onf.setString("pipeline.name", Common.jobName)
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文