Flink中的計算資源
首先理解Flink中的計算資源的核心概念,比如Slot、Chain、Task等,這有助於我們快速定位生產中的問題。
Task Slot
Flink都是以集群在運行,在運行的過程中包含兩類進程,其中之一就是TaskManager。 在Flink集群中,一個TaskManager就是一個JVM進程,並且會用獨立的現場來執行task,為了控制一個TaskManager能接受多少個task,Flink提出了Task Slot的概念。
Slot共享
默認情況下,Flink還允許同一個Job的子任務共享slot。因為在一個Flink任務中,有很多算子,這些算子的計算壓力各不相同,比如簡單的map和filter算子所需要的資源不多,但是有些算子比如window、group by則需要更多的計算資源才能滿足計算所需。這時那些資源需求大的算子就可以共用其他slot,提高整個集群的資源利用率。
Operator Chain
此外Flink自身會把不同的算子的task連接在一起組成一個新的task。這么做時因為Flink本身提供了非常有效的任務優化手段,因為task是在同一個線程中執行,那么可以有效減少線程間上下文的切換,並且減少序列化/反序列化帶來的資源消耗,從而在整體上提高我們任務的吞吐量。
並行度
Flink使用並行度來定義某一個算子被切分成多少個子任務。Flink代碼會被轉換成邏輯是圖,在實際運行時根據用戶的並行度設置會被轉換成對應的子任務進行執行。
源碼解析
Flink Job在執行中會通過SlotProvider向ResourceManager申請資源,RM負責協調TaskManager,滿足JobManager的資源請求。
整體的類圖如上所述,SlotProvider中的allocateSlot方法負責向SlotPool申請可用的slot資源,通過returnLogicSlot將空閑的slot釋放至SlotPool。 在整個Flink的資源管理的類中,核心的類包括Scheduler、SlotPool、JobMaster。他們之間的交互流程主要:Scheduler調度器向SlotPool資源池申請和釋放slot;如果SlotPool不能滿足需求,那么會向ResourceManager發起申請;獲取到的資源通過JobMaster分配給SlotPool。
如何設置並行度
Flink本身支持不同級別來設置我們任務並行度的方法,他們分別是:
-
算子級別
-
環境級別
-
客戶端級別
-
集群配置級別
算子級別
在編寫Flink程序時,可以在代碼中顯示的制定不同算子的並行度。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = ...
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.setParallelism(10)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(1);
wordCounts.print();
env.execute("word count");
如上,可以通過顯示的調用setParallelism()方法來顯示的指定每個算子的並行度配置。 在實際生產中,推薦在算子級別顯示指定各自的並行度,方便進行顯示和精確的資源控制。
環境級別
環境級別的並行度設置指的是可以通過調用env.setParallelism()方法來這是整個任務的並行度:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
...
一旦設置了這個參數,表明任務中的所有算子的並行度都是指定的值,生產環境中不推薦。
客戶端級別
可以在使用命令提交Flink Job的時候指定並行度,當任務執行時發現代碼中沒有設置並行度,便會采用提交命令時的參數。 通過 -p 命令來指定提交任務時候的並行度:
./bin/flink run -p 5 ../wordCount-java*.jar
集群配置級別
在flink-conf.yaml文件中有一個參數parallelism.default。該參數會在用戶不設置任何其他的並行度配置時生效:
parallelism.default:1
需要特別指出的是,設置並行度的優先度依次是: