flink任務並行


Slot和TaskManager

  • 首先Flink中每個真正執行任務的taskManager都是一個JVM進程,其在多線程環境中執行一個或者多個子任務,執行的任務可以看成一個線程,線程所占據的資源可以看做是slot。
  • 那么為了控制一個JVM同時能運行的任務數量,flink引入了task slot的概念,每一個slot能獨立執行某個任務。
  • 每一個task solt代表了taskManager資源的一個子集,比如,一個擁有3個solt的TaskManager,每一個solt可以使用 1/3 taskManager所管理的內存。
  • 進行資源分割意味着為子任務保留足夠的內存,從而避免與其他子任務進行競爭。注意:當前solt還不能分割cpu資源,僅僅對當前taskManager的內存進行了分割。
  • slots控制着靜態的並行執行任務的能力,即使slot的數量非常多,但可以不用,主要還是看動態的並行度設置的多少
  • 一般來說動態的並行度要小於可用的靜態的slot個數

並行度 Parallelism

  • 一個特定算子的子任務的個數被稱之為該算子的並行度,並且在代碼中可以顯示的指定該算子的並行度
  • 一個數據流的並行度,就是其所有算子中最大的並行度
  • 怎么設置並行度:
    • Flink集群配置文件 flink-conf.yaml parallelism.default:1 優先級最低
    • Flink Client(提交任務時設置)  -p 1 優先級其次
    • 代碼設置全局並行度 env.setParallelism(2) 也可以單個算子進行設置 優先級最高

Flink是怎么實現並行計算?

  • 就是對算子設置並行度即可,在不同slot之間是並行計算的

對於並行的任務,需要占用多少slot?

  • 當前任務的每一個並行的子任務都需要占用一個slot,那么所需的最少的slot個數為:不同slot組的最大任務並行度之和

Slot共享

  • Flink允許不同的子任務共享一個slot
  • 對於同一任務的子任務,需要分配到不同的slot上,但對於有先后順序的不同任務的子任務,slot允許進行子任務共享
  • slot共享更有益於資源的合理利用,不會造成資源的浪費,減少網絡IO,並行子任務分配到slot上是隨機的(默認隨機,可以自己設置)

一個流處理任務需要多少個Slot

  • 在Flink程序中,算子的最大並行度為需要的slots數量。只要有這么多slots,任務就能跑起來。

數據傳輸形式

由於不同算子,可能具有不同的並行度。算子之前的傳輸形式可以是 one-to-one(forwarding) 也可以是 redistributing的模式
  • one-to-one:Stream維護着分區以及元素的順序。比如map和filter 如果這兩個算子的並行度是2的話,那么他們之間便是one-to-one一一對應的關系,數據之間不存在重分區操作 [類似於spark中的窄依賴]
    • 如果map和filter的並行度不同,也會產生redistributing,Flink內部會采取重分區的操作,默認是采用輪詢的方式來進行。
    • map、filter、flatmap等算子應該都是one-to-one對應關系
  • redistributing:涉及到的前后算子之間需要做重分區,Stream的分區會發生改變。 [redistributing類似於在spark中 前后算子是寬依賴]
    • 每一個算子的子任務依據所選擇的transformation發送數據到不同的目標任務。
    • 例如keyby算子基於hashcode進行重分區

合並任務

  • 前后算子並行度相同,且是one-to-one Flink將這樣的算子連接起來形成一個task
  • 這種技術被稱之為任務鏈的優化技術,可以減少本地通信的開銷,序列化與反序列化開銷

Flink 任務(task) 總數

  • 首先 所有的Flink程序都是由三部分組成的 source(數據來源) transformation(轉換操作,數據加工) sink(數據輸出) ,task總數 = (合並之后的task個數 * 合並之后task的並行度)之和

獨享slot

  • 某個算子因其邏輯太復雜了,非常耗費CPU,只能拿到一個slot里面執行,那么需要把該算子單獨拿出來到一個slot里面,而不進行合並
    • .filter.disableChaining() 表示當前filter算子不會和前后算子進行合並操作,會導致任務數量變多 但由於slot共享的限制,還不能做到獨享slot
  • 不是因為某個算子太復雜,而是因為任務數量過多,任務鏈太復雜,需要從中間進行拆分 一分為二 重新開始一個新的任務鏈
    • .map.startNewChain() 表示從map開始生成一個新的任務鏈
  • 以上兩種方式,任務不進行合並或是生成一個新的任務鏈並沒有實際意義,因為切分之后還是有可能會造成slot共享,所以拆開就變得沒有意義了
  • .flatMap.slotSharingGroup("a") 從當前的flatMap算子開始,往后的算子 都共享在slot組為a的slot里面,和前面的算子的slot組就分開了,然后不同slot組的任務,一定會被分配到不同的slot里面 。slotSharingGroup("default") 默認值 不設置的話默認就在default這個slot組
  • 全局切斷,env.disableOperatorChaining() 表示所有算子都不會進行合並操作
  • 所以slot數量為 不同slot組的最大並行度之和


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM