前言
本文主要譯自Flink Forward 2017的柏林站中Robert Metzger的有關集群規划的How to size your flink cluster一文。該文中主要是考慮網絡資源,博主結合自己的使用經驗對文中省略的做了一定補充,同時也非常歡迎大伙留言補充。
本文非直譯,原文鏈接如下:https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
文中拿捏不准的地方,均附有英文原文。若是有表述不合適的,歡迎大伙留言指出。
1、關鍵參數與資源
為估算Flink集群所需資源,首先我們需要根據Flink任務中的指標給出集群的最低資源需求(baseline)。
1.1 指標(metric):
1)每秒的record數和每個record的大小;
2)不同key的個數和每個key產生state的大小;
3)state的更新方式以及state的訪問模式
此外還需考慮SLA(服務級別協議)。例如,可能願意接受的停機時間,可接受的延遲或最大吞吐量,因為此類SLA會對Flink群集的大小產生影響。
1.2 資源
在給Flink集群做規划時,我們需要考慮集群的資源,但這里的資源一般指什么呢?一般有以下幾種:
1)網絡容量。在考慮網絡容量時,我們也需要考慮到可能使用網絡的其他服務,如Kafka、HDFS等;
2)磁盤帶寬。當我們的容錯機制是基於磁盤的,如RockDB、HDFS,此時也有可能需要考慮到Kafka,因為其也是將數據存在磁盤的;
3)節點數量以及能提供的CPU和內存;
2、例子
Flink例子的拓撲圖1如下:
該例子從kafka消費message,以用戶id(userId)做keyBy后,經過window算子聚合(window算子為sliding window,其窗口大小為5min,間隔是1min),處理后的消息寫入到kafka中。
2.1 任務metrics
從kafka消費的record平均大小為2KB,吞吐量為1百萬/s,userId的個數為5億(5*10^9)。該任務的關鍵指標(key metric)如下:
2.2 硬件
1)5個節點,每個節點有一個TaskManager;2)萬兆網;3)磁盤通過網絡連接(本例中集群部署在雲上,物理機得另外考慮);此外,kafka是單獨的集群。如下圖2:
每個節點是16核,為簡化,文中暫不考慮CUP和內存的需求。在實際的生產中需要根據任務邏輯和容錯方式去考慮內存。本例的狀態是通過RockDB的方式存儲,該方式對內存的要求較小。
2.3 單節點資源需求
為方便分析,我們先考慮單節點上的資源需求,集群整體的需求可以大致通過乘以節點數得到。例子中,每個算子的並行度相同且沒有其他特殊調度限制,每個節點擁有流任務的所有算子,即每個節點上都有Kafka source、window、Kafka sink算子,如下圖3:
為方便計算資源,上圖中KeyBy算子單獨給出,但在實際中KeyBy是Kafka算子和window算子之間鏈接的配置屬性。下面將結合圖3從上往下分析網絡資源的需求(network resource requirement)。
2.3.1 Kafka Source
為計算從單個Kafka Source的拿到的數據,我們先計算從Kafka拿到數據的綜合,計算過程如下:
1)每秒1,000,000條,每條大小為2KB ,每秒獲得總數據為:
2KB×1,000,000/s=2GB/s
2)Flink集群中每個節點每秒獲得數據為
2GB/s÷5=400MB/s
2.3.2 Shuffle過程(KeyBy)
經過KeyBy后,具有相同userId的數據將會在一個節點上,但是Kafka可能根據不同的元數據進行分區(partitioned according to a different partitioning scheme),因此對一個key(userId),單個節點直接從Kafka得到的數據為400MB/s÷5=80MB/s,這樣就有320MB/s的需要通過shuffle獲得。
2.3.3 window emit和Kafka sink
window會發送多少數據,有多少數據會到Kafka sink?分析如下:
window算子為每個key(userId)聚合生成4個long數,每分鍾發射一次,這樣window每分鍾為每個key會發射2個int字段(userId、window_ts)和4個long字段,總的數據量如下:
(2 x 4 bytes) + (4 x 8 bytes) = 40 bytes per key
這樣5個節點,每個節點的數據量為:
500,000,000 keys x 40 bytes÷5 = 4GB
每秒的數據量為4GB/min ÷ 60 = 67MB/s,因為每個節點上都有Kafka sink,不需要額外的重分區,因此從Flink到Kafka的數據為67MB/s。在實際中,算子不會以67MB/s的恆定速度發送數據,而是每分鍾最大限度地利用可用帶寬幾秒鍾。
單節點數據總流向總結如下:
- Data in: 720MB/s (400 + 320) per machine
- Data out: 387MB/s (320 + 67) per machine
整個過程可以總結如下:
2.3.4 獲取state和checkpointing
到目前為止,我們只考慮Flink處理的數據。實際上,還需考慮到state存儲和checkpoint過程中所需要的網絡資源。
1)state消耗的網絡帶寬
為弄清window算子的state大小,我們需要從另外一個角度去分析該問題。Flink的計算窗口大小為5min,滑動尺度為1min,為此Flink通過維持五個窗口實現“滑動窗口”。如在2.3.3節中提到,每個窗口Flink需要維持40Bytes的數據。每當一個event到達時,Flink將會從已有state中獲得數據(40Bytes)去更新聚合值,然后將更新后的數據寫入state(磁盤),如下圖:
這意味每個節點將會產生40MB/s的網絡消耗,計算方式如下:
40 bytes of state x 5 windows x 200,000 msg/s per machine = 40MB/s
正如文中開始提及的,磁盤是通過網絡連接的,所以state讀取產生的網絡消耗也得考慮進去,則單節點整體的網絡資源情況如下:
2)checkpoint過程
每當有新event到來上述state過程就會被觸發,有時間我們為了保證當任務失敗后可以恢復會開啟checkpoint,本例中checkpoint設置為每隔一分鍾周期性觸發,每個checkpoint過程會將現有的state通過網絡拷貝到系統中。每個節點一次checkpoint會拷貝的數據為:
40bytes of state x 5 windows x 100,000,000 keys = 20GB
每秒中的數據為20GB ÷ 60 = 333 MB/s。當然checkpoint過程數據同樣不是以穩定的速率發送到系統中,而是會以最大的速率發送。此外,從Flink1.3以后,基於RockDB是可以實現增量checkpoint,本例暫時不考慮該特性。單節點整個任務過程網絡消耗如下:
集群整體網絡消耗如下:
760 + 760 x 5 + (40×2)×5 + (400+67)×5 = 10335 MB/s
(40×2)×5是5個節點state的讀寫過程消耗,(400+67)×5是從Kafka讀和寫過程消耗的(kafka數據會落盤)。
該數據僅為上述硬件設置中的可用網絡容量的一半以上,如下圖。
2.3.5 總結
該例子中,每個節點流進和流出的數據為760MB/s,僅為節點容量的60%(每個節點為1250MB/s),剩下的40%可以用來應對突發的情況,如網絡開銷、checkpoint恢復期間的數據重放或者由於數據傾斜導致的節點之間數據shuffle過大的情況等。
3、其他建議
1)CUP個數,Flink官網給出的建議是和slot的個數成比例,從而也就和任務的並行度有關了,換句話說,在考慮任務的並行度時要結合CPU的個數考慮;
2)盡量申請多的內存,內存的最小和可以通過在測試集群中測試后,大致成比例的放大到生成集群中;
3)考慮I/O,數據盤最好和日志盤分離;
4)還有其他如JobManager最好和TaskManager節點分離等;