本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1. Flink運行時架構
1.1Flink架構
Flink 運行時架構主要包含幾個部分:Client、JobManager(master節點)和TaskManger(slave節點)。
Client:Flink 作業在哪台機器上面提交,那么當前機器稱之為Client。用戶開發的Program 代碼,它會構建出DataFlow graph,然后通過Client提交給JobManager。
JobManager:是主(master)節點,相當於YARN里面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會將任務進行拆分,調度到TaskManager上面執行。
TaskManager:是從節點(slave),TaskManager才是真正實現task的部分。
Client提交作業到JobManager,就需要跟JobManager進行通信,它使用Akka框架或者庫進行通信,另外Client與JobManager進行數據交互,使用的是Netty框架。Akka通信基於Actor System,Client可以向JobManager發送指令,比如Submit job或者Cancel /update job。JobManager也可以反饋信息給Client,比如status updates,Statistics和results
Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task,提交給TaskManager(worker)。JobManager與TaskManager也是基於Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之間的數據通過網絡進行傳輸,比如Data Stream做一些算子的操作,數據往往需要在TaskManager之間做數據傳輸。
1.2. TaskManger Slot
TaskManager是進程,他下面運行的task(整個Flink應用是Job,Job可以拆分成很多個task)是線程,每個task/subtask(線程)下可運行一個或者多個operator,即OperatorChain。Task是class,抽象的,subtask是Object(類比學習),具體的。
一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager划分了3個Task Slot(僅限內存托管,目前CPU未做隔離),它只能接受3個task。Slot均分TaskManager所托管的內存,比如一個TaskManager有6G內存,那么每個Slot分配2G。
同一個TaskManager中的task共享TCP連接(通過多路復用)和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。一個TaskManager有N個槽位只能接受N個Task嗎?不是,后面會講共享槽位。
1.3. OperatorChain && Task
為了更高效地分布式執行,Flink會盡可能地將operator的subtask鏈接(chain)在一起形成task。以wordcount為例,解析不同視圖下的數據流,如下圖所示。
數據流(邏輯視圖)
創建Source(並行度設置為1)讀取數據源,數據經過FlatMap(並行度設置為2)做轉換操作,然后數據經過Key Agg(並行度設置為2)做聚合操作,最后數據經過Sink(並行度設置為2)將數據輸出。
數據流(並行化視圖)
並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給並行度為2的Key Agg進行聚合操作,然后並行度為2的Sink將數據輸出,未優化前的task總和為7。
數據流(優化后視圖)
並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給Key Agg進行聚合操作,此時Key Agg和Sink操作合並為一個task(注意:將KeyAgg和Sink兩個operator進行了合並,因為這兩個合並后並不會改變整體的拓撲結構),它們一起的並行度為2,數據經過Key Agg和Sink之后將數據輸出,優化后的task總和為5.
1.4. OperatorChain的優點和組成條件
OperatorChain的優點
1.減少線程切換
2.減少序列化與反序列化
3.減少數據在緩沖區的交換
4.減少延遲並且提高吞吐能力
OperatorChain 組成條件
1.沒有禁用Chain
2.上下游算子並行度一致 。
3.下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)。
4.上下游算子在同一個slot group(后面緊跟着就會講如何通過slot group先分配到同一個solt,然后才能chain) 。
5.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)。
6.上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)。
7.上下游算子之間沒有數據shuffle (數據分區方式是 forward)。
1.5. 編程改變OperatorChain行為
Operator chain的行為可以通過編程API中進行指定,可以通過在DataStream的operator后面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來指示該operator不參與chaining(不會與前后的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設置Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通過調整並行度,來調整Operator chain。
2. Slot分配與共享
2.1共享Slot
默認情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。
允許slot共享有以下兩點好處:
1.Flink集群需要的任務槽與作業中使用的最高並行度正好相同(前提,保持默認SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了。
2.更容易獲得更充分的資源利用。如果沒有slot共享,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將task的2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks。
2.2共享Slot實例
將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共享(所有operator都在default共享組),將得到如上圖所示的slot分布圖。
首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。
2.3 SlotSharingGroup(soft)
SlotSharingGroup是Flink中用來實現slot共享的類,它盡可能地讓subtasks共享一個slot。
保證同一個group的並行度相同的sub-tasks 共享同一個slots。算子的默認group為default(即默認一個job下的subtask都可以共享一個slot)
為了防止不合理的共享,用戶也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共享組為group1。怎么確定一個未做SlotSharingGroup設置算子的SlotSharingGroup什么呢(根據上游算子的group 和自身是否設置group共同確定)。適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載。
2.4 CoLocationGroup(強制)
CoLocationGroup可以保證所有的並行度相同的sub-tasks運行在同一個slot,主要用於迭代流(訓練機器學習模型)。
3. Slot & parallelism的關系
3.1 Slots && parallelism
如上圖所示,有兩個TaskManager,每個TaskManager有3個槽位。假設source操作並行度為3,map操作的並行度為4,sink的並行度為4,所需的task slots數與job中task的最高並行度一致,最高並行度為4,那么使用的Slot也為4。
3.2如何計算Slot
如何計算一個應用需要多少slot?
如果不設置SlotSharingGroup,那么需要的Slot數為應用的最大並行度數。如果設置了SlotSharingGroup,那么需要的Slot數為所有SlotSharingGroup中的最大並行度之和。比如已經強制指定了map的slot共享組為test,那么map和map下游的組為test,map的上游source的組為默認的default,此時default組中最大並行度為10,test組中最大並行度為20,那么需要的Slot=10+20=30。
4.Flink部署模式
4.1 Local 本地部署
Flink 可以運行在 Linux、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本,本地運行會啟動Single JVM,主要用於測試調試代碼。
4.2 Standalone Cluster集群部署
軟件需求
1.安裝Java1.8或者更高版本
2.集群各個節點需要ssh免密登錄
Flink Standalone 運行流程前面已經講過,這里就不在贅敘。
4.3Flink ON YARN
Flink ON YARN工作流程如下所示:
首先提交job給YARN,就需要有一個Flink YARN Client。
第一步:Client將Flink 應用jar包和配置文件上傳到HDFS。
第二步:Client向REsourceManager注冊resources和請求APPMaster Container
第三步:REsourceManager就會給某一個Worker節點分配一個Container來啟動APPMaster,JobManager會在APPMaster中啟動。
第四步:APPMaster為Flink的TaskManagers分配容器並啟動TaskManager,TaskManager內部會划分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置,然后運行相應的task。TaskManager也會與APPMaster中的JobManager進行交互,維持心跳等。
5.Flink Standalone集群部署
安裝Flink之前需要提前安裝好JDK,這里我們安裝的是JDK1.8版本。
5.1下載
可以到官網:https://archive.apache.org/dist/flink/ 將Flink1.6.2版本下載到本地。
5.2解壓
將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點
使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包
方便后期flink多版本的使用,可以創建flink軟連接
ln -s flink-1.6.2 flink
5.3配置環境變量
vi ~/.bashrc
export FLINK_HOME=/home/hadoop/app/flink
export PATH=$FLINK_HOME/bin:$PATH
使配置文件生效
source ~/.bashrc
查看flink版本
flink -v
5.4修改配置文件
1.修改flink-conf.yaml配置文件
vi flink-conf.yaml
#JobManager地址
jobmanager.rpc.address: cdh01
#槽位配置為3
taskmanager.numberOfTaskSlots: 3
#設置並行度為3
parallelism.default: 3
2.修改masters配置
vi masters
cdh01:8081
3.修改slaves配置
vi slaves
cdh01
cdh02
cdh03
5.5主節點安裝目錄同步到從節點
通過deploy.sh腳本將flink安裝目錄同步到其他節點。
deploy.sh flink-1.6.2 /home/hadoop/app/ slave
在從節點分別創建flink軟連接
ln -s flink-1.6.2 flink
5.6啟動服務
進入flink bin目錄執行啟動集群腳本start-cluster.sh
bin/start-cluster.sh
通過web查看flink集群,查看相關集群信息。
5.7測試運行
查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務
nc -l 9000
2.提交flink作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試數據
4.查看運行結果
在TaskManager界面查看Flink運行結果