前言
本文主要是想簡要說明Flink在集群部署、任務提交、任務運行過程中資源情況,若表述有誤歡迎大伙留言分享,非常感謝!
一、集群部署階段
集群部署這里指的是Flink standalone模式,因為在Yarn模式(包括session、single job模式也成Per-job模式)是可以僅通過Flink client提交任務到Yarn上,所以是否手動部署Flink集群對任務的執行是沒有影響的。下圖[1]是簡單的Flink的集群構成情況,包括一個master(JobManager)、兩個worker(TaskManager)。至於Flink standalone模式的HA(一般有兩個JobManager加上若干個TaskManager組成)是通過zookeeper實現的,其主要思想是通過zookeeper選舉出JobManager的active節點,該結點負責資源分配等,另一個節點為standby。Flink的Yarn模式的高可用的通過在container中對JobManager的重啟實現的,其具體過程在此不詳細說明。

部署階段主要有以下兩個參數:
1 #每個TaskManager中slot的個數,在conf/flink-conf.yaml中,默認為1 2 taskmanager.numberOfTaskSlots 3 #任務的並行度,默認為1 4 parallelism.default
1)taskmanager.numberOfTaskSlots:每個TaskManager中slot的數量,官方文檔推薦:和每個TaskManager中物理CPU的個數成比例,比如:等於CPU的個數,或者是CPU個數的一半;
2)parallelism.default:任務的並行度,默認值為1,該值可以通過在程序中設定setParallesim()改變,並行度與slot的關系見下詳解;
二、任務提交階段
Flink集群資源的使用情況除了和已分配的資源有關外,還與並行度有關,已分配的資源表示Flink可以利用這么多資源,而實際能利用多少資源還和並行度有關。任務並行度的最大值由TaskManager集群的Slot總數決定,如:slot總數為10,則任務最大的並行度為10.
在standalone模式中,Flink任務能利用的總資源已在啟動集群時確定,其並行通過在執行./flink run 時,通過可選參數[-p]確定(不指定則為默認值1)。
在Yarn模式中,均是先Yarn集群中分配資源給新建的Flink集群,如下圖,其詳細過程見文檔[2]。

Flink的Yarn模式session、single job模式在實現方法上還是存在一定的區別:
1)session模式是先利用yarn-session.sh在yarn新建一個Flink集群,然后利用./flink run flinkExample.jar提交任務,其資源分配的方式和standalone模式一致;
./yarn-session.sh -n 4 -s 8 -jm 3072 -tm 32768
-n:在yarn上分配了4個container,即分配了4個TaskManager;
-s:每個TaskManager有8個slot;
-jm:每個JobManager中的內存數;
-tm:每個TaskManager中的內存數;
2)single job模式在命令中在申請資源的同時,提交任務,常用命令如下:
./flink run -m yarn-cluster -yn 7 -ys 8 example.jar
-yn:yarn上分配的container個數,即TaskManager的個數;
-ys:每個TaskManager中slot的個數
其他參數的具體使用方法可以在控制台中執行./flink、./yarn-session.sh得到說明。
說明:session模式和single job的區別:
1)session模式:Flink任務運行結束后,從yarn上申請到的資源是不被釋放的,等待下一個Flink的提交,類似一個常住在yarn上永遠不會結束的yarn任務。因為,yarn的資源分配模式中比如fair策略還是存在資源的競爭的,session模式資源的不釋放性,這樣可以在Yarn提供資源分配上的基礎上進行實現資源隔離,也實現了對集群物理環境的屏蔽,但在一定的程度上造成了資源的浪費;
2)single job模式和一般的yarn任務一樣,任務結束后就會釋放申請到的資源,使資源得到重復利用。
所以session、single job模式的實際應用應根據需求使用,一般情況下:
single job模式是按需申請資源,一般適合執行時間較長的大任務。此外,該模式下,每次提交任務都需單獨啟動Flink集群自己的ResourceManager會造成一定的時延;
session模式共享資源,一般適合執行時間短的小任務。此模式下,會共享ResourceManager,所以啟動快。
【說明】這里所說的ResourceManager不是Yarn的組件,是Flink本身的,關於深層的分析,見后續博客。
三、任務運行階段
假定一個3個TaskManager的集群,每個TaskManager有3個slot,集群一個9個slot,從下圖中可以得到[3]:任務的並行度為1時,只會用一個slot,並行度為2會用2個slot,依次類推,當並行度為9時,9個slot都會被利用,當然從example4中可知,可以針對不同的算子(operator)定義並行度。


Ref:
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/cluster_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/yarn_setup.html
[3]https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html
