1. Flink運行時組件
作業管理器(JobManager)
任務管理器(TaskManager)
資源管理器(ResourceManager)
分發器(Dispatcher)
2. 任務提交流程
Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置,之后向Yarn ResourceManager提交任務;
ResourceManager分配Container資源並通知對應的NodeManager啟動ApplicationMaster,ApplicationMaster啟動后加載Flink的Jar包和配置構建環境,然后啟動JobManager,之后ApplicationMaster向ResourceManager申請資源啟動TaskManager,ResourceManager分配Container資源后;
由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager,NodeManager加載Flink的Jar包和配置構建環境並啟動TaskManager,TaskManager啟動后向JobManager發送心跳包,並等待JobManager向其分配任務。
3. 任務調度原理
客戶端不是運行時和程序執行的一部分,但它用於准備並發送dataflow(JobGraph)給Master(JobManager),然后,客戶端斷開連接或者維持連接以等待接收計算結果。
當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然后 TaskManager 將心跳和統計信息匯報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環境連通即可)。提交 Job 后,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會生成優化后的執行計划,並以 Task 的單元調度到各個 TaskManager 去執行。
TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動后,與自己的上游建立 Netty 連接,接收數據並處理。
執行圖:
Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。
StreamGraph:是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
JobGraph:StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,並不是一個具體的數據結構。
TaskManager和Slots
Flink中每一個TaskManager都是一個獨立的JVM進程,它可能會在獨立的線程上執行一個或者多個subtask;
為了控制一個TaskManager能接收多少個task,TaskManager通過task slot來控制(一個TaskManager至少有一個slot);
每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味着一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。
通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味着每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路復用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。
默認情況下,Flink允許子任務共享slot,即使它們是不同任務的子任務。這樣的結果是,一個slot可以保存作業的整個管道。
Task Slot是靜態的概念,是指TaskManager具有的並發執行能力。可以通過參數taskmanager.numberOfTaskSlots進行配置;
並行度parallelism是動態概念,即TaskManager運行程序時實際使用的並發能力,可以通過參數parallelism.default進行配置。
也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設置parallelism.default=1,即運行程序默認的並行度為1,9個TaskSlot只用了1個,有8個空閑,因此,設置合適的並行度才能提高效率。
程序與數據流
所有的Flink程序都是由三部分組成的: Source 、Transformation和Sink。
- Source負責讀取數據源,Transformation利用各種算子進行處理加工,Sink負責輸出。
- 在運行時,Flink上運行的程序會被映射成邏輯數據流streaming dataflows,它包含了這三部分。
- 每一個dataflow以一個或多個sources開始以一個或多個sinks結束。dataflow類似於任意的有向無環圖(DAG),當然特定形式的環可以通過iteration構建。
- 在大部分情況下,程序中的transformations跟dataflow中的operator是一一對應的關系,但有時候,一個transformation可能對應多個operator。
並行數據流
Flink程序的執行具有並行、分布式的特性。在執行過程中,一個 stream 包含一個或多個 stream partition ,而每一個 operator 包含一個或多個 operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。
一個特定operator的subtask的個數被稱之為其parallelism(並行度)。一個stream的並行度總是等同於其producing operator的並行度。一個程序中,不同的operator可能具有不同的並行度。
Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決於operator的種類。
① One-to-one:stream(比如在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關系。 類似於spark中的窄依賴
② Redistributing:stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區會發生改變。每一個operator subtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy() 基於hashCode重分區、broadcast和rebalance會隨機重新分區,這些算子都會引起redistribute過程,而redistribute過程就類似於Spark中的shuffle過程。 類似於spark中的寬依賴
任務鏈
Flink采用了一種稱為任務鏈的優化技術,可以在特定條件下減少本地通信的開銷。為了滿足任務鏈的要求,必須將兩個或多個算子設為相同的並行度,
並通過本地轉發(local forward)的方式進行連接。 相同並行度的one-to-one操作,Flink這樣相連的算子鏈接在一起形成一個task,原來的算子成為里面的subtask。
並行度相同、並且是one-to-one操作,兩個條件缺一不可。
相同並行度的one to one操作,Flink這樣相連的operator 鏈接在一起形成一個task,原來的operator成為里面的subtask。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基於緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。