1. 系統架構
Flink是一個分布式系統,用於有狀態的並行數據流處理。也就是說,Flink會分布式地運行在多個機器上。在分布式系統中,常見的挑戰有:如何對集群中的資源進行分配與管理、協調進程、數據存儲的高可用、以及異常恢復。
Flink自身並未實現這些功能,而僅關注在它自身的核心功能 - 分布式數據流處理。對於分布式集群的管理,由運行在它之下的集群完成,並提供基礎設施與服務。Flink與常見集群資源管理器契合度良好,例如Apache Mesos,YARN,以及Kubernetes。當然它也可以配置為stand-alone集群。Flink並不提供可靠的分布式存儲。它直接使用其他分布式文件系統如HDFS、S3等。對於在HA設置下的leader選舉,它依賴於ZooKeeper。
在這章我們會介紹Flink的各個組件,以及它們如何相互作用,以運行一個application。我也也會討論Flink 應用的兩種部署模式,以及它們如何分發、執行任務。最后,介紹在HA模式下Flink如何工作。
Flink組件
在Flink中有四個不同的組件,它們共同協作運行流程序。這些組件為:一個JobManager,一個ResourceManager,一個TaskManager,以及一個Dispatcher。Flink是由Java和Scala實現,所以這些組件全部運行在JVM中。每個組件的職責為:
·JobManager:主(master)進程,用於管理單個application的執行。每個application都由一個不同的JobManager管理。JobManager會接收application並執行。一個application包含:一個JobGraph,一個邏輯數據流圖(logical dataflow graph),以及一個Jar文件(包含了所有需要的類、lib庫以及其他資源)。JobManager將JobGraph轉化為一個物理數據流圖(physical dataflow graph),稱為ExecutionGraph。ExecutionGraph由一些可以並行執行的任務(tasks)組成。JobManager向ResourceManager申請必須的計算資源(稱為TaskManager slots)用於執行任務。一旦JobManager收到足夠的TaskManager slots,它將ExecutionGraph中的task分發到TaskManager,然后執行。在執行過程中,JobManager負責任何需要中心協調(central coordination)的操作,例如檢查點(checkpoints)的協調
·ResourceManager:Flink 可以整合多個ResourceManager,例如YARN,Mesos,Kubernetes以及standalone 部署。ResourceManager負責管理TaskManager slots,也就是Flink的一個資源處理單元。當JobManager 申請TaskManager slots時,ResourceManager 會分配空閑slot給它。如果RM並沒有足夠的slots滿足JobManager的請求,則RM can talk to a resource provider to provision containers in which TaskManager processes are started。RM也負責關閉空閑的TaskManagers,以釋放計算資源。
·TaskManagers:是Flink的worker 進程。一般來說,會有多個TaskManagers運行在一個配置好的Flink 集群中。每個TaskManager提供了具體數量的 slots。Slots的數量限制了TaskManager可以運行的task數量。在TaskManager啟動后,它會向ResourceManager注冊它的slots。在接受到RM的指令后,TaskManager會向JobManager提供它的slots。JobManager即可分配任務到這些slots,並開始執行這些任務。在執行過程中,對於同一個application的不同taks,運行在它們之下的TaskManager 之間會互相交換數據。
·Dispatcher 提供了一個REST 接口,用於提交application執行。當一個application被提交,Dispatcher會啟動一個JobManager並將application交給它。REST接口使得Dispatcher可以作為一個(位於防火牆之后的)HTTP 入口服務提供給外部。Dispathcher也運行了一個web控制面板,用於提供job執行的信息。取決於一個application如何提交執行,dispathcher可能並不是必須的。
下圖展示的是:在提交一個application后,Flink的組件之間如何協作運行此應用:
上圖是一個較為High-Level的角度。取決於部署的集群不同(例如YARN,standalone等),一些步驟可以被省略,亦或是有些組件會運行在同一個JVM進程中。
應用部署
Flink application 可以使用以下兩種不同的方式部署:
1. 框架方式
·在這個模式下,Flink應用被打包成一個Jar文件,並由客戶端提交到一個運行的服務(running service)。這個服務可以是一個Flink Dispatcher,一個FlinkJobManager,或是Yarn ResourceManager。如果application被提交給一個JobManager,則它會立即開始執行這個application。如果application被提交給了一個Dispatcher,或是Yarn RM,則它會啟動一個JobManager,然后將application交給它,JobManager開始執行此應用。
2. 庫(Library)模式
·在這個模式下,Flink Application 會被打包在一個container 鏡像,例如一個Docker 鏡像。此鏡像包含了運行JobManager和ResourceManager的代碼。當一個容器從鏡像啟動后,它會自動啟動ResourceManager和JobManager,並提交打包好的應用。另一種方法是:將應用打包到鏡像后,用於部署TaskManager容器。從此鏡像啟動的容器會自動啟動一個TaskManager,它會連接ResourceManager並注冊它的slots。一般來說,這些鏡像的啟動以及失敗重啟由一個外部的資源管理器管理(如Kubernetes)。
框架模式遵循了傳統的提交任務到集群的方式。在庫模式下,沒有運行的Flink服務。它是將Flink作為一個庫,與application一同打包到了一個容器鏡像。這個部署模式在微服務架構中較為常見
任務執行
一個TaskManager可以同時執行多個任務。這些task可以是同一個operator(也就是數據並行)的、或是不同的operator(也就是task並行)的,亦或是另一個不同application的(job並行)一組tasks的子集。TaskManager提供了明確個數的processing slots,用於控制可以並行執行的任務數。一個slot可以執行application的一個分片(一個operator的一個並行task)。下圖展示了TaskManager,slots,tasks以及operators之間的關系:
最左邊是一個JobGraph – application的非並行表示,包含了5個operator。A和C是數據源,E是輸出端(sink)。C和E有2個並行,其他的有4個並行。因為最高的並行度是4,所以應用需要至少四個slot執行任務。給定兩個TaskManager,每個各有兩個slot,這種情況下需求是滿足的。JobManager將JobGraph轉化為ExecutionGraph,並將任務分配到四個可用的slot上。對於有4個並行任務的operator,它的task會分配到每個slot上。對於有2個並行任務的operator C和E,它們的任務被分配到slot 1.1、2.1 以及 slot 1.2、2.2。將tasks調度到slots上,可以讓多個tasks跑在同一個TaskManager內,也就可以是的tasks之間的數據交換更高效。然而將太多任務調度到同一個TaskManager上會導致TaskManager過載,繼而影響效率。之后我們會討論如何控制任務的調度。
TaskManager在同一個JVM中以多線程的方式執行任務。線程較進程會更輕量級,但是線程之間並沒有非常嚴格的將任務互相隔離。所以,單個誤操作的任務可能會kill掉整個TaskManager進程,以及運行在此進程上的所有任務。通過為每個TaskManager配置單獨的slot,可以將application相互隔離。依賴於TaskManager內部的多線程,以及在一台實例上配置部署多個TaskManager,Flink可以為性能與資源隔離提供更靈活的權衡。
高可用設置
流應用一般設計為7 x 24 小時運行。所以很重要的一點是:即使在出現了進程掛掉的情況,應用仍需要繼續保持執行。為了從故障恢復,系統需要重啟進程、重啟應用並恢復它的狀態。接下來我們會介紹Flink如何重啟失敗的進程。
1. TaskManager 失敗
正如前文提到,Flink需要足夠數目的slot,以執行一個應用的所有任務。假設一個Flink配置有4個TaskManager,每個TM提供2個slot,則一個流程序最高可以以8個並行單位執行。如果其中一個TaskManager失敗,可用的slots會降到6。在這種情況下,JobManager會要求ResourceManager提供更多的slots。如果此要求無法完成 - 例如應用跑在一個standalone集群 – JobManager在有足夠的slots之前,無法重啟此application。應用的重啟邏輯決定了JobManager的重啟頻率,以及兩次嘗試之間的時間間隔。
2. JobManager失敗
比TaskManager失敗更嚴重的問題是JobManager失敗。JM控制整個流應用的執行,並維護執行中的元數據,例如指向已完成的檢查點的指針。若是對應的JobManager消失,則流程序無法繼續運行。也就是說JobManager在Flink應用中是單點故障。為了克服這個問題,Flink支持高可用模式,在源JM消失后,可以將一個job的狀態與元數據遷移到另一個JobManager,並繼續執行。
Flink的高可用模式基於ZooKeeper。若是在HA模式下運行,則JobManager將JobGraph以及所有必須的metadata(例如應用的jar文件)寫入到一個遠程持久性存儲系統中。此外,JM會寫一個指針信息(指向存儲位置)到Zookeeper的數據存儲中。在執行一個application的過程中,JM接收每個獨立task檢查點的state句柄(也就是存儲位置)。根據一個檢查點的完成情況(當所有任務已經成功地將它們的state寫入到遠程存儲), JobManager寫入state句柄到遠程存儲,以及寫入指針(指向遠程存儲的指針)到ZooKeeper。所以,所有需要(在一個JM失敗后)被還原的信息被存儲在遠程存儲,而ZooKeeper持有指向此存儲位置的指針。下圖描述了此設計:
當一個JM失敗,所有屬於這個application的任務會自動取消。一個新的JM接管失敗JM的工作,並執行以下操作:
1.從ZooKeeper請求存儲位置(storage location),從遠端存儲獲取JobGraph,Jar文件,以及application上次checkpoint的狀態句柄(state handles)
2.從ResourceManager請求slots,以繼續執行application
3.重啟application並重制它所有的tasks到上一個完成了的checkpoint
當一個application是以庫部署的形式運行(如Kubernetes),失敗的JobManager或TaskManager 容器會由容器服務自動重啟。當運行在YARN或Mesos之上時,JobManager或TaskManager進程會由Flink自動觸發重啟。在standalone模式下,Flink並未提供為失敗進程重啟的工具。所以次模式下可以運行一個standby JM和TM,用於接管失敗的進程。
References:
Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019