Flink使用(二)——Flink集群資源規划


前言

  本文主要譯自Flink Forward 2017的柏林站中Robert Metzger的有關集群規划的How to size your flink cluster一文。該文中主要是考慮網絡資源,博主結合自己的使用經驗對文中省略的做了一定補充,同時也非常歡迎大伙留言補充。

  本文非直譯,原文鏈接如下:https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

  文中拿捏不准的地方,均附有英文原文。若是有表述不合適的,歡迎大伙留言指出。


  1、關鍵參數與資源

  為估算Flink集群所需資源,首先我們需要根據Flink任務中的指標給出集群的最低資源需求(baseline)。

  1.1 指標(metric)

    1)每秒的record數和每個record的大小;

    2)不同key的個數和每個key產生state的大小;

    3)state的更新方式以及state的訪問模式

  此外還需考慮SLA(服務級別協議)。例如,可能願意接受的停機時間,可接受的延遲或最大吞吐量,因為此類SLA會對Flink群集的大小產生影響。

  1.2 資源

    在給Flink集群做規划時,我們需要考慮集群的資源,但這里的資源一般指什么呢?一般有以下幾種:

    1)網絡容量。在考慮網絡容量時,我們也需要考慮到可能使用網絡的其他服務,如Kafka、HDFS等;

    2)磁盤帶寬。當我們的容錯機制是基於磁盤的,如RockDB、HDFS,此時也有可能需要考慮到Kafka,因為其也是將數據存在磁盤的;

    3)節點數量以及能提供的CPU和內存;


  2、例子

  Flink例子的拓撲圖1如下:

   該例子從kafka消費message,以用戶id(userId)做keyBy后,經過window算子聚合(window算子為sliding window,其窗口大小為5min,間隔是1min),處理后的消息寫入到kafka中。

  2.1 任務metrics

  從kafka消費的record平均大小為2KB,吞吐量為1百萬/s,userId的個數為5億(5*10^9)。該任務的關鍵指標(key metric)如下:

   2.2 硬件

  1)5個節點,每個節點有一個TaskManager;2)萬兆網;3)磁盤通過網絡連接(本例中集群部署在雲上,物理機得另外考慮);此外,kafka是單獨的集群。如下圖2:

  每個節點是16核,為簡化,文中暫不考慮CUP和內存的需求。在實際的生產中需要根據任務邏輯和容錯方式去考慮內存。本例的狀態是通過RockDB的方式存儲,該方式對內存的要求較小。

   2.3 單節點資源需求

    為方便分析,我們先考慮單節點上的資源需求,集群整體的需求可以大致通過乘以節點數得到。例子中,每個算子的並行度相同且沒有其他特殊調度限制,每個節點擁有流任務的所有算子,即每個節點上都有Kafka source、window、Kafka sink算子,如下圖3:

  為方便計算資源,上圖中KeyBy算子單獨給出,但在實際中KeyBy是Kafka算子和window算子之間鏈接的配置屬性。下面將結合圖3從上往下分析網絡資源的需求(network resource requirement)。

  2.3.1  Kafka Source

  為計算從單個Kafka Source的拿到的數據,我們先計算從Kafka拿到數據的綜合,計算過程如下:

  1)每秒1,000,000條,每條大小為2KB ,每秒獲得總數據為:

    2KB×1,000,000/s=2GB/s

  2)Flink集群中每個節點每秒獲得數據為

    2GB/s÷5=400MB/s

  2.3.2 Shuffle過程(KeyBy)

  經過KeyBy后,具有相同userId的數據將會在一個節點上,但是Kafka可能根據不同的元數據進行分區(partitioned according to a different partitioning scheme),因此對一個key(userId),單個節點直接從Kafka得到的數據為400MB/s÷5=80MB/s,這樣就有320MB/s的需要通過shuffle獲得。

  2.3.3 window emit和Kafka sink

    window會發送多少數據,有多少數據會到Kafka sink?分析如下:

    window算子為每個key(userId)聚合生成4個long數,每分鍾發射一次,這樣window每分鍾為每個key會發射2個int字段(userId、window_ts)和4個long字段,總的數據量如下:

    (2 x 4 bytes) + (4 x 8 bytes) = 40 bytes per key

  這樣5個節點,每個節點的數據量為:

    500,000,000 keys x 40 bytes÷5 = 4GB

  每秒的數據量為4GB/min ÷ 60 = 67MB/s,因為每個節點上都有Kafka sink,不需要額外的重分區,因此從Flink到Kafka的數據為67MB/s。在實際中,算子不會以67MB/s的恆定速度發送數據,而是每分鍾最大限度地利用可用帶寬幾秒鍾。

  單節點數據總流向總結如下:

  • Data in: 720MB/s (400 + 320) per machine
  • Data out: 387MB/s (320 + 67) per machine

  整個過程可以總結如下:

    2.3.4  獲取state和checkpointing

    到目前為止,我們只考慮Flink處理的數據。實際上,還需考慮到state存儲和checkpoint過程中所需要的網絡資源。

    1)state消耗的網絡帶寬

    為弄清window算子的state大小,我們需要從另外一個角度去分析該問題。Flink的計算窗口大小為5min,滑動尺度為1min,為此Flink通過維持五個窗口實現“滑動窗口”。如在2.3.3節中提到,每個窗口Flink需要維持40Bytes的數據。每當一個event到達時,Flink將會從已有state中獲得數據(40Bytes)去更新聚合值,然后將更新后的數據寫入state(磁盤),如下圖:

   這意味每個節點將會產生40MB/s的網絡消耗,計算方式如下:

  40 bytes of state x 5 windows x 200,000 msg/s per machine = 40MB/s

  正如文中開始提及的,磁盤是通過網絡連接的,所以state讀取產生的網絡消耗也得考慮進去,則單節點整體的網絡資源情況如下:

   2)checkpoint過程

    每當有新event到來上述state過程就會被觸發,有時間我們為了保證當任務失敗后可以恢復會開啟checkpoint,本例中checkpoint設置為每隔一分鍾周期性觸發,每個checkpoint過程會將現有的state通過網絡拷貝到系統中。每個節點一次checkpoint會拷貝的數據為:

  40bytes of state x 5 windows x 100,000,000 keys = 20GB

  每秒中的數據為20GB ÷ 60 = 333 MB/s。當然checkpoint過程數據同樣不是以穩定的速率發送到系統中,而是會以最大的速率發送。此外,從Flink1.3以后,基於RockDB是可以實現增量checkpoint,本例暫時不考慮該特性。單節點整個任務過程網絡消耗如下:

   集群整體網絡消耗如下:

    760 + 760 x 5 + (40×2)×5 + (400+67)×5 = 10335 MB/s

  (40×2)×5是5個節點state的讀寫過程消耗,(400+67)×5是從Kafka讀和寫過程消耗的(kafka數據會落盤)。

  該數據僅為上述硬件設置中的可用網絡容量的一半以上,如下圖。

   2.3.5 總結

    該例子中,每個節點流進和流出的數據為760MB/s,僅為節點容量的60%(每個節點為1250MB/s),剩下的40%可以用來應對突發的情況,如網絡開銷、checkpoint恢復期間的數據重放或者由於數據傾斜導致的節點之間數據shuffle過大的情況等。


 3、其他建議

  1)CUP個數,Flink官網給出的建議是和slot的個數成比例,從而也就和任務的並行度有關了,換句話說,在考慮任務的並行度時要結合CPU的個數考慮;

  2)盡量申請多的內存,內存的最小和可以通過在測試集群中測試后,大致成比例的放大到生成集群中;

  3)考慮I/O,數據盤最好和日志盤分離;

  4)還有其他如JobManager最好和TaskManager節點分離等;

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM