一、flink架構
1.1、集群模型和角色
如上圖所示:當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然后 TaskManager 將心跳和統計信息匯報 給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
- Client:
- 用戶在提交編寫好的 Flink 工程時,會先創建一個客戶端再進行提交,這個客戶端就是 Clien。可以是運行在任何機器上(與 JobManager 環境連通即可,一般在跳板機上)。提交 Job 后, Client 可以結束進程,也可以不結束並等待結果返回。
- Client 會根據用戶傳入的參數選擇使用 yarn per job 模式、stand-alone 模式還是 yarn-session 模式將 Flink 程序提交到集群。
- JobManager:
- 集群的管理者,負責調度任務,主要負責從 Client 處接收到 Job 和 JAR 包等資源后,會生成優化后的執行計划,並以 Task 的 單元調度到各個 TaskManager 去執行。
- 負責調度任務、協調 checkpoints、協調故障恢復、收集 Job 的狀態信息,並管理 Flink 集群中的從節點 TaskManager。
- TaskManager :
- 實際負責執行計算的 Worker,在其上執行 Flink Job 的一組 Task;TaskManager 還是所在節點的管理員,它負責把該節點上的服務器信息比如內存、磁盤、任務運行情況等向 JobManager 匯報。
- 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動后,與自己的上游建立 Netty 連接,接收數據並處理。
- flnik架構中的角色間的通信使用Akka(量小,數據大量傳輸時性能差),數據的傳輸使用Netty(量大,spark全部使用Netty通信)。
二、flink資源和資源組
在flink集群中,一個TaskManager就是一個JVM進程,並且會用獨立的線程來執行task,為了控制一個TaskManager能接受多少個task,Flink提出了Task Slot 的概念,
- 我們可以簡單的把 Task Slot 理解為 TaskManager 的計算資源子集。假如一個 TaskManager 擁有 5 個 slot,那么該 TaskManager 的計算資源會被平均分為 5 份,不同的 task 在不同的 slot 中執行,避免資源競爭。但是需要注意的是,slot 僅僅用來做內存的隔離,對 CPU 不起作用。
- 通過調整 task slot 的數量,用戶可以定義task之間是如何相互隔離的。每個 TaskManager 有一個slot,也就意味 着每個task運行在獨立的 JVM 中。每個 TaskManager 有多個slot的話,也就是說多個 task 運行在同一個JVM中。 而在同一個 JVM 進程中的 task,可以共享 TCP 連接(基於多路復用)和心跳消息,可以減少數據的網絡傳輸。也能 共享一些數據結構,一定程度上減少了每個task的消耗。
2.1、task的並行度
val wordCount2: DataStream[(String, Int)] = socket.flatMap(new FlatMapFunction[String, (String, Int)] { override def flatMap(int: String, out: Collector[(String, Int)]): Unit = { val strings: Array[String] = int.split(" ") for (str <- strings) { out.collect((str, 1)) } } }).setParallelism(2) // 設置task的並行度/槽數 .keyBy(_._1).sum(1).setParallelism(2)
2.2、Operator Chains
為了更高效地分布式執行,Flink 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task。每個 task 在一個 線程中執行。將 operators 鏈接成 task 是非常有效的優化:
- 它能減少線程之間的切換;
- 減少消息的序列化/反序列化;
- 減少數據在緩沖區的交換;
- 減少了延遲的同時提高整體的吞吐量。
- 上下游的並行度一致(槽一致)。
- 下游節點的入度為1 (也就是說下游節點沒有來自其他節點的輸入) 。
- 上下游節點都在同一個 slot group 中。
- 下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS) 。
- 上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD) 。
- 上下游算子之間沒有數據shuffle (數據分區方式是 forward) 。
- 用戶沒有禁用 chain。
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.PublicEvolving; /** * Defines the chaining scheme for the operator. When an operator is chained to the * predecessor, it means that they run in the same thread. They become one operator * consisting of multiple steps. * * <p>The default value used by the StreamOperator is {@link #HEAD}, which means that * the operator is not chained to its predecessor. Most operators override this with * {@link #ALWAYS}, meaning they will be chained to predecessors whenever possible. */ @PublicEvolving public enum ChainingStrategy { /** * Operators will be eagerly chained whenever possible. * * <p>To optimize performance, it is generally a good practice to allow maximal * chaining and increase operator parallelism. */ ALWAYS, /** * The operator will not be chained to the preceding or succeeding operators. */ NEVER, /** * The operator will not be chained to the predecessor, but successors may chain to this * operator. */ HEAD }
代碼驗證:
- operator禁用chaining
- 全局禁用chaining
- 查看job的graph圖
原理與實現:
那么 Flink 是如何將多個 operators chain在一起的呢?chain在一起的operators是如何作為一個整體被執行的呢? 它們之間的數據流又是如何避免了序列化/反序列化以及網絡傳輸的呢?下圖展示了operators chain的內部實現:
如上圖所示,Flink內部是通過OperatorChain這個類來將多個operator鏈在一起形成一個新的operator。
OperatorChain形成的框框就像一個黑盒,Flink 無需知道黑盒中有多少個ChainOperator、數據在chain內部是怎么 流動的,只需要將input數據交給 HeadOperator 就可以了,這就使得OperatorChain在行為上與普通的operator無 差別,上面的OperaotrChain就可以看做是一個入度為1,出度為2的operator。所以在實現中,對外可見的只有 HeadOperator,以及與外部連通的實線輸出,這些輸出對應了JobGraph中的JobEdge,在底層通過 RecordWriterOutput來實現。另外,框中的虛線是operator chain內部的數據流,這個流內的數據不會經過序列化/ 反序列化、網絡傳輸,而是直接將消息對象傳遞給下游的 ChainOperator 處理,這是性能提升的關鍵點,在底層 是通過 ChainingOutput 實現的
2.3、SlotSharingGroup 與 CoLocationGroup
Flink 允許將不能形成算子鏈的兩個操作,比如下圖中的 flatmap 和 key&sink 放在一個 TaskSlot 里執行以達到資源共享的目的。
每一個 TaskManager 會擁有一個或多個的 task slot,每個 slot 都能跑由多個連續 task 組成的一個 pipeline。
如上文所述的 WordCount 例子,5個Task沒有solt共享的時候在TaskManager的slots中如下圖分布,2個 TaskManager,每個有3個slot:
默認情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot 持有該job的整個pipeline。允許slot共享有以下兩點好處:
- Flink 集群所需的task slots數與 job 中最高的並行度一致。
- 更容易獲得更充分的資源利用。如果沒有slot共享,那么非密集型操作 source/flatmap 就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將基線的2個並行度增加到6個,能充分利用slot資源, 同時保證每個TaskManager能平均分配到相同數量的subtasks。
我們將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共享(所有operator都在 default共享組),將得到如上圖所示的slot分布圖。該任務最終會占用6個slots(最高並行度為6)。其次,我們可 以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。
SlotSharingGroup:
- SlotSharingGroup 是 Flink 中用來實現slot共享的類,它盡可能地讓subtasks共享一個slot。
- 保證同一個group的並行度相同的sub-tasks 共享同一個slots
- 算子的默認 group 為 default(即默認一個job下的subtask都可以共享一個slot)
- 為了防止不合理的共享,用戶也能通過 API 來強制指定 operator 的共享組,比如: someStream.filter(...).slotSharingGroup("group1");就強制指定了 filter 的 slot 共享組為 group1。
- 怎么確定一個未做 SlotSharingGroup 設置的算子的 Group 是什么呢(根據上游算子的 group 和自身是否設置 group 共同確定)
- 適當設置可以減少每個 slot 運行的線程數,從而整體上減少機器的負載 CoLocationGroup(強制)
CoLocationGroup(強制):
- 保證所有的並行度相同的sub-tasks運行在同一個slot
- 主要用於迭代流(訓練機器學習模型)
代碼驗證:
- 設置本地開發環境的slot數量
- 設置最后的 operator 使用新的 group
- 為什么占用了兩個呢?
- 因為不同組,與上面的default不能共享slot,組間互斥。
- 同組中的同一個 operator 的 subtask 不能在一個 slot 中,由於 operator 的並行度是 2,所以占用了兩個槽 位,組內互斥。
原理與實現:
那么多個 tasks(或者說 operators )是如何共享 slot 的呢?
我們先來看一下用來定義計算資源的slot的類圖:
抽象類 Slot 定義了該槽位屬於哪個 TaskManager(instance)的第幾個槽位(slotNumber),屬於哪個 Job(jobID)等信息。最簡單的情況下,一個 slot 只持有一個task,也就是 SimpleSlot 的實現。復雜點的情況,一 個 slot 能共享給多個task使用,也就是 SharedSlot 的實現。
接下來我們來看看 Flink 為subtask分配slot的過程。關於Flink調度,有兩個非常重要的原則我們必須知道:
- 同一個 operator 的各個 subtask 是不能呆在同一個 SharedSlot 中的,例如 FlatMap[1] 和 FlatMap[2] 是不能在同一 個 SharedSlot 中的。
- Flink 是按照拓撲順序從 Source 一個個調度到 Sink 的。例如 WordCount(Source並行度為1,其他並行度為 2),那么調度的順序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg- >Sink[2]。假設現在有2個 TaskManager,每個只有1個slot(為簡化問題),那么分配slot的過程如圖所示:
注:圖中 SharedSlot 與 SimpleSlot 后帶的括號中的數字代表槽位號(slotNumber)
- 為Source分配slot。首先,我們從TaskManager1中分配出一個SharedSlot。並從SharedSlot中為Source分配 出一個SimpleSlot。如上圖中的①和②。
- 為FlatMap[1]分配slot。目前已經有一個SharedSlot,則從該SharedSlot中分配出一個SimpleSlot用來部署 FlatMap[1]。如上圖中的③。
- 為FlatMap[2]分配slot。由於TaskManager1的SharedSlot中已經有同operator的FlatMap[1]了,我們只能分配 到其他SharedSlot中去。從TaskManager2中分配出一個SharedSlot,並從該SharedSlot中為FlatMap[2]分配 出一個SimpleSlot。如上圖的④和⑤。
- 為Key->Sink[1]分配slot。目前兩個SharedSlot都符合條件,從TaskManager1的SharedSlot中分配出一個 SimpleSlot用來部署Key->Sink[1]。如上圖中的⑥。
- 為Key->Sink[2]分配slot。TaskManager1的SharedSlot中已經有同operator的Key->Sink[1]了,則只能選擇另 一個SharedSlot中分配出一個SimpleSlot用來部署Key->Sink[2]。如上圖中的⑦。
最后Source、FlatMap[1]、Key->Sink[1]這些subtask都會部署到TaskManager1的唯一一個slot中,並啟動對應的 線程。FlatMap[2]、Key->Sink[2]這些subtask都會被部署到TaskManager2的唯一一個slot中,並啟動對應的線 程。從而實現了slot共享。
Flink中計算資源的相關概念以及原理實現。最核心的是 Task Slot,每個slot能運行一個或多個task。為了拓撲更高 效地運行,Flink提出了Chaining,盡可能地將operators chain在一起作為一個task來處理。為了資源更充分的利 用,Flink又提出了SlotSharingGroup,盡可能地讓多個task共享一個slot。
如何計算一個應用需要多少slot:
- 不設置 SlotSharingGroup 時是應用的最大並行度,此時只有一個 default 組。
- 設置 SlotSharingGroup 時所有 SlotSharingGroup 中的最大並行度之和。
由於 source 和 map 之后的 operator 不屬於同一個 group ,所以 source 和它們不能在一個 solt 中運行,而這里的 source 的 default 組的並行度是10,test 組的並行度是20,所以所需槽位一共是30