Flink 1.12.0 sql 任務指定 job name


參考前文:解決 Flink 1.11.0 sql 不能指定 jobName 的問題

從 FLink 1.11 改版 sql 的執行流程后,就不能和 Stream Api 一樣使用 env.execute("JobName") 來指定任務名

看了源碼后發現,在 sql 任務中,直接使用了 "insert-into" 拼接 catelog/database/sink table 做為 sql 任務的 job name

String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);

使用體驗當然是不好的,在 JIRA 上有個 改進的嚴重 issues: https://issues.apache.org/jira/browse/FLINK-18545 討論這個問題,
最后決定在 PipelineOptions 中添加 "pipeline.name" 參數做為 job name

public class PipelineOptions {

  /**
   * The job name used for printing and logging.
   */
  public static final ConfigOption<String> NAME =
    key("pipeline.name")
      .stringType()
      .noDefaultValue()
      .withDescription("The job name used for printing and logging.");

這個 issues 在 Flink 1.12.0 終於 merge 進去了,所以升級到 Flink 1.12.0 就不再需要修改源碼,直接在 TableConfig 中添加 "pipeline.name" 參數即可

由於之前為了指定 JobName 之前修改過源碼,所以升級到 Flink 1.12.0 的第一件事情就是去掉之前修改的源碼,使用 “pipeline.name” 配置參數指定 JobName

其他代碼都和以前一樣,只需要在 TableConfig 添加參數即可

val tabConf = tableEnv.getConfig
onf.setString("pipeline.name", Common.jobName)

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM