FLINK-啟動命令2(Application 模式)


1. 背景
flink-1.11 引入了一種新的部署模式,即 Application 模式。目前,flink-1.11 已經可以支持基於 Yarn 和 Kubernetes 的 Application 模式。

2. 優勢
Session模式:所有作業共享集群資源,隔離性差,JM 負載瓶頸,main 方法在客戶端執行。
Per-Job模式:每個作業單獨啟動集群,隔離性好,JM 負載均衡,main 方法在客戶端執行。

通過以上兩種模式的特點描述,可以看出,main方法都是在客戶端執行,社區考慮到在客戶端執行 main() 方法來獲取 flink 運行時所需的依賴項,並生成 JobGraph,並將依賴項和 JobGraph 發送到集群的一系列過程中,由於需要大量的網絡帶寬下載依賴項並將二進制文件發送到集群,會造成客戶端消耗大量的資源。尤其在大量用戶共享客戶端時,問題更加突出。因此,社區提出新的部署方式 Application 模式解決該問題。

3. 原理
Application 模式下,用戶程序的 main 方法將在集群中而不是客戶端運行,用戶將程序邏輯和依賴打包進一個可執行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 負責調用其中的 main 方法來生成 JobGraph。Application 模式為每個提交的應用程序創建一個集群,該集群可以看作是在特定應用程序的作業之間共享的會話集群,並在應用程序完成時終止。在這種體系結構中,Application 模式在不同應用之間提供了資源隔離和負載平衡保證。在特定一個應用程序上,JobManager 執行 main() 可以節省所需的 CPU 周期,還可以節省本地下載依賴項所需的帶寬。

4. 使用
application 模式使用 bin/flink run-application 提交作業;通過 -t 指定部署環境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);並支持通過 -D 參數指定通用的 運行配置,比如 jobmanager/taskmanager 內存、checkpoint 時間間隔等。
通過 bin/flink run-application -h 可以看到 -D/-t 的詳細說明:(-e 已經被廢棄,可以忽略)

Options for Generic CLI mode:
     -D <property=value>   Generic configuration options for
                           execution/deployment and for the configured executor.
                           The available options can be found at
                           https://ci.apache.org/projects/flink/flink-docs-stabl
                           e/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is
                           also available with the "Application Mode".
                           The name of the executor to be used for executing the
                           given job, which is equivalent to the
                           "execution.target" config option. The currently
                           available executors are: "collection", "remote",
                           "local", "kubernetes-session", "yarn-per-job",
                           "yarn-session".
     -t,--target <arg>     The deployment target for the given application,
                           which is equivalent to the "execution.target" config
                           option. The currently available targets are:
                           "collection", "remote", "local",
                           "kubernetes-session", "yarn-per-job", "yarn-session",
                           "yarn-application" and "kubernetes-application".

下面列舉幾個使用 Application 模式提交作業到 yarn 上運行的命令:

1.帶有 JM 和 TM 內存設置的命令提交:

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \
./examples/batch/WordCount.jar

2.在 1 的基礎上自己設置 TaskManager slots 個數為3,以及指定並發數為3:

./bin/flink run-application -t yarn-application -p 3 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar

當然,指定並發還可以使用 -Dparallelism.default=3,而且社區目前傾向使用 -D+通用配置代替客戶端命令參數(比如 -p)。
所以這樣寫更符合規范:

./bin/flink run-application -t yarn-application \
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar

3.和 yarn.provided.lib.dirs 參數一起使用,可以充分發揮 application 部署模式的優勢:
我們看 官方配置文檔 對這個配置的解釋:

yarn.provided.lib.dirs: A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they doesn't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib

意思是我們可以預先上傳 flink 客戶端依賴包 (flink-dist/lib/plugin) 到遠端存儲(一般是 hdfs,或者共享存儲),然后通過 yarn.provided.lib.dirs 參數指定這個路徑,flink 檢測到這個配置時,就會從該地址拉取 flink 運行需要的依賴包,省去了依賴包上傳的過程,yarn-cluster/per-job 模式也支持該配置。在之前的版本中,使用 yarn-cluster/per-job 模式,每個作業都會單獨上傳 flink 依賴包(一般會有 180MB左右)導致 hdfs 資源浪費,而且程序異常退出時,上傳的 flink 依賴包往往得不到自動清理。通過指定 yarn.provided.lib.dirs,所有作業都會使用一份遠端 flink 依賴包,並且每個 yarn nodemanager 都會緩存一份,提交速度也會大大提升,對於跨機房提交作業會有很大的優化。
使用示例如下:
my-application.jar 是用戶 jar 包

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins" \
examples/streaming/my-application.jar

my-application.jar 也可以提前上傳 hdfs:

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist/lib;hdfs://myhdfs/my-remote-flink-dist/plugins"
hdfs://myhdfs/jars/my-application.jar

也可以將 yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml,這時提交作業就和普通作業沒有區別了:

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.application.name="MyFlinkWordCount" \ 
-Dtaskmanager.numberOfTaskSlots=3 \
/local/path/to/my-application.jar

注意:如果自己指定 yarn.provided.lib.dirs,有以下注意事項:

  • 需要將 lib 包和 plugins 包地址用;分開,從上面的例子中也可以看到,將 plugins 包放在 lib 目錄下可能會有包沖突錯誤
  • plugins 包路徑地址必須以 plugins 結尾,例如上面例子中的 hdfs://myhdfs/my-remote-flink-dist/plugins
  • hdfs 路徑必須指定 nameservice(或 active namenode 地址),而不能使用簡化方式(例如 hdfs:///path/to/lib)

該種模式的操作使得 flink 作業提交變得很輕量,因為所需的 Flink jar 包和應用程序 jar 將到指定的遠程位置獲取,而不是由客戶端下載再發送到集群。這也是社區在 flink-1.11 版本引入新的部署模式的意義所在。

Application 模式在停止、取消或查詢正在運行的應用程序的狀態等方面和 flink-1.11 之前的版本一樣,可以采用現有的方法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM