Flink並行度


並行執行

本節介紹如何在Flink中配置程序的並行執行。FLink程序由多個任務(轉換/操作符、數據源和sinks)組成。任務被分成多個並行實例來執行,每個並行實例處理任務的輸入數據的子集。任務的並行實例的數量稱之為並行性。

 

如果要使用保存點,還應該考慮設置最大並行性(或最大並行性)。當從保存點還原時,可以改變特定運算符或整個程序的並行性,並且該設置指定並行性的上限。這是必需的,因為FLINK內部將狀態划分為key-groups,並且我們不能擁有+INF的key-group數,因為這將對性能有害。

Flink中人物的並行度可以從多個不同層面設置:

 

 

 

 

 

操作算子層

操作算子,數據源,數據接收器等這些並行度都可以通過調用他們的setParallelism()方法設置。例如:

val env = StreamExecutionEnvironment.getExecutionEnvironment

 

val text = [...]

val wordCounts = text

? ?.flatMap{ _.split(" ") map { (_, 1) } }

? ?.keyBy(0)

? ?.timeWindow(Time.seconds(5))

? ?.sum(1).setParallelism(5)

wordCounts.print()

 

env.execute("Word Count Example")

執行環境層面

flink程序執行需要執行環境上下文。執行環境為其要執行的操作算子,數據源,數據sinks都是設置了默認的並行度。執行環境的並行度可以通過操作算子顯示指定並行度來覆蓋掉。

默認的執行環境並行度可以通過調用setParallelism()來設置。例如,操作算子,數據源,數據接收器,並行度都設置為3,那么在執行環境層面,設置方式如下:

 

 

 

 

 

 

 

 

 

 

客戶端層

在提交job 到flink的時候,在客戶端側也可以設置flink的並行度。客戶端即可以是java工程,也可以是scala工程。Flink的Command-line Interface (CLI)就是這樣一種客戶端。

在客戶端側flink可以通過-p參數來設置並行度。例如:

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/bin/flink run -p 10 https://blog.csdn.net/rlnLo2pNEfx9c/article/examples/*WordCount-java*.jar

在java/scala客戶端,並行度設置方式如下:

 

 

 

 

 

 

 

 

 

 

系統層面

系統層面的並行度設置,會針對所有的執行環境生效,可以通過parallelism.default,屬性在conf/flink-conf.yaml文件中設置。

 

設置最大並行度

設置最大並行度,實際上調用的方法是setMaxParallelism(),其調用位置和setParallelism()一樣。

默認的最大並行度是近似於operatorParallelism + (operatorParallelism / 2),下限是127,上線是32768.

 

值得注意的是將最大的並行的設置為超級大的數可能會對性能造成不利的影響,雅思6.5因為一些狀態后端是必須要保存內部數據結構的,這個數據結構跟key-group數量相匹配(這是可重定狀態的內部實現機制)。

配置taskmanagerslot

flink通過將項目分成tasks,來實現並行的執行項目,划分的tasks會被發到slot去處理。

集群中Flink的taskmanager提供處理slot。Slots數量最合適的是跟taskmanager的cores數量成正比。當然,taskmanager.numberOfTaskSlots的推薦值就是cpu核心的數目。

當啟動一個任務的時候,我們可以為其提供默認的slot數目,其實也即是flink工程的並行度,設置方式在上面已經有詳細介紹。

 

640?wx_fmt=png

640?wx_fmt=png

 

推薦閱讀

Flink:動態表上的連續查詢

640?wx_fmt=png


文章來源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80809738


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM