flink taskmanager&slots&並行度&任務鏈&task分配詳解


TaskManger與Slots

Flink中每一個worker(TaskManager)都是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過task slot來進行控制(一個worker至少有一個task slot)。

每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味着一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。

通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味着每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路復用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。

默認情況下,Flink允許子任務共享slot,即使它們是不同任務的子任務(前提是它們來自同一個job)。 這樣的結果是,一個slot可以保存作業的整個管道。

 

Task Slot是靜態的概念,是指TaskManager具有的並發執行能力,可以通過參數taskmanager.numberOfTaskSlots進行配置;而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的並發能力,可以通過參數parallelism.default進行配置。

也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設置parallelism.default=1,即運行程序默認的並行度為1,9個TaskSlot只用了1個,有8個空閑,因此,設置合適的並行度才能提高效率。

 

flink 中並行任務的分配

  • Flink 中每一個 TaskManager 都是一個JVM進程,它可能會在獨立的線程上執行一個或多個 subtask
  • 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)

slot 主要隔離內存,cpu 是slot之間共享的。也就是說4核的機器 ,內存足夠,可以把slot設置為8。最多能同時運行8個任務。建議一個核心數分配一個slot

這種圖中 source、map 合成的task的並行度為6
keyby 、window、apply合成的task的並行度為6
sink的並行度為1
總共有13個task
但是不是需要13個slot才能滿足這個並行度的要求

不同的算子操作復雜度不同
我們可以稱像source map sink 這種 計算不復雜的算子稱為非資源密集型的算子 aggregate reduce sum window 這種計算復雜的算子稱為為資源密集型的算子

如果把這兩種算子的優先級看作相同,平等的分配到slo中,當數據流source 來的數據速率相同時,會造成有些slot一直在跑復雜的算子,一直在運行中,當時一直跑簡單算子的slot就會很空閑。

flink 這里是 非資源密集型的 算子和資源密集型的算子可以分配到同一個slot中 ,這樣所有的slot之間任務就會平等,不會存在一直空閑一直高負載。

一個task的並行度是6 就會分為6個並行的task來跑,這六個task不能分配到同一個slot中必須一個slot只有一個。 也就是說 當你的集群的slot只有6 ,你不能設置算子的 並行度超過6。

flink 也能做到把非資源密集型和資源密集型的算子分到不同的slot中 這里需要設置共享組,非資源 密集型 的算子在一個共享組,資源密集 型的算子在一個共享組,這樣這兩種算子就不會共享的使用slot。默認情況下算有算子都屬於同一個共享組,共享所有slot。

默認情況下,Flink 允許子任務共享 slot,即使它們是不同任務的子任務但是可以分配到同一個slot上。 這樣的結果是,一個 slot 可以保存多個作業的整個管道
Task Slot 是靜態的概念,是指 TaskManager 具有的並發執行能力 。
下面看幾個例子

 

 

 

 

 

 

 

並行可以分為兩個方面

  • 數據並行

source 並行拉數據 map 並行處理數據

  • 計算並行

source 在拉新數據,map 在處理source 之前拉的數據
兩個 job 的並行執行

一個特定算子的 子任務(subtask)的個數被稱之為其並行度(parallelism)。
一般情況下,一個 stream 的並行度,可以認為就是其所有算子中最大的並行度


idea里運行flink程序默認並行度是運行程序機器的核心數量。

每一個算子都可以單獨設置並行。

.map((_, 1)).setParallelism(2)

也可以全局指定並行度。

val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2)
此時不支持並行的算子 比如env.readTextFile(inputpath) 就會報錯
具體情況調整source和sink的並行度

val env = ExecutionEnvironment.getExecutionEnvironment.setParallelism(2)
此時不支持並行的算子 比如env.readTextFile(inputpath) 就會報錯
具體情況調整source和sink的並行度

三個位置可以配置並行度

  • flink配置文件中
  • 代碼里
  • flink任務提交時

優先級

代碼>提交>配置文件

代碼里設置用代碼里的,代碼里沒設置用提交時設置的,都沒設置用配置文件中的配置。
代碼里算子單獨設置優先級高於全局設置優先級

可以設置共享組 把 task 盡量均勻的分配到整個集群中

任務鏈
合理的設置並行度

  • 減少本地通信的開銷
  • 減少序列化和反序列化

把多個算子合並為一個task,原本的算子成為里面的subtask

滿足任務鏈需要一下條件

  • 算子具有相同並行度(具有相同的分區數)
  • 算子屬於one-to-one

one-to-one :stream維護着分區以及元素的順序(比如source和map之間)。這意味着map 算子的子任務看到的元素的個數以及順序跟 source 算子的子任務生產的元素的個數、順序相同。map、fliter、flatMap等算子都是one-to-one的對應關系。

Redistributing:stream的分區會發生改變。每一個算子的子任務依據所選擇的transformation發送數據到不同的目標任務。例如,keyBy 基於 hashCode 重分區、而 broadcast 和 rebalance 會隨機重新分區,這些算子都會引起redistribute過程,而 redistribute 過程就類似於 Spark 中的 shuffle 過程。

並行度不同的算子之前傳遞數據會進行重分區,Redistributing類型的算子也會進行重分區。

例子

配置文件中默認並行度設置為2 ,提交代碼是並行度設置為2
socket source 並行度只能是1
flatmap fliter map 並行度都是2 且屬於one-to-one 合成任務鏈
keyby 屬於redistrubuting hash 重分區
sum print 並行度為2 屬於one-to-one
執行圖如下

當然還可以禁止掉合成任務鏈

單個算子不參與合成任務鏈

.flatMap(_.split(" ")).disableChaining()

從單個算子開啟一個新的任務鏈

.startNewChain()

全局不合成任務鏈

env.disableOperatorChaining()

下面是一個全局不合成任務鏈的job執行圖,只是在上一個例子的基礎上添加了全局不合成任務鏈。

算子設置並行度

  • source 文件保證數順序需要並行度為 1

 

  •  sink 只輸出到一個文件需要並行度為 1

  • socketsource 並行度只能為1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM