
往期推薦:
Flink在1.11版本新增了一種部署模式,目前支持三種:Session 模式、Per job 模式、Application 模式,這三種模式主要在集群管理、資源隔離、用戶main方法執行位置幾個方面有所不同。
本篇會按照下面幾個步驟進行介紹:
1 什么是Session模式
2 什么是Per Job模式
3 從任務解析過程到Application的設計初衷
4 什么是Application模式
5 啟動過程源碼分析
6 總結與參考資料
Session 模式
Flink支持事先創建好一個集群,然后往這個集群上提交任務。所有的任務都在客戶端進行編譯,編譯成JobGraph后,附加上依賴的庫,提交到Flink的集群。集群接收到任務后,會再創建對應的JobMaster進行ExecutionGraph的解析,然后申請資源並執行。如果Flink集群申請的TM內部有很多Slot,那么會按照Slot的粒度進行任務分配,這樣就可能在一個TM上運行多個任務。
這樣設計的好處是,多個任務可以共用一套集群,方便管理監控。但是帶來的缺點也很明顯,當某一個任務崩潰高掛了對應的TM,上面其他的任務都會受到影響。其他的任務受影響崩潰不說,如果大面積的任務恢復,也可能導致JM的性能壓力。
因此Session模式適用於量多、執行任務時間短、對資源不敏感的場景,比如作為在線(即席)查詢引擎。

關於Session模式的部署和使用,也可以參考之前的文章:
Flink Sql-Gateway在Yarn Session模式下的工作原理
Per job 模式
為了進行更好的資源隔離,Flink支持為每個任務單獨創建一個集群,該模式目前支持Yarn、K8s等。當任務執行完畢,集群會自動關閉並回收資源。這樣就保證了更好的資源隔離,單獨的任務失敗也不會影響其他的任務。另外,這種模式分攤了JM的壓力到每個任務,因此這種模式更適合生產環境部署。
觀察下圖可以發現,per job模式和session模式,只有提交任務和啟動graph不一樣,其他后面的流程都是一樣的。
因此Per Job模式適用於執行任務長、對資源敏感或者消耗資源大的任務。

從任務解析過程到Application的設計初衷
在Flink 1.11之前僅有上面兩種模式,那么新實現的Application模式又是什么呢?在了解Application的由來時,最好先來了解下Flink程序的執行過程。

以DataStream API的程序為例,我們編寫的.map().print()屬於程序代碼,對應上圖的program code:
1 當執行env.execute()時,會觸發程序代碼編譯成StreamGraph,StreamGraph主要的作用就是把.map、.partition等翻譯成數據流圖中的節點和邊。
2 接下來任務提交前,會把StreamGraph編譯成JobGraph,JobGraph更像是可以執行的圖結構,並會對其中的一些節點進行合並優化,也叫做chain。比如輸入數據后進行map操作,就可以在一個節點中同時完成讀取和map操作。
3 生成JobGraph后,再把需要的依賴資源,如第三方Jar等一起提交到集群。
4 提交到集群后,session和job模式有所不同。session模式已經存在一個集群,此時的提交是直接發送到集群的dispatcher,內部創建對應的JobMaster,編譯成ExecutionGraph。如果是per job模式則需要新建一個集群,等服務啟動后,把附加過來的jobGraph直接用內部的dispatcher啟動。他們的倆的區別簡單來說,就是一個是事先創建好的集群,一個是臨時啟動的集群。
5 說回到ExecutionGraph,它就是常說的執行圖,執行圖代表了真正物理執行的拓撲圖,比如並行的節點有多少;每個節點讀取什么數據,從哪里讀取;每個節點輸出什么數據,輸出到哪里;然后JobMaster通過調度器進行任務分配。
6 申請好的TM內部會有很多Slot,每個Slot接收發來的Task進行執行,直到任務結束。
7 任務結束后,Session模式會釋放任務申請的資源,並通知內部的ResourceManager組件,方便后續來任務繼續執行;Per Job模式會直接釋放集群。
可以發現,無論是Session還是Per Job,程序代碼都是在客戶端編譯完成。這里的客戶端就是我們執行flink run啟動的程序(其實是CliFrontend)。假如現在需要做一個平台給多個用戶提交任務,或者任務的量級很大,那么客戶端的壓力會非常大。因為編譯生成StreamGraph和JobGraph需要消耗大量的CPU,下載依賴的Jar包資源、上傳JobGraph也需要大量的網絡帶寬,客戶端很容易成為瓶頸。此時,就考慮可不可以把編譯圖的工作放在集群中完成?就類似於Spark的cluster模式,這就是Appllication模式。
Application 模式
Application的設計跟per job非常像,只不過客戶端不在編譯圖,而是直接把執行的Jar和參數信息發送到yarn的AppMaster,在該進程中,同時完成JM的啟動、編譯圖(用戶main方法執行)、任務執行等過程。

這樣還帶來了其他的好處,比如一些公共的lib可以直接存儲在Hdfs,避免多次上傳下載浪費流量。
以Yarn部署為例,想要啟動application模式,可以使用下面的命令:
# 基於application模式啟動本地jar./bin/flink run-application -t yarn-application \./examples/batch/WordCount.jar# 附加集群參數配置./bin/flink run-application -t yarn-application \-Djobmanager.memory.process.size=2048m \-Dtaskmanager.memory.process.size=4096m \./examples/batch/WordCount.jar# 基於application模式啟動遠程jar./bin/flink run-application -t yarn-application \-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \hdfs://myhdfs/jars/my-application.jar
啟動過程源碼分析
觀察flink腳本,可以看到 exec … org.apache.flink.client.cli.CliFrontend “$@“ 的命令,這就是客戶端代碼入口。

在run中是正常session和job的啟動流程,在runApplication中為application模式啟動流程。
在run中通過反射直接運行用戶代碼的main函數,在用戶代碼的execute()方法中編譯圖並提交到yarn。如果是session則直接發送給dispatcher,如果是per job則重新創建集群。

在Application中直接創建遠程集群,並附加Application相關參數:

目前提交到集群啟動的Master進程大致可以分為下面幾種,后續會詳細探索下各個Entrypoint中的細節。

總結
在Session模式中,集群的生命周期與任務無關,可以在集群中同時提交多個任務,他們共享集群資源。Per job模式中,每個任務單獨維護集群,可以做到更好的資源隔離,集群的生命周期與任務相同。在Application模式中,為每個應用創建一個集群,main方法會運行在集群中,避免客戶端過大的壓力。
參考
Flink 1.11 官方文檔——集群與部署:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
Flink 1.11 官方文檔——Yarn集群與部署:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#user-jars--classpath
Flink 1.11 官方文檔——CLI客戶端命令:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
FLIP-85 Application Mode:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
關於Application模式的郵件討論:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-85-Delayed-Job-Graph-Generation-td35759.html
[簡書]Flink 1.11 中的Application模式:
https://www.jianshu.com/p/85f2b32186cb
