轉載自:https://www.toutiao.com/a6629091388749251086
1、Apache Flink介紹
Flink是一個純流式計算引擎。
1.1 歷史
Flink起源於一個叫做Stratosphere的研究項目,目標是建立下一代大數據分析引擎,其在2014年4月16日成為Apache的孵化項目,從Stratosphere 0.6開始,正式更名為Flink。Flink 0.7中介紹了最重要的特性:Streaming API。最初只支持Java API,后來增加了Scala API。
1.2 架構
Flink 1.X版本的包含了各種各樣的組件,包括部署、flink core(runtime)以及API和各種庫。
從部署上講,Flink支持local模式、集群模式(standalone集群或者Yarn集群)、雲端部署。Runtime是主要的數據處理引擎,它以JobGraph形式的API接收程序,JobGraph是一個簡單的並行數據流,包含一系列的tasks,每個task包含了輸入和輸出(source和sink例外)。
DataStream API和DataSet API是流處理和批處理的應用程序接口,當程序在編譯時,生成JobGraph。編譯完成后,根據API的不同,優化器(批或流)會生成不同的執行計划。根據部署方式的不同,優化后的JobGraph被提交給了executors去執行。
1.3 分布式執行
Flink分布式程序包含2個主要的進程:JobManager和TaskManager.當程序運行時,不同的進程就會參與其中,包括Jobmanager、TaskManager和JobClient。
首先,Flink程序提交給JobClient,JobClient再提交到JobManager,JobManager負責資源的協調和Job的執行。一旦資源分配完成,task就會分配到不同的TaskManager,TaskManager會初始化線程去執行task,並根據程序的執行狀態向JobManager反饋,執行的狀態包括starting、in progress、finished以及canceled和failing等。當Job執行完成,結果會返回給客戶端。
1.3.1 JobManager
Master進程,負責Job的管理和資源的協調。包括任務調度,檢查點管理,失敗恢復等。
當然,對於集群HA模式,可以同時多個master進程,其中一個作為leader,其他作為standby。當leader失敗時,會選出一個standby的master作為新的leader(通過zookeeper實現leader選舉)。
JobManager包含了3個重要的組件:
(1)Actor系統
(2)調度
(3)檢查點
1.3.1.1 Actor系統
Flink內部使用Akka模型作為JobManager和TaskManager之間的通信機制。
Actor系統是個容器,包含許多不同的Actor,這些Actor扮演者不同的角色。Actor系統提供類似於調度、配置、日志等服務,同時包含了所有actors初始化時的線程池。
所有的Actors存在着層級的關系。新加入的Actor會被分配一個父類的Actor。Actors之間的通信采用一個消息系統,每個Actor都有一個“郵箱”,用於讀取消息。如果Actors是本地的,則消息在共享內存中共享;如果Actors是遠程的,則消息通過RPC遠程調用。
每個父類的Actor都負責監控其子類Actor,當子類Actor出現錯誤時,自己先嘗試重啟並修復錯誤;如果子類Actor不能修復,則將問題升級並由父類Actor處理。
在Flink中,actor是一個有狀態和行為的容器。Actor的線程持續的處理從“郵箱”中接收到的消息。Actor中的狀態和行為則由收到的消息決定。
1.3.1.2 調度器
Flink中的Executors被定義為task slots(線程槽位)。每個Task Manager需要管理一個或多個task slots。
Flink通過SlotSharingGroup和CoLocationGroup來決定哪些task需要被共享,哪些task需要被單獨的slot使用。
1.3.1.3 檢查點
Flink的檢查點機制是保證其一致性容錯功能的骨架。它持續的為分布式的數據流和有狀態的operator生成一致性的快照。其改良自Chandy-Lamport算法,叫做ABS(輕量級異步Barrier快照),具體參見論文:
Lightweight Asynchronous Snapshots for Distributed Dataflows
Flink的容錯機制持續的構建輕量級的分布式快照,因此負載非常低。通常這些有狀態的快照都被放在HDFS中存儲(state backend)。程序一旦失敗,Flink將停止executor並從最近的完成了的檢查點開始恢復(依賴可重發的數據源+快照)。
Barrier作為一種Event,是Flink快照中最主要的元素。它會隨着data record一起被注入到流數據中,而且不會超越data record。每個barrier都有一個唯一的ID,將data record分到不同的檢查點的范圍中。下圖展示了barrier是如何被注入到data record中的:
每個快照中的狀態都會報告給Job Manager的檢查點協調器;快照發生時,flink會在某些有狀態的operator上對data record進行對齊操作(alignment),目的是避免失敗恢復時重復消費數據。這個過程也是exactly once的保證。通常對齊操作的時間僅是毫秒級的。但是對於某些極端的應用,在每個operator上產生的毫秒級延遲也不能允許的話,則可以選擇降級到at least once,即跳過對齊操作,當失敗恢復時可能發生重復消費數據的情況。Flink默認采用exactly once意義的處理。
1.3.2 TaskManager
Task Managers是具體執行tasks的worker節點,執行發生在一個JVM中的一個或多個線程中。Task的並行度是由運行在Task Manager中的task slots的數量決定。如果一個Task Manager有4個slots,那么JVM的內存將分配給每個task slot 25%的內存。一個Task slot中可以運行1個或多個線程,同一個slot中的線程又可以共享相同的JVM。在相同的JVM中的tasks,會共享TCP連接和心跳消息:
1.3.3 Job Client
Job Client並不是Flink程序執行中的內部組件,而是程序執行的入口。Job Client負責接收用戶提交的程序,並創建一個data flow,然后將生成的data flow提交給Job Manager。一旦執行完成,Job Client將返回給用戶結果。
Data flow就是執行計划,比如下面一個簡單的word count的程序:
當用戶將這段程序提交時,Job Client負責接收此程序,並根據operator生成一個data flow,那么這個程序生成的data flow也許看起來像是這個樣子:
默認情況下,Flink的data flow都是分布式並行處理的,對於數據的並行處理,flink將operators和數據流進行partition。Operator partitions叫做sub-tasks。數據流又可以分為一對一的傳輸與重分布的情況。
我們看到,從source到map的data flow,是一個一對一的關系,沒必要產生shuffle操作;而從map到groupBy操作,flink會根據key將數據重分布,即shuffle操作,目的是聚合數據,產生正確的結果。
