Flink on YARN時,如何確定TaskManager數


轉自: https://www.jianshu.com/p/5b670d524fa5

 

答案寫在最前面:Job的最大並行度除以每個TaskManager分配的任務槽數。

問題

Flink 1.5 Release Notes中,有這樣一段話,直接上截圖。

 

這說明從1.5版本開始,Flink on YARN時的容器數量——亦即TaskManager數量——將由程序的並行度自動推算,也就是說flink run腳本的-yn/--yarncontainer參數不起作用了。那么自動推算的規則是什么呢?要弄清楚它,先來復習Flink的並行度(Parallelism)和任務槽(Task Slot)。

並行度(Parallelism)

與Spark類似地,一個Flink Job在生成執行計划時也划分成多個Task。Task可以是Source、Sink、算子或算子鏈(算子鏈有點意思,之后會另寫文章詳細說的)。Task可以由多線程並發執行,每個線程處理Task輸入數據的一個子集。而並發的數量就稱為Parallelism,即並行度。

Flink程序中設定並行度有4種級別,從低到高分別為:算子級別、執行環境(ExecutionEnvironment)級別、客戶端(命令行)級別、配置文件(flink-conf.yaml)級別。實際執行時,優先級則是反過來的,算子級別最高。簡單示例如下。

  • 算子級別
dataStream.flatMap(new SomeFlatMapFunction()).setParallelism(4); 
  • 執行環境級別
streamExecutionEnvironment.setParallelism(4); 
  • 命令行級別
bin/flink -run --parallelism 4 example-0.1.jar
  • flink-conf.yaml級別
parallelism.default: 4

任務槽(Task Slot)

Flink運行時由兩個組件組成:JobManager與TaskManager,與Spark Standalone模式下的Master與Worker是同等概念。從官網抄來的圖如下所示,很容易理解。

 

JobManager和TaskManager本質上都是JVM進程。為了提高Flink程序的運行效率和資源利用率,Flink在TaskManager中實現了任務槽(Task Slot)。任務槽是Flink計算資源的基本單位,每個任務槽可以在同一時間執行一個Task,而TaskManager可以擁有一個或者多個任務槽。

任務槽可以實現TaskManager中不同Task的資源隔離,不過是邏輯隔離,並且只隔離內存,亦即在調度層面認為每個任務槽“應該”得到taskmanager.heap.size的N分之一大小的內存。CPU資源不算在內。

TaskManager的任務槽個數在使用flink run腳本提交on YARN作業時用-ys/--yarnslots參數來指定,另外在flink-conf.yaml文件中也有默認值taskManager.numberOfTaskSlots。一般來講,我們設定該參數時可以將它理解成一個TaskManager可以利用的CPU核心數,因此也要根據實際情況(集群的CPU資源和作業的計算量)來確定。

確定TaskManager數

以Flink自帶示例中簡化的WordCount程序為例:

    // 執行環境並行度設為6 env.setParallelism(6); // Source並行度為1 DataStream<String> text = env .readTextFile(params.get("input")) .setParallelism(1); DataStream<Tuple2<String, Integer>> counts = text .flatMap(new Tokenizer()) .keyBy(0) .sum(1); counts.print(); 

--yarnslots 3參數來執行,即每個TaskManager分配3個任務槽。TaskManager、任務槽和任務的分布將如下圖所示,方括號內的數字為並行線程的編號。

 
圖來自http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/,致敬

由圖中可以看出,由於算子鏈機制的存在,KeyAgg與Sink操作鏈接在了一起,作為一個Task來執行。

Flink允許任務槽共享,即來自同一個Job的不同Task的Sub-Task(理解為Task的子集就行)進入同一個槽位,因此在圖中也可以見到任務槽X中同時存在FlatMap[X]與KeyAgg[X]+Sink[X]。任務槽共享有兩點好處:

  • 能夠讓每個Task的Sub-Task都均攤到不同的TaskManager,避免負載傾斜。
  • 不需要再計算App一共需要起多少個Task,因為作業需要的任務槽數量肯定等於Job中最大的並行度。

所以,可以得出Flink on YARN時,TaskManager的數量就是:max(parallelism) / yarnslots(向上取整)。例如,一個最大並行度為10,每個TaskManager有兩個任務槽的作業,就會啟動5個TaskManager,如Web UI所示。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM