1 系統架構
Flink是一個用於有狀態的並行數據流處理的分布式系統。它由多個進程構成,這些進程一般會分布運行在不同的機器上。對於分布式系統來說,面對的常見問題有:集群中資源的分配和管理、進程協調調度、持久化和高可用的數據存儲,以及故障恢復。
對於這些分布式系統的經典問題,業內已有比較成熟的解決方案和服務。所以Flink並不會自己去處理所有的問題,而是利用了現有的集群架構和服務,這樣它就可以把精力集中在核心工作——分布式數據流處理上了。Flink與一些集群資源管理工具有很好的集成,比如Apache Mesos、YARN和Kubernetes;同時,也可以配置為獨立(stand-alone)集群運行。Flink自己並不提供持久化的分布式存儲,而是直接利用了已有的分布式文件系統(比如HDFS)或者對象存儲(比如S3)。對於高可用的配置,Flink需要依靠Apache ZooKeeper來完成。
在本節中,我們將介紹Flink的不同組件,以及在運行程序時它們如何相互作用。我們會討論部署Flink應用程序的兩種模式,並且了解每種模式下分發和執行任務的方式。最后,我們還會解釋一下Flink的高可用性模式是如何工作的。
1.1 Flink運行時組件
Flink運行時架構主要包括四個不同的組件,它們會在運行流處理應用程序時協同工作:作業管理器(JobManager)、資源管理器(ResourceManager)、任務管理器(TaskManager),以及分發器(Dispatcher)。因為Flink是用Java和Scala實現的,所以所有組件都會運行在Java虛擬機(JVMs)上。每個組件的職責如下:
- 作業管理器(JobManager)是控制一個應用程序執行的主進程,也就是說,每個應用程序都會被一個不同的作業管理器所控制執行。
1) 作業管理器會先接收到要執行的應用程序。這個應用程序會包括:作業圖(JobGraph)、邏輯數據流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
2) 作業管理器會把JobGraph轉換成一個物理層面的數據流圖,這個圖被叫做“執行圖”(ExecutionGraph),包含了所有可以並發執行的任務。
3) 作業管理器會向資源管理器(ResourceManager)請求執行任務必要的資源,也就是任務管理器(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源,就會將執行圖分發到真正運行它們的TaskManager上。
4) 而在運行過程中,作業管理器會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。
- ResourceManager主要負責管理任務管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定義的處理資源單元。Flink為不同的環境和資源管理工具提供了不同資源管理器(ResourceManager),比如YARN、Mesos、K8s,以及standalone部署。
1)當作業管理器申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給作業管理器。如果ResourceManager沒有足夠的插槽來滿足作業管理器的請求,它還可以向資源提供平台發起會話,以提供啟動TaskManager進程的容器。
2) 另外,ResourceManager還負責終止空閑的TaskManager,釋放計算資源。
- 任務管理器(TaskManager)是Flink中的工作進程。
1) 通常在Flink中會有多個TaskManager運行,每一個TaskManager都包含了一定數量的插槽(slots)。插槽的數量限制了TaskManager能夠執行的任務數量。
2) 啟動之后,TaskManager會向資源管理器注冊它的插槽;
3) 收到資源管理器的指令后,TaskManager就會將一個或者多個插槽提供給作業管理器調用。作業管理器就可以向插槽分配任務(tasks)來執行了。
4) 在執行過程中,一個TaskManager可以跟其它運行同一應用程序的TaskManager交換數據。任務的執行和插槽的概念會在“任務執行”一節做具體討論。
- 分發器(Dispatcher)可以跨作業運行,它為應用提交提供了REST接口。
1) 當一個應用被提交執行時,分發器就會啟動並將應用移交給一個作業管理器。
2) 由於是REST接口,所以Dispatcher可以作為集群的一個HTTP接入點,這樣就能夠不受防火牆阻擋。
3) Dispatcher也會啟動一個Web UI,用來方便地展示和監控作業執行的信息。
Dispatcher在架構中可能並不是必需的,這取決於應用提交運行的方式。
上圖是從一個較為高層級的視角,來看應用中各組件的交互協作。如果部署的集群環境不同(例如YARN,Mesos,Kubernetes,standalone等),其中一些步驟可以被省略,或是有些組件會運行在同一個JVM進程中。
1.2 應用部署
Flink應用程序可以用以下兩種不同的方式部署:
框架(Framework)方式
在這個模式下,Flink應用被打包成一個Jar文件,並由客戶端提交給一個運行服務(running service)。這個服務可以是一個Flink的Dispatcher,也可以是一個Flink的作業管理器,或是Yarn的ResourceManager。如果application被提交給一個作業管理器,則它會立即開始執行這個application。如果application被提交給了一個Dispatcher,或是Yarn ResourceManager,則它會啟動一個作業管理器,然后將application交給它,再由作業管理器開始執行此應用。
庫(Library)方式
在這個模式下,Flink Application 會被打包在一個容器(container) 鏡像里,例如一個Docker 鏡像。此鏡像包含了運行作業管理器和ResourceManager的代碼。當一個容器從鏡像啟動后,它會自動啟動ResourceManager和作業管理器,並提交打包好的應用。另一種方法是:將應用打包到鏡像后,只用於部署TaskManager容器。從鏡像啟動的容器會自動啟動一個TaskManager,然后連接ResourceManager並注冊它的slots。這些鏡像的啟動以及失敗重啟,通常都會由一個外部的資源管理器管理(比如Kubernetes)。
框架模式遵循了傳統的任務提交方式,從客戶端提交到Flink運行服務。而在庫模式下,沒有運行的Flink服務。它是將Flink作為一個庫,與應用程序一同打包到了一個容器鏡像。這種部署方式在微服務架構中較為常見。我們會在“運行管理流式應用程序”一節對這個話題做詳細討論。
1.3 任務執行
一個TaskManager可以同時執行多個任務(tasks)。這些任務可以是同一個算子(operator)的子任務(數據並行),也可以是來自不同算子的(任務並行),甚至可以是另一個不同應用程序的(作業並行)。TaskManager提供了一定數量的處理插槽(processing slots),用於控制可以並行執行的任務數。一個slot可以執行應用的一個分片,也就是應用中每一個算子的一個並行任務。圖3-2展示了TaskManagers,slots,tasks以及operators之間的關系:
最左邊是一個“作業圖”(JobGraph),包含了5個算子——它是應用程序的非並行表示。其中算子A和C是數據源(source),E是輸出端(sink)。C和E並行度為2,而其他的算子並行度為4。因為最高的並行度是4,所以應用需要至少四個slot來執行任務。現在有兩個TaskManager,每個又各有兩個slot,所以我們的需求是滿足的。作業管理器將JobGraph轉化為“執行圖”(ExecutionGraph),並將任務分配到四個可用的slot上。對於有4個並行任務的算子,它的task會分配到每個slot上。而對於並行度為2的operator C和E,它們的任務被分配到slot 1.1、2.1 以及 slot 1.2、2.2。將tasks調度到slots上,可以讓多個tasks跑在同一個TaskManager內,也就可以是的tasks之間的數據交換更高效。然而將太多任務調度到同一個TaskManager上會導致TaskManager過載,繼而影響效率。之后我們會在“控制任務調度”一節繼續討論如何控制任務的調度。
TaskManager在同一個JVM中以多線程的方式執行任務。線程較進程會更輕量級,但是線程之間並沒有對任務進行嚴格隔離。所以,單個任務的異常行為有可能會導致整個TaskManager進程掛掉,當然也同時包括運行在此進程上的所有任務。通過為每個TaskManager配置單獨的slot,就可以將應用在TaskManager上相互隔離開來。TaskManager內部有多線程並行的機制,而且在一台主機上可以部署多個TaskManager,所以Flink在資源配置上非常靈活,在部署應用時可以充分權衡性能和資源的隔離。我們將會在第九章對Flink集群的配置和搭建繼續做詳細討論。
1.4 高可用配置
流式應用程序一般被設計為7 x 24小時運行。所以很重要的一點是:即使出現了進程掛掉的情況,應用仍需要繼續保持運行。為了從故障恢復,系統首先需要重啟進程、然后重啟應用並恢復它的狀態。接下來,我們就來了解Flink如何重啟失敗的進程。
TaskManager故障
如前所述,Flink需要足夠數目的slot,來執行一個應用的所有任務。假設一個Flink環境有4個TaskManager,每個提供2個插槽,那么流應用程序執行的最高並行度為8。如果其中一個TaskManager掛掉了,那么可用的slots會降到6。在這種情況下,作業管理器會請求ResourceManager提供更多的slots。如果此請求無法滿足——例如應用跑在一個獨立集群——那么作業管理器在有足夠的slots之前,無法重啟應用。應用的重啟策略決定了作業管理器的重啟頻率,以及兩次重啟嘗試之間的時間間隔。
作業管理器故障
比TaskManager故障更嚴重的問題是作業管理器故障。作業管理器控制整個流應用程序的執行,並維護執行中的元數據——例如指向已完成檢查點的指針。若是對應的作業管理器掛掉,則流程序無法繼續運行。所以這就導致在Flink應用中,作業管理器是單點故障。為了解決這個問題,Flink提供了高可用模式。在原先的作業管理器掛掉后,可以將一個作業的狀態和元數據遷移到另一個作業管理器,並繼續執行。
Flink的高可用模式基於Apache ZooKeeper,我們知道,ZooKeeper是用來管理需要協調和共識的分布式服務的系統。Flink主要利用ZooKeeper來進行領導者(leader)的選舉,並把它作為一個高可用和持久化的數據存儲。當在高可用模式下運行時,作業管理器會將JobGraph以及所有需要的元數據(例如應用程序的jar文件),寫入到一個遠程的持久化存儲系統中。而且,作業管理器會將指向存儲位置的指針,寫入到ZooKeeper的數據存儲中。在執行一個應用的過程中,作業管理器會接收每個獨立任務檢查點的狀態句柄(也就是存儲位置)。當一個檢查點完成時(所有任務已經成功地將它們的狀態寫入到遠程存儲), 作業管理器把狀態句柄寫入遠程存儲,並將指向這個遠程存儲的指針寫入ZooKeeper。這樣,一個作業管理器掛掉之后再恢復,所需要的所有數據信息已經都保存在了遠程存儲,而ZooKeeper里存有指向此存儲位置的指針。圖3-3描述了這個設計:
當一個作業管理器失敗,所有屬於這個應用的任務都會自動取消。一個新的作業管理器接管工作,會執行以下操作:
- 從ZooKeeper請求存儲位置(storage location),從遠端存儲獲取JobGraph,Jar文件,以及應用最近一次檢查點(checkpoint)的狀態句柄(state handles)
- 從ResourceManager請求slots,用來繼續運行應用
- 重啟應用,並將所有任務的狀態,重設為最近一次已完成的檢查點
如果我們是在容器環境里運行應用(如Kubernetes),故障的作業管理器或TaskManager 容器通常會由容器服務自動重啟。當運行在YARN或Mesos之上時,作業管理器或TaskManager進程會由Flink的保留進程自動觸發重啟。而在standalone模式下,Flink並未提供重啟故障進程的工具。所以,此模式下我們可以增加備用(standby)的 作業管理器和TaskManager,用於接管故障的進程。我們將會在“高可用配置”一節中做進一步討論。
2 Flink中的數據傳輸
2.1 基於信任度的流控制
通過網絡連接來發送每條數據的效率很低,會導致很大的開銷。為了充分利用網絡連接的帶寬,就需要進行緩沖了。在流處理的上下文中,緩沖的一個缺點是會增加延遲,因為數據需要在緩沖區中進行收集,而不是立即發送。
Flink實現了一個基於信任度的流量控制機制,其工作原理如下。接收任務授予發送任務一些“信任度”(credit),也就是為了接收其數據而保留的網絡緩沖區數。當發送者收到一個信任度通知,它就會按照被授予的信任度,發送盡可能多的緩沖數據,並且同時發送目前積壓數據的大小——也就是已填滿並准備發送的網絡緩沖的數量。接收者用保留的緩沖區處理發來的數據,並對發送者傳來的積壓量進行綜合考量,為其所有連接的發送者確定下一個信用度授權的優先級。
基於信用度的流控制可以減少延遲,因為發送者可以在接收者有足夠的資源接受數據時立即發送數據。此外,在數據傾斜的情況下,這樣分配網絡資源是一種很有效的機制,因為信用度是根據發送者積壓數據量的規模授予的。因此,基於信用的流量控制是Flink實現高吞吐量和低延遲的重要組成部分。
2.2 任務鏈
Flink采用了一種稱為任務鏈的優化技術,可以在特定條件下減少本地通信的開銷。為了滿足任務鏈的要求,必須將兩個或多個算子設為相同的並行度,並通過本地轉發(local forward)的方式進行連接。圖3-5所示的算子管道滿足這些要求。它由三個算子組成,這些算子的任務並行度都被設為2,並且通過本地轉發方式相連接。
圖3-6展示了管道以任務鏈方式運行的過程。算子的函數被融合成了一個單一的任務,由一個線程執行。由函數生成的數據通過一個簡單的方法調用移交給下一個函數;這樣在函數之間直接傳遞數據,基本上沒有序列化和通信成本。
任務鏈可以顯著降低本地任務之間的通信成本,但也有一些場景,在沒有鏈接的情況下運行管道操作是有意義的。例如,如果任務鏈中某個函數執行的開銷巨大,那就可以將一條長的任務鏈管道斷開,或者將一條鏈斷開為兩個任務,從而可以將這個開銷大的函數調度到不同的槽(slots)中。圖3-7顯示了在沒有任務鏈的情況下相同管道操作的執行情況。所有函數都由獨立的單個任務來評估,每個任務都在專有的線程中運行。
任務鏈在Flink中默認會啟用。在“控制任務鏈”一節中,我們展示了如何禁用應用程序的任務鏈,以及如何控制各個算子的鏈接行為。
參考網址:
https://blog.csdn.net/Accelerating/article/details/107894474
http://www.manongjc.com/detail/13-plgrtuapvoblful.html
3 事件時間處理
在“時間語義”一節,我們重點強調了時間語義在流處理應用中的重要性,並且解釋了處理時間(processing time)和事件時間(event time)的不同。處理時間比較好理解,因為它是基於處理器本地時間的;但同時,它會帶來比較混亂、不一致、並且不可重現的結果。相比之下,事件時間語義能夠產生可重現且一致的結果,這也是許多流處理場景希望解決的一大難題。但是,與處理時間應用程序相比,事件時間應用程序會更復雜,需要額外的配置。另外,支持事件時間的流處理器,也比純粹在處理時間中運行的系統內部更為復雜。
Flink為常見的事件時間處理操作提供了直觀且易於使用的原語,同時暴露了表達性很強的API,用戶可以使用自定義算子實現更高級的事件時間應用程序。很好地理解Flink的內部時間處理,對於實現這樣的高級應用程序會有很大幫助,有時也是必需的。上一章介紹了Flink利用兩個概念來支持事件時間語義:記錄時間戳(timestamps)和水位線(watermarks)。接下來,我們將描述Flink如何在內部實現並處理時間戳和水位線,進而支持具有事件時間語義的流式應用程序。
3.1 時間戳
由Flink事件時間流應用程序處理的所有記錄都必須伴有時間戳。時間戳將數據與特定時間點相關聯,通常就是數據所表示的事件發生的時間點。而只要時間戳大致跟數據流保持一致,基本上隨着數據流的前進而增大,應用程序就可以自由選擇時間戳的含義。不過正如“時間語義”一節中所討論的,在現實場景中,時間戳基本上都是亂序的,所以采用“事件時間”而非“處理事件”往往會顯得更為重要。
當Flink以事件時間模式處理數據流時,它會根據數據記錄的時間戳來處理基於時間的算子。例如,時間窗口算子根據相關時間戳將數據分配給不同的時間窗口。Flink將時間戳編碼為16字節的長整型值,並將其作為元數據附加到數據記錄中。它的內置運算符會將這個長整型值解釋為一個具有毫秒精度的Unix時間戳,也就是1970-01-01-00:00:00.000以來的毫秒數。當然,如果用戶進行了自定義,那么運算符可以有自己的解釋,例如,可以將精度調整到微秒。
3.2 水位線
除了時間戳,基於事件時間的Flink應用程序還必須支持水位線(watermark)。在基於事件時間的應用中,水位線用於生成每個任務的當前事件時間。基於時間的算子使用這個“當前事件時間”來觸發計算和處理操作。例如,一個時間窗口任務(time-window task)會在任務的事件時間超出窗口的關閉邊界時,完成窗口計算,並輸出計算結果。
在Flink中,水位線被實現為一條特殊的數據記錄,它里面以長整型值保存了一個時間戳。水位線在帶有時間戳的數據流中,跟隨着其它數據一起流動,如圖3-8所示。
水位線有兩個基本屬性:
- 必須單調遞增,以確保任務的事件時間時鍾在向前推進,而不是在后退。
- 它們與數據的時間戳相關。帶有時間戳T的水位線表示,所有后續數據的時間戳都應該大於T。
上面的第二個屬性用於處理帶有亂序時間戳的數據流,比如圖3-8中時間戳3和5的數據。基於時間的算子任務會收集和處理數據(這些數據可能具有亂序的時間戳),並在事件時間時鍾到達某個時刻時完成計算。這個時刻就表示數據收集的截止,具有之前時間戳的數據應該都已經到達、不再需要了;而其中的事件時間時鍾,正是由當前接收到的水位線來指示的。如果任務再接收到的數據違反了watermark的這一屬性,也就是時間戳小於以前接收到的水位線時,它所屬的那部分計算可能已經完成了。這種數據被稱為延遲數據(late records)。Flink提供了處理延遲數據的不同方式,我們會在“處理延遲數據”一節中討論。
水位線還有一個很有趣的特性,它允許應用程序自己來平衡結果的完整性和延遲。如果水位線與數據的時間戳非常接近,那么我們可以得到較低的處理延遲,因為任務在完成計算之前只會短暫地等待更多數據到達。而同時,結果的完整性可能會受到影響,因為相關數據可能因為遲到而被視為“延遲數據”,這樣就不會包含在結果中。相反,非常保守的水位線提供了足夠的時間去等待所有數據到達,這樣會增加處理延遲,但提高了結果的完整性。
3.3 watermark的傳遞和事件時間
在本節中,我們將討論算子如何處理水位線。Flink把watermark作為一條特殊的數據來實現,它也會由算子任務接收和發送。任務會有一個內部的時間服務,它會維護定時器,並在收到watermark時觸發。任務可以在計時器服務中注冊定時器,以便在將來特定的時間點執行計算。例如,窗口算子為每個活動窗口注冊一個定時器,當事件時間超過窗口的結束時間時,該計時器將清除窗口的狀態。
當任務收到watermark時,將執行以下操作:
- 任務根據watermark的時間戳更新其內部事件時鍾。
- 任務的時間服務會將所有過期的計時器標識出來,它們的時間小於當前的事件時間。對於每個過期的計時器,任務調用一個回調函數,該函數可以執行計算並發送結果。
- 任務會發出一個帶有更新后的事件時間的watermark。
Flink限制通過DataStream API訪問時間戳和watermark。函數不能讀取或修改數據的時間戳和watermark,但底層的“處理函數”(process functions)除外,它們可以讀取當前處理數據的時間戳、請求算子的當前事件時間,還可以注冊定時器。通常的函數都不會暴露這些可以設置時間戳、操作任務事件時間時鍾、或者發出水位線的API。而基於時間的數據流算子任務則會配置發送出的數據的時間戳,以確保它們能夠與已到達的水位線平齊。例如,窗口計算完成后,時間窗口的算子任務會將窗口的結束時間作為時間戳附加到將要發送出的結果數據上,然后再使用觸發窗口計算的時間戳發出watermark。
現在,讓我們更詳細地解釋一下任務在接收到新的watermark時,如何繼續發送watermark並更新其事件時鍾。正如我們在“數據並發和任務並發”中所了解的,Flink將數據流拆分為多個分區,並通過單獨的算子任務並行地處理每個分區。每個分區都是一個流,里面包含了帶着時間戳的數據和watermark。一個算子與它前置或后續算子的連接方式有多種情況,所以它對應的任務可以從一個或多個“輸入分區”接收數據和watermark,同時也可以將數據和watermark發送到一個或多個“輸出分區”。接下來,我們將詳細描述一個任務如何向多個輸出任務發送watermark,以及如何通過接收到的watermark來驅動事件時間時鍾前進。
任務為每個輸入分區維護一個分區水位線(watermark)。當從一個分區接收到watermark時,它會比較新接收到的值和當前水位值,然后將相應的分區watermark更新為兩者的最大值。然后,任務會比較所有分區watermark的大小,將其事件時鍾更新為所有分區watermark的最小值。如果事件時間時鍾前進了,任務就將處理所有被觸發的定時器操作,並向所有連接的輸出分區發送出相應的watermark,最終將新的事件時間廣播給所有下游任務。
圖3-9顯示了具有四個輸入分區和三個輸出分區的任務如何接收watermark、更新分區watermark和事件時間時鍾,以及向下游發出watermark。
具有兩個或多個輸入流(如Union或CoFlatMap)的算子任務(參見“多流轉換”一節)也會以所有分區watermark的最小值作為事件時間時鍾。它們並不區分不同輸入流的分區watermark,所以兩個輸入流的數據都是基於相同的事件時間時鍾進行處理的。當然我們可以想到,如果應用程序的各個輸入流的事件時間不一致,那么這種處理方式可能會導致問題。
Flink的水位處理和傳遞算法,確保了算子任務發出的時間戳和watermark是“對齊”的。不過它依賴一個條件,那就是所有分區都會提供不斷增長的watermark。一旦一個分區不再推進水位線的上升,或者完全處於空閑狀態、不再發送任何數據和watermark,任務的事件時間時鍾就將停滯不前,任務的定時器也就無法觸發了。對於基於時間的算子來說,它們需要依賴時鍾的推進來執行計算和清除狀態,這種情況顯然就會有問題。如果任務沒有定期從所有輸入任務接收到新的watermark,那么基於時間的算子的處理延遲和狀態空間的大小都會顯著增加。
對於具有兩個輸入流而且watermark明顯不同的算子,也會出現類似的情況。具有兩個輸入流的任務的事件時間時鍾,將會同較慢的那條流的watermark保持一致,而通常較快流的數據或者中間結果會在state中緩沖,直到事件時間時鍾達到這條流的watermark,才會允許處理它們。
3.4 時間戳的分配和水位線的產生
我們已經解釋了什么是時間戳和水位線,以及它們是如何由Flink內部處理的;然而我們還沒有討論它們的產生。流應用程序接收到數據流時,通常就會先分配時間戳並生成水位線(watermark)。因為時間戳的選擇是由不同的應用程序決定的,而且watermark取決於時間戳和流的特性,所以應用程序必須首先顯式地分配時間戳並生成watermark。Flink流應用程序可以通過三種方式分配時間戳和生成watermark:
- 在數據源(source)處分配:當數據流被攝入到應用程序中時,可以由“源函數”SourceFunction分配和生成時間戳和watermark。SourceFunction可以產生並發送一個數據流;數據會與相關的時間戳一起發送出去,而watermark可以作為一條特殊數據在任何時間點發出。如果SourceFunction(暫時)不再發出watermark,它可以聲明自己處於“空閑”(idle)狀態。Flink會在后續算子的水位計算中,把空閑的SourceFunction產生的流分區排除掉。source的這一空閑機制,可以用來解決前面提到的水位不再上升的問題。源函數(Source Function)在“實現自定義源函數”一節中進行了更詳細的討論。
- 定期分配:在Flink中,DataStream API提供一個名為AssignerWithPeriodicWatermarks的用戶定義函數,它可以從每個數據中提取時間戳,並被定期調用以生成當前watermark。提取出的時間戳被分配給相應的數據,而生成的watermark也會添加到流中。這個函數將在“分配時間戳和生成水位線”一節中討論。
- 間斷分配:AssignerWithPunctuatedWatermarks是另一個用戶定義的函數,它同樣會從每個數據中提取一個時間戳。它可以用於生成特殊輸入數據中的watermark。與AssignerWithPeriodicWatermarks相比,此函數可以(但不是必須)從每個記錄中提取watermark。我們在“分配時間戳和生成水位線”一節中同樣討論了該函數。
用戶定義的時間戳分配函數並沒有嚴格的限制,通常會放在盡可能靠近source算子的位置,因為當經過一些算子處理后,數據及其時間戳的順序就更加難以解釋了。所以盡管我們可以在流應用程序的中段覆蓋已有的時間戳和watermark——Flink通過用戶定義的函數提供了這種靈活性,但這顯然並不是推薦的做法。
4 狀態管理
在第2章中,我們已經知道大多數流應用程序都是有狀態的。許多算子會不斷地讀取和更新狀態,例如在窗口中收集的數據、讀取輸入源的位置,或者像機器學習模型那樣的用戶定制化的算子狀態。 Flink用同樣的方式處理所有的狀態,無論是內置的還是用戶自定義的算子。本節我們將會討論Flink支持的不同類型的狀態,並解釋“狀態后端”是如何存儲和維護狀態的。
一般來說,由一個任務維護,並且用來計算某個結果的所有數據,都屬於這個任務的狀態。你可以認為狀態就是一個本地變量,可以被任務的業務邏輯訪問。圖3-10顯示了任務與其狀態之間的交互。
任務會接收一些輸入數據。在處理數據時,任務可以讀取和更新狀態,並根據輸入數據和狀態計算結果。最簡單的例子,就是統計接收到多少條數據的任務。當任務收到新數據時,它會訪問狀態以獲取當前的計數,然后讓計數遞增,更新狀態並發送出新的計數。
應用程序里,讀取和寫入狀態的邏輯一般都很簡單直接,而有效可靠的狀態管理會復雜一些。這包括如何處理很大的狀態——可能會超過內存,並且保證在發生故障時不會丟失任何狀態。幸運的是,Flink會幫我們處理這相關的所有問題,包括狀態一致性、故障處理以及高效存儲和訪問,以便開發人員可以專注於應用程序的邏輯。
在Flink中,狀態始終與特定算子相關聯。為了使運行時的Flink了解算子的狀態,算子需要預先注冊其狀態。總的說來,有兩種類型的狀態:算子狀態(operator state)和鍵控狀態(keyed state),它們有着不同的范圍訪問,我們將在下面展開討論。
4.1 算子狀態
算子狀態的作用范圍限定為算子任務。這意味着由同一並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。算子狀態不能由相同或不同算子的另一個任務訪問。圖3-11顯示了任務如何訪問算子狀態。
Flink為算子狀態提供三種基本數據結構:
列表狀態
將狀態表示為一組數據的列表。
聯合列表狀態
也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。我們將在后面繼續討論。
廣播狀態
如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態。在保存檢查點和重新調整算子並行度時,會用到這個特性。這兩部分內容將在本章后面討論。
4.2 鍵控狀態
顧名思義,鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的。Flink為每個鍵值維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護和處理這個key對應的狀態。當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。因此,具有相同key的所有數據都會訪問相同的狀態。圖3-12顯示了任務如何與鍵控狀態進行交互。
我們可以將鍵控狀態看成是在算子所有並行任務上,對鍵進行分區(或分片)之后的一個鍵值映射(key-value map)。 Flink為鍵控狀態提供不同的數據結構,用於確定map中每個key存儲的值的類型。我們簡單了解一下最常見的鍵控狀態。
值狀態
為每個鍵存儲一個任意類型的單個值。復雜數據結構也可以存儲為值狀態。
列表狀態
為每個鍵存儲一個值的列表。列表里的每個數據可以是任意類型。
映射狀態
為每個鍵存儲一個鍵值映射(map)。map的key和value可以是任意類型。
狀態的數據結構可以讓Flink實現更有效的狀態訪問。我們將在“在運行時上下文(RuntimeContext)中聲明鍵控狀態”中做進一步討論。
4.3 狀態后端
每傳入一條數據,有狀態的算子任務都會讀取和更新狀態。由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問。狀態到底是如何被存儲、訪問以及維護的?這件事由一個可插入的組件決定,這個組件就叫做狀態后端(state backend)。狀態后端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲。
對於本地狀態管理,狀態后端會存儲所有鍵控狀態,並確保所有的訪問都被正確地限定在當前鍵范圍。 Flink提供了默認的狀態后端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在JVM堆上。另一種狀態后端則會把狀態對象進行序列化,並將它們放入RocksDB中,然后寫入本地硬盤。第一種方式可以提供非常快速的狀態訪問,但它受內存大小的限制;而訪問RocksDB狀態后端存儲的狀態速度會較慢,但其狀態可以增長到非常大。
狀態檢查點的寫入也非常重要,這是因為Flink是一個分布式系統,而狀態只能在本地維護。 TaskManager進程(所有任務在其上運行)可能在任何時間點掛掉。因此,它的本地存儲只能被認為是不穩定的。狀態后端負責將任務的狀態檢查點寫入遠程的持久存儲。寫入檢查點的遠程存儲可以是分布式文件系統,也可以是數據庫。不同的狀態后端在狀態檢查點的寫入機制方面有所不同。例如,RocksDB狀態后端支持增量的檢查點,這對於非常大的狀態來說,可以顯著減少狀態檢查點寫入的開銷。
我們將在“選擇狀態后端”一節中更詳細地討論不同的狀態后端及其優缺點。
4.4 調整有狀態算子的並行度
流應用程序的一個常見要求是,為了增大或較小輸入數據的速率,需要靈活地調整算子的並行度。對於無狀態算子而言,並行度的調整沒有任何問題,但更改有狀態算子的並行度顯然就沒那么簡單了,因為它們的狀態需要重新分區並分配給更多或更少的並行任務。 Flink支持四種模式來調整不同類型的狀態。
具有鍵控狀態的算子通過將鍵重新分區為更少或更多任務來縮放並行度。不過,並行度調整時任務之間會有一些必要的狀態轉移。為了提高效率,Flink並不會對單獨的key做重新分配,而是用所謂的“鍵組”(key group)把鍵管理起來。鍵組是key的分區形式,同時也是Flink為任務分配key的方式。圖3-13顯示了如何在鍵組中重新分配鍵控狀態。
具有算子列表狀態的算子,會通過重新分配列表中的數據項目來進行並行度縮放。從概念上講,所有並行算子任務的列表項目會被收集起來,並將其均勻地重新分配給更少或更多的任務。如果列表條目少於算子的新並行度,則某些任務將以空狀態開始。圖3-14顯示了算子列表狀態的重新分配。
具有算子聯合列表狀態的算子,會通過向每個任務廣播狀態的完整列表,來進行並行度的縮放。然后,任務可以選擇要使用的狀態項和要丟棄的狀態項。圖3-15顯示了如何重新分配算子聯合列表狀態。
具有算子廣播狀態的算子,通過將狀態復制到新任務,來增大任務的並行度。這是沒問題的,因為廣播狀態保證了所有任務都具有相同的狀態。而對於縮小並行度的情況,我們可以直接取消剩余任務,因為狀態是相同的,已經被復制並且不會丟失。圖3-16顯示了算子廣播狀態的重新分配。
5 檢查點,保存點和狀態恢復
Flink是一個分布式數據處理系統,因此必須有一套機制處理各種故障,比如被殺掉的進程,故障的機器和中斷的網絡連接。任務都是在本地維護狀態的,所以Flink必須確保狀態不會丟失,並且在發生故障時能夠保持一致。
在本節中,我們將介紹Flink的檢查點(checkpoint)和恢復機制,這保證了“精確一次”(exactly-once)的狀態一致性。我們還會討論Flink獨特的保存點(savepoint)功能,這是一個“瑞士軍刀”式的工具,可以解決許多操作數據流時面對的問題。
5.1 一致的檢查點
Flink的恢復機制的核心,就是應用狀態的一致檢查點。有狀態流應用的一致檢查點,其實就是所有任務狀態在某個時間點的一份拷貝,而這個時間點應該是所有任務都恰好處理完一個相同的輸入數據的時候。這個過程可以通過一致檢查點的一個簡單算法步驟來解釋。這個算法的步驟是:
- 暫停所有輸入流的攝取,也就是不再接收新數據的輸入。
- 等待所有正在處理的數據計算完畢,這意味着結束時,所有任務都已經處理了所有輸入數據。
- 通過將每個任務的狀態復制到遠程持久存儲,來得到一個檢查點。所有任務完成拷貝操作后,檢查點就完成了。
- 恢復所有輸入流的攝取。
需要注意,Flink實現的並不是這種簡單的機制。我們將在本節后面介紹Flink更精妙的檢查點算法。
圖3-17顯示了一個簡單應用中的一致檢查點。
上面的應用程序中具有單一的輸入源(source)任務,輸入數據就是一組不斷增長的數字的流——1,2,3等。數字流被划分為偶數流和奇數流。求和算子(sum)的兩個任務會分別實時計算當前所有偶數和奇數的總和。源任務會將其輸入流的當前偏移量存儲為狀態,而求和任務則將當前的總和值存儲為狀態。在圖3-17中,Flink在輸入偏移量為5時,將檢查點寫入了遠程存儲,當前的總和為6和9
5.2 從一致檢查點中恢復狀態
在執行流應用程序期間,Flink會定期檢查狀態的一致檢查點。如果發生故障,Flink將會使用最近的檢查點來一致恢復應用程序的狀態,並重新啟動處理流程。圖3-18顯示了恢復過程。
應用程序從檢查點的恢復分為三步:
- 重新啟動整個應用程序。
- 將所有的有狀態任務的狀態重置為最近一次的檢查點。
- 恢復所有任務的處理。
這種檢查點的保存和恢復機制可以為應用程序狀態提供“精確一次”(exactly-once)的一致性,因為所有算子都會保存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置。至於數據源是否可以重置它的輸入流,這取決於其實現方式和消費流數據的外部接口。例如,像Apache Kafka這樣的事件日志系統可以提供流上之前偏移位置的數據,所以我們可以將源重置到之前的偏移量,重新消費數據。而從套接字(socket)消費數據的流就不能被重置了,因為套接字的數據一旦被消費就會丟棄掉。因此,對於應用程序而言,只有當所有的輸入流消費的都是可重置的數據源時,才能確保在“精確一次”的狀態一致性下運行。
從檢查點重新啟動應用程序后,其內部狀態與檢查點完成時的狀態完全相同。然后它就會開始消費並處理檢查點和發生故障之間的所有數據。盡管這意味着Flink會對一些數據處理兩次(在故障之前和之后),我們仍然可以說這個機制實現了精確一次的一致性語義,因為所有算子的狀態都已被重置,而重置后的狀態下還不曾看到這些數據。
我們必須指出,Flink的檢查點保存和恢復機制僅僅可以重置流應用程序的內部狀態。對於應用中的一些的輸出(sink)算子,在恢復期間,某些結果數據可能會多次發送到下游系統,比如事件日志、文件系統或數據庫。對於某些存儲系統,Flink提供了具有精確一次輸出功能的sink函數,比如,可以在檢查點完成時提交發出的記錄。另一種適用於許多存儲系統的方法是冪等更新。在“應用程序一致性保證”一節中,我們還會詳細討論如何解決應用程序端到端的精確一次一致性問題。
5.3 Flink的檢查點算法
Flink的恢復機制,基於它的一致性檢查點。前面我們已經了解了從流應用中創建檢查點的簡單方法——先暫停應用,保存檢查點,然后再恢復應用程序,這種方法很好理解,但它的理念是“停止一切”,這對於即使是中等延遲要求的應用程序而言也是不實用的。所以Flink沒有這么簡單粗暴,而是基於Chandy-Lamport算法實現了分布式快照的檢查點保存。該算法並不會暫停整個應用程序,而是將檢查點的保存與數據處理分離,這樣就可以實現在其它任務做檢查點狀態保存狀態時,讓某些任務繼續進行而不受影響。接下來我們將解釋此算法的工作原理。
Flink的檢查點算法用到了一種稱為“檢查點分界線”(checkpoint barrier)的特殊數據形式。與水位線(watermark)類似,檢查點分界線由source算子注入到常規的數據流中,它的位置是限定好的,不能超過其他數據,也不能被后面的數據超過。檢查點分界線帶有檢查點ID,用來標識它所屬的檢查點;這樣,這個分界線就將一條流邏輯上分成了兩部分。分界線之前到來的數據導致的狀態更改,都會被包含在當前分界線所屬的檢查點中;而基於分界線之后的數據導致的所有更改,就會被包含在之后的檢查點中。
我們用一個簡單的流應用程序作為示例,來一步一步解釋這個算法。該應用程序有兩個源(source)任務,每個任務都消費一個增長的數字流。源任務的輸出被划分為兩部分:偶數和奇數的流。每個分區由一個任務處理,該任務計算所有收到的數字的總和,並將更新的總和轉發給輸出(sink)任務。這個應用程序的結構如圖3-19所示。
作業管理器會向每個數據源(source)任務發送一條帶有新檢查點ID的消息,通過這種方式來啟動檢查點,如圖3-20所示。
當source任務收到消息時,它會暫停發出新的數據,在狀態后端觸發本地狀態的檢查點保存,並向所有傳出的流分區廣播帶着檢查點ID的分界線(barriers)。狀態后端在狀態檢查點完成后會通知任務,而任務會向作業管理器確認檢查點完成。在發出所有分界線后,source任務就可以繼續常規操作,發出新的數據了。通過將分界線注入到輸出流中,源函數(source function)定義了檢查點在流中所處的位置。圖3-21顯示了兩個源任務將本地狀態保存到檢查點,並發出檢查點分界線之后的流應用程序。
源任務發出的檢查點分界線(barrier),將被傳遞給所連接的任務。與水位線(watermark)類似,barrier會被廣播到所有連接的並行任務,以確保每個任務從它的每個輸入流中都能接收到。當任務收到一個新檢查點的barrier時,它會等待這個檢查點的所有輸入分區的barrier到達。在等待的過程中,任務並不會閑着,而是會繼續處理尚未提供barrier的流分區中的數據。對於那些barrier已經到達的分區,如果繼續有新的數據到達,它們就不會被立即處理,而是先緩存起來。這個等待所有分界線到達的過程,稱為“分界線對齊”(barrier alignment),如圖3-22所示。
當任務從所有輸入分區都收到barrier時,它就會在狀態后端啟動一個檢查點的保存,並繼續向所有下游連接的任務廣播檢查點分界線,如圖3-23所示。
所有的檢查點barrier都發出后,任務就開始處理之前緩沖的數據。在處理並發出所有緩沖數據之后,任務就可以繼續正常處理輸入流了。圖3-24顯示了此時的應用程序。
最終,檢查點分界線會到達輸出(sink)任務。當sink任務接收到barrier時,它也會先執行“分界線對齊”,然后將自己的狀態保存到檢查點,並向作業管理器確認已接收到barrier。一旦從應用程序的所有任務收到一個檢查點的確認信息,作業管理器就會將這個檢查點記錄為已完成。圖3-25顯示了檢查點算法的最后一步。這樣,當發生故障時,我們就可以用已完成的檢查點恢復應用程序了。
5.4 檢查點的性能影響
Flink的檢查點算法可以在不停止整個應用程序的情況下,生成一致的分布式檢查點。但是,它可能會增加應用程序的處理延遲。Flink對此有一些調整措施,可以在某些場景下顯得對性能的影響沒那么大。
當任務將其狀態保存到檢查點時,它其實處於一個阻塞狀態,而此時新的輸入會被緩存起來。由於狀態可能變得非常大,而且檢查點需要通過網絡將數據寫入遠程存儲系統,檢查點的寫入很容易就會花費幾秒到幾分鍾的時間——這對於要求低延遲的應用程序而言,顯然是不可接受的。在Flink的設計中,真正負責執行檢查點寫入的,其實是狀態后端。具體怎樣復制任務的狀態,取決於狀態后端的實現方式。例如,文件系統(FileSystem)狀態后端和RocksDB狀態后端都支持了異步(asynchronous)檢查點。觸發檢查點操作時,狀態后端會先創建狀態的本地副本。本地拷貝完成后,任務就將繼續常規的數據處理,這往往並不會花費太多時間。一個后台線程會將本地快照異步復制到遠程存儲,並在完成檢查點后再回來通知任務。異步檢查點的機制,顯著減少了任務繼續處理數據之前的等待時間。此外,RocksDB狀態后端還實現了增量的檢查點,這樣可以大大減少要傳輸的數據量。
為了減少檢查點算法對處理延遲的影響,另一種技術是調整分界線對齊的步驟。對於需要非常低的延遲、並且可以容忍“至少一次”(at-least-once)狀態保證的應用程序,Flink可以將檢查點算法配置為,在等待barrier對齊期間處理所有到達的數據,而不是把barrier已經到達的那些分區的數據緩存起來。當檢查點的所有barrier到達,算子任務就會將狀態寫入檢查點——當然,現在的狀態中,就可能包括了一些“提前”的更改,這些更改由本該屬於下一個檢查點的數據到來時觸發。如果發生故障,從檢查點恢復時,就將再次處理這些數據:這意味着檢查點現在提供的是“至少一次”(at-least-once)而不是“精確一次”(exactly-once)的一致性保證。
5.5 保存點
Flink的恢復算法是基於狀態檢查點的。Flink根據可配置的策略,定期保存並自動丟棄檢查點。檢查點的目的是確保在發生故障時可以重新啟動應用程序,所以當應用程序被顯式地撤銷(cancel)時,檢查點會被刪除掉。除此之外,應用程序狀態的一致性快照還可用於除故障恢復之外的更多功能。
Flink中一個最有價值,也是最獨特的功能是保存點(savepoints)。原則上,創建保存點使用的算法與檢查點完全相同,因此保存點可以認為就是具有一些額外元數據的檢查點。 Flink不會自動創建保存點,因此用戶(或者外部調度程序)必須明確地觸發創建操作。同樣,Flink也不會自動清理保存點。第10章將會具體介紹如何觸發和處理保存點。
使用保存點
有了應用程序和與之兼容的保存點,我們就可以從保存點啟動應用程序了。這會將應用程序的狀態初始化為保存點的狀態,並從保存點創建時的狀態開始運行應用程序。雖然看起來這種行為似乎與用檢查點從故障中恢復應用程序完全相同,但實際上故障恢復只是一種特殊情況,它只是在相同的集群上以相同的配置啟動相同的應用程序。而從保存點啟動應用程序會更加靈活,這就可以讓我們做更多事情了。
- 可以從保存點啟動不同但兼容的應用程序。這樣一來,我們就可以及時修復應用程序中的邏輯bug,並讓流式應用的源盡可能多地提供之前發生的事件,然后重新處理,以便修復之前的計算結果。修改后的應用程序還可用於運行A / B測試,或者具有不同業務邏輯的假設場景。這里要注意,應用程序和保存點必須兼容才可以這么做——也就是說,應用程序必須能夠加載保存點的狀態。
- 可以使用不同的並行度來啟動相同的應用程序,可以將應用程序的並行度增大或減小。
- 可以在不同的集群上啟動同樣的應用程序。這非常有意義,意味着我們可以將應用程序遷移到較新的Flink版本或不同的集群上去。
- 可以使用保存點暫停應用程序,稍后再恢復。這樣做的意義在於,可以為更高優先級的應用程序釋放集群資源,或者在輸入數據不連續生成時釋放集群資源。
- 還可以將保存點設置為某一版本,並歸檔(archive)存儲應用程序的狀態。
保存點是非常強大的功能,所以許多用戶會定期創建保存點以便能夠及時退回之前的狀態。我們見到的各種場景中,保存點一個最有趣的應用是不斷將流應用程序遷移到更便宜的數據中心上去。
從保存點啟動應用程序
前面提到的保存點的所有用例,都遵循相同的模式。那就是首先創建正在運行的應用程序的保存點,然后在一個新啟動的應用程序中用它來恢復狀態。之前我們已經知道,保存點的創建和檢查點非常相似,而接下來我們就將介紹對於一個從保存點啟動的應用程序,Flink如何初始化其狀態。
應用程序由多個算子組成。每個算子可以定義一個或多個鍵控狀態和算子狀態。算子由一個或多個算子任務並行執行。因此,一個典型的應用程序會包含多個狀態,這些狀態分布在多個算子任務中,這些任務可以運行在不同的TaskManager進程上。
圖3-26顯示了一個具有三個算子的應用程序,每個算子執行兩個算子任務。一個算子(OP-1)具有單一的算子狀態(OS-1),而另一個算子(OP-2)具有兩個鍵控狀態(KS-1和KS-2)。當保存點創建時,會將所有任務的狀態復制到持久化的存儲位置。
保存點中的狀態拷貝會以算子標識符(operator ID)和狀態名稱(state name)組織起來。算子ID和狀態名稱必須能夠將保存點的狀態數據,映射到一個正在啟動的應用程序的算子狀態。從保存點啟動應用程序時,Flink會將保存點的數據重新分配給相應的算子任務。
請注意,保存點不包含有關算子任務的信息。這是因為當應用程序以不同的並行度啟動時,任務數量可能會更改。
如果我們要從保存點啟動一個修改過的應用程序,那么保存點中的狀態只能映射到符合標准的應用程序——它里面的算子必須具有相應的ID和狀態名稱。默認情況下,Flink會自動分配唯一的算子ID。然而,一個算子的ID,是基於它之前算子的ID確定性地生成的。因此,算子的ID會在其前序算子改變時改變,比如,當我們添加了新的或移除掉一個算子時,前序算子ID改變,當前算子ID就會變化。所以對於具有默認算子ID的應用程序而言,如果想在不丟失狀態的前提下升級,就會受到極大的限制。因此,我們強烈建議在程序中為算子手動分配唯一ID,而不是依靠Flink的默認分配。我們將在“指定唯一的算子標識符”一節中詳細說明如何分配算子標識符。
參考資料:
https://blog.csdn.net/qq_33982605/article/details/106206977