Slot和TaskManager
- 首先Flink中每個真正執行任務的taskManager都是一個JVM進程,其在多線程環境中執行一個或者多個子任務,執行的任務可以看成一個線程,線程所占據的資源可以看做是slot。
- 那么為了控制一個JVM同時能運行的任務數量,flink引入了task slot的概念,每一個slot能獨立執行某個任務。
- 每一個task solt代表了taskManager資源的一個子集,比如,一個擁有3個solt的TaskManager,每一個solt可以使用 1/3 taskManager所管理的內存。
- 進行資源分割意味着為子任務保留足夠的內存,從而避免與其他子任務進行競爭。注意:當前solt還不能分割cpu資源,僅僅對當前taskManager的內存進行了分割。
- slots控制着靜態的並行執行任務的能力,即使slot的數量非常多,但可以不用,主要還是看動態的並行度設置的多少
- 一般來說動態的並行度要小於可用的靜態的slot個數
並行度 Parallelism
- 一個特定算子的子任務的個數被稱之為該算子的並行度,並且在代碼中可以顯示的指定該算子的並行度
- 一個數據流的並行度,就是其所有算子中最大的並行度
- 怎么設置並行度:
- Flink集群配置文件 flink-conf.yaml parallelism.default:1 優先級最低
- Flink Client(提交任務時設置) -p 1 優先級其次
- 代碼設置全局並行度 env.setParallelism(2) 也可以單個算子進行設置 優先級最高
Flink是怎么實現並行計算?
- 就是對算子設置並行度即可,在不同slot之間是並行計算的
對於並行的任務,需要占用多少slot?
- 當前任務的每一個並行的子任務都需要占用一個slot,那么所需的最少的slot個數為:不同slot組的最大任務並行度之和
Slot共享
- Flink允許不同的子任務共享一個slot
- 對於同一任務的子任務,需要分配到不同的slot上,但對於有先后順序的不同任務的子任務,slot允許進行子任務共享
- slot共享更有益於資源的合理利用,不會造成資源的浪費,減少網絡IO,並行子任務分配到slot上是隨機的(默認隨機,可以自己設置)
一個流處理任務需要多少個Slot
- 在Flink程序中,算子的最大並行度為需要的slots數量。只要有這么多slots,任務就能跑起來。
數據傳輸形式
由於不同算子,可能具有不同的並行度。算子之前的傳輸形式可以是 one-to-one(forwarding) 也可以是 redistributing的模式
- one-to-one:Stream維護着分區以及元素的順序。比如map和filter 如果這兩個算子的並行度是2的話,那么他們之間便是one-to-one一一對應的關系,數據之間不存在重分區操作 [類似於spark中的窄依賴]
- 如果map和filter的並行度不同,也會產生redistributing,Flink內部會采取重分區的操作,默認是采用輪詢的方式來進行。
- map、filter、flatmap等算子應該都是one-to-one對應關系
- redistributing:涉及到的前后算子之間需要做重分區,Stream的分區會發生改變。 [redistributing類似於在spark中 前后算子是寬依賴]
- 每一個算子的子任務依據所選擇的transformation發送數據到不同的目標任務。
- 例如keyby算子基於hashcode進行重分區
合並任務
- 前后算子並行度相同,且是one-to-one Flink將這樣的算子連接起來形成一個task
- 這種技術被稱之為任務鏈的優化技術,可以減少本地通信的開銷,序列化與反序列化開銷
Flink 任務(task) 總數
- 首先 所有的Flink程序都是由三部分組成的 source(數據來源) transformation(轉換操作,數據加工) sink(數據輸出) ,task總數 = (合並之后的task個數 * 合並之后task的並行度)之和
獨享slot
- 某個算子因其邏輯太復雜了,非常耗費CPU,只能拿到一個slot里面執行,那么需要把該算子單獨拿出來到一個slot里面,而不進行合並
- .filter.disableChaining() 表示當前filter算子不會和前后算子進行合並操作,會導致任務數量變多 但由於slot共享的限制,還不能做到獨享slot
- 不是因為某個算子太復雜,而是因為任務數量過多,任務鏈太復雜,需要從中間進行拆分 一分為二 重新開始一個新的任務鏈
- .map.startNewChain() 表示從map開始生成一個新的任務鏈
- 以上兩種方式,任務不進行合並或是生成一個新的任務鏈並沒有實際意義,因為切分之后還是有可能會造成slot共享,所以拆開就變得沒有意義了
- .flatMap.slotSharingGroup("a") 從當前的flatMap算子開始,往后的算子 都共享在slot組為a的slot里面,和前面的算子的slot組就分開了,然后不同slot組的任務,一定會被分配到不同的slot里面 。slotSharingGroup("default") 默認值 不設置的話默認就在default這個slot組
- 全局切斷,env.disableOperatorChaining() 表示所有算子都不會進行合並操作
- 所以slot數量為 不同slot組的最大並行度之和