Flink學習筆記:Flink Runtime


 

本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

 

Flink大數據項目實戰:http://t.cn/EJtKhaz

1. Flink運行時架構

1.1Flink架構

Flink 運行時架構主要包含幾個部分:Client、JobManager(master節點)和TaskManger(slave節點)。

 

Client:Flink 作業在哪台機器上面提交,那么當前機器稱之為Client。用戶開發的Program 代碼,它會構建出DataFlow graph,然后通過Client提交給JobManager。

JobManager:是主(master)節點,相當於YARN里面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會將任務進行拆分,調度到TaskManager上面執行。

TaskManager:是從節點(slave),TaskManager才是真正實現task的部分。

Client提交作業到JobManager,就需要跟JobManager進行通信,它使用Akka框架或者庫進行通信,另外Client與JobManager進行數據交互,使用的是Netty框架。Akka通信基於Actor System,Client可以向JobManager發送指令,比如Submit job或者Cancel /update job。JobManager也可以反饋信息給Client,比如status updates,Statistics和results 

Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task,提交給TaskManager(worker)。JobManager與TaskManager也是基於Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之間的數據通過網絡進行傳輸,比如Data Stream做一些算子的操作,數據往往需要在TaskManager之間做數據傳輸。

1.2. TaskManger Slot

 

TaskManager是進程,他下面運行的task(整個Flink應用是Job,Job可以拆分成很多個task)是線程,每個task/subtask(線程)下可運行一個或者多個operator,即OperatorChain。Task是class,抽象的,subtask是Object(類比學習),具體的。

一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager划分了3個Task Slot(僅限內存托管,目前CPU未做隔離),它只能接受3個task。Slot均分TaskManager所托管的內存,比如一個TaskManager有6G內存,那么每個Slot分配2G。

同一個TaskManager中的task共享TCP連接(通過多路復用)和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。一個TaskManager有N個槽位只能接受N個Task嗎?不是,后面會講共享槽位。

1.3. OperatorChain && Task

為了更高效地分布式執行,Flink會盡可能地將operator的subtask鏈接(chain)在一起形成task。以wordcount為例,解析不同視圖下的數據流,如下圖所示。

 

 

 

數據流(邏輯視圖)

創建Source(並行度設置為1)讀取數據源,數據經過FlatMap(並行度設置為2)做轉換操作,然后數據經過Key Agg(並行度設置為2)做聚合操作,最后數據經過Sink(並行度設置為2)將數據輸出。

數據流(並行化視圖)

並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給並行度為2的Key Agg進行聚合操作,然后並行度為2的Sink將數據輸出,未優化前的task總和為7。

數據流(優化后視圖)

並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給Key Agg進行聚合操作,此時Key Agg和Sink操作合並為一個task(注意:將KeyAgg和Sink兩個operator進行了合並,因為這兩個合並后並不會改變整體的拓撲結構),它們一起的並行度為2,數據經過Key Agg和Sink之后將數據輸出,優化后的task總和為5.

1.4. OperatorChain的優點和組成條件

OperatorChain的優點

1.減少線程切換

2.減少序列化與反序列化

3.減少數據在緩沖區的交換

4.減少延遲並且提高吞吐能力

OperatorChain 組成條件

1.沒有禁用Chain

2.上下游算子並行度一致 。

3.下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)。

4.上下游算子在同一個slot group(后面緊跟着就會講如何通過slot group先分配到同一個solt,然后才能chain) 。

5.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)。

6.上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)。

7.上下游算子之間沒有數據shuffle (數據分區方式是 forward)。

1.5. 編程改變OperatorChain行為

Operator chain的行為可以通過編程API中進行指定,可以通過在DataStream的operator后面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來指示該operator不參與chaining(不會與前后的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設置Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通過調整並行度,來調整Operator chain。

2. Slot分配與共享

2.1共享Slot

 

 

默認情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。

允許slot共享有以下兩點好處:

1.Flink集群需要的任務槽與作業中使用的最高並行度正好相同(前提,保持默認SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了。

2.更容易獲得更充分的資源利用。如果沒有slot共享,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將task的2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks。

2.2共享Slot實例

 

將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共享(所有operator都在default共享組),將得到如上圖所示的slot分布圖。

首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。

2.3 SlotSharingGroup(soft)

SlotSharingGroup是Flink中用來實現slot共享的類,它盡可能地讓subtasks共享一個slot。

保證同一個group的並行度相同的sub-tasks 共享同一個slots。算子的默認group為default(即默認一個job下的subtask都可以共享一個slot)

為了防止不合理的共享,用戶也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共享組為group1。怎么確定一個未做SlotSharingGroup設置算子的SlotSharingGroup什么呢(根據上游算子的group 和自身是否設置group共同確定)。適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載。

 

 

2.4 CoLocationGroup(強制)

CoLocationGroup可以保證所有的並行度相同的sub-tasks運行在同一個slot,主要用於迭代流(訓練機器學習模型)。

3. Slot & parallelism的關系

3.1 Slots && parallelism

 

 

如上圖所示,有兩個TaskManager,每個TaskManager有3個槽位。假設source操作並行度為3,map操作的並行度為4,sink的並行度為4,所需的task slots數與job中task的最高並行度一致,最高並行度為4,那么使用的Slot也為4。

3.2如何計算Slot

如何計算一個應用需要多少slot?

 

 

如果不設置SlotSharingGroup,那么需要的Slot數為應用的最大並行度數。如果設置了SlotSharingGroup,那么需要的Slot數為所有SlotSharingGroup中的最大並行度之和。比如已經強制指定了map的slot共享組為test,那么map和map下游的組為test,map的上游source的組為默認的default,此時default組中最大並行度為10,test組中最大並行度為20,那么需要的Slot=10+20=30。

4.Flink部署模式

4.1 Local 本地部署

Flink 可以運行在 Linux、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本,本地運行會啟動Single JVM,主要用於測試調試代碼。

4.2 Standalone Cluster集群部署

軟件需求

1.安裝Java1.8或者更高版本

2.集群各個節點需要ssh免密登錄

 

Flink Standalone 運行流程前面已經講過,這里就不在贅敘。

4.3Flink ON YARN

 

Flink ON YARN工作流程如下所示:

首先提交job給YARN,就需要有一個Flink YARN Client。

第一步:Client將Flink 應用jar包和配置文件上傳到HDFS。

第二步:Client向REsourceManager注冊resources和請求APPMaster  Container

第三步:REsourceManager就會給某一個Worker節點分配一個Container來啟動APPMaster,JobManager會在APPMaster中啟動。

第四步:APPMaster為Flink的TaskManagers分配容器並啟動TaskManager,TaskManager內部會划分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置,然后運行相應的task。TaskManager也會與APPMaster中的JobManager進行交互,維持心跳等。

5.Flink Standalone集群部署

安裝Flink之前需要提前安裝好JDK,這里我們安裝的是JDK1.8版本。

 

5.1下載

可以到官網:https://archive.apache.org/dist/flink/ 將Flink1.6.2版本下載到本地。

 

5.2解壓

將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點

 

使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包

 

方便后期flink多版本的使用,可以創建flink軟連接

ln -s flink-1.6.2 flink

 

5.3配置環境變量

vi ~/.bashrc

export FLINK_HOME=/home/hadoop/app/flink

export PATH=$FLINK_HOME/bin:$PATH

使配置文件生效

source ~/.bashrc

查看flink版本

flink -v

5.4修改配置文件

1.修改flink-conf.yaml配置文件

vi flink-conf.yaml

#JobManager地址

jobmanager.rpc.address: cdh01

#槽位配置為3

taskmanager.numberOfTaskSlots: 3

#設置並行度為3

parallelism.default: 3

2.修改masters配置

vi masters

cdh01:8081

 

 

3.修改slaves配置

vi slaves

cdh01

cdh02

cdh03

5.5主節點安裝目錄同步到從節點

通過deploy.sh腳本將flink安裝目錄同步到其他節點。

deploy.sh flink-1.6.2 /home/hadoop/app/ slave

在從節點分別創建flink軟連接

ln -s flink-1.6.2 flink

5.6啟動服務

進入flink bin目錄執行啟動集群腳本start-cluster.sh

bin/start-cluster.sh

 

通過web查看flink集群,查看相關集群信息。

http://cdh01:8081

 

5.7測試運行

查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

1.啟動nc服務

nc -l 9000

2.提交flink作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

3.輸入測試數據

 

4.查看運行結果

在TaskManager界面查看Flink運行結果

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM