Flink架構(五)- 檢查點,保存點,與狀態恢復


檢查點,保存點,與狀態恢復

Flink是一個分布式數據處理系統,這種場景下,它需要處理各種異常,例如進程終止、機器故障、網絡中斷等。因為tasks在本地維護它們的stateFlink必須確保在出現故障的情況下,state不會丟失,並且保持一致性。

在這一節,我們會介紹Flink用於保證exactly-once state 一致性的檢查點與恢復機制。我們也會討論Flink獨特的保存點功能。

 

一致性檢查點(consistent checkpoints

Flink的恢復機制基於應用狀態的一致檢查點。在有狀態的流應用中,一個一致性檢查點是:在所有tasks處理了一個(相同的)輸入后,當前時間點每個taskstate副本。在為application做一個一致性檢查點時,遵循的一個基本算法步驟如下:

1.       暫停所有輸入流的消費

2.       等待所有未被處理的data完全被處理,表示所有tasks已經處理了它們所有的輸入數據

3.       做檢查點:復制每個taskstate到一個遠端持久性存儲。在所有tasks完成了它們的副本后,檢查點完成

4.       恢復消費輸入流

需要注意的是,Flink並不是實現的這個朴素機制。我們之后會介紹在Flink中更復雜的檢查點算法。

下圖展示了一致性檢查點:

 

 

此應用有一個source task,消費一個遞增數的流,如123等等。流中的數據被分區到一個基數流,一個偶數流。在一個sum operator中,有兩個task,分別用於累加基數與偶數。Source task 存儲當前輸入流的偏移量作為stateSum task 將當前的累加和作為state並存儲。上圖中,在輸入偏移量為5時,Flink做了一個檢查點,此時兩個task的累加和分別為69

 

從一致性檢查點恢復

在流的執行過程中,Flink定期為applicationstate做一致性檢查點。在發生故障時,Flink使用最新的檢查點,以一致性地還原application的狀態並重新開始執行。下圖展示了恢復的過程:

 


應用在恢復時有三個步驟:

1.       重啟整個application

2.       重置所有stateful tasks的狀態為最近的檢查點

3.       恢復所有tasks的處理

這個檢查點與恢復機制可以為應用state提供exactly-once一致性,因為所有的狀態都可以恢復到做時間點的那一刻。一個數據源是否可以重制它的輸入流,取決於它的實現,以及流被消費的源頭(如外部系統或是接口)。例如,event logKafka可以提供一個流中當前偏移量之前的records。相反,若是一個流是從一個socket消費的,則無法被重置,因為sockets在消費完一個數據后,會將它丟棄。因此,一個application在僅當所有輸入流是由可重置的data sources 消費時,它才能夠以exactly-once state 一致性的方式運行。

在一個application從一個檢查點重新開始運行后,它的內部state會與做檢查點時的狀態完全一致。然后它開始消費並處理所有位於檢查點與故障時間點之間的數據。盡管這里隱含的是Flink會處理某些記錄兩次(在故障前與故障后),這個機制仍達到了exactly-once state 一致性,因為所有的operator的狀態都被重置到了它未見到這些數據之前的時間點。

我們必須指出,Flink的檢查點與恢復機制僅重置一個流應用的internal state。在恢復時,取決於應用的sink operator的不同,一些records可能會多次釋放給下游的流,例如一個event log,一個文件系統,或是一個數據庫。對於某些存儲系統,Flinksink 函數可以提供exactly-once 輸出,例如,在檢查點完成時才提交釋放的records。另一個適用於大部分存儲系統的方法是:冪等更新(idempotent updates)。

 

Flink檢查點算法

上文提到,Flink的恢復機制基於的是一致性檢查點。一個朴素的實現是:暫停,做檢查點,然后恢復應用執行。但是這種“stop-the-world”的行為,即使對於能接受中等延時的應用來說,也是不切實際的。在Flink中,它基於Chandy-Lamport算法(用於做分布式快照)實現了檢查點機制。此算法並不停止整個應用的運行,而是將做快照的操作從流處理解耦出來,這樣一些tasks可以持續運行,而其他tasks可以持久化它們的狀態。下面我們介紹一下此算法是如何工作的。

Flink的檢查點算法使用了一個特殊的record類型,稱為一個檢查點分界(checkpoint barrier)。類似於水印,檢查點barrierssource operator注入到常規的流記錄中,並且無法被其他records 趕超。每個檢查點barrier會攜帶一個檢查點ID,用於辨別它屬於哪個檢查點,並且將一個流在邏輯上分成兩部分。在一個barrier之前,對state的所有修改,包含於此barrier的檢查點。若是在一個barrier之后對state的所有修改,則包含於下一個檢查點。

我們使用一個簡單的流處理應用解釋此算法的步驟。此應用由兩個source tasks組成,每個task都消費一個遞增數字的流。Source tasks的輸出被分區到兩個流中,分別是奇數流和偶數流。每個分區都由一個task處理,用於計算接收到的數字的總和,並將更新后的sum值傳遞給一個sink。此應用的示意圖如下:

 

JobManager 向每個source task發送一條包含一個新checkpointID的消息,以初始化一個檢查點,如下圖:

 

 

當一個data source task 收到這條消息時,它會停止釋放records,在state backend觸發一個它本地狀態的一個檢查點,via all outgoing stream partition,廣播(帶有檢查點ID的)檢查點barrier state backend 在它的狀態檢查點完成后,會提醒tasktask會在JobManager承認(acknowledge)檢查點。在所有barriers被發出去后,source繼續它的常規操作。通過注入barrier到它的輸出流,source函數決定了:在流的哪個位置做檢查點。下圖顯示了在兩個source tasks 對它們本地狀態做完檢查點,並釋放檢查點barrier后的流應用。

 

 

source tasks釋放的檢查點barriers,會被傳輸到與它們相連的tasks中。類似於水印,檢查點barrier被廣播到所有相連的並行tasks中,以確保每個task都能從它們的每個輸入流收到一個barrier。當一個task收到一個新檢查點的barrier時,它會等待barriers從它所有的輸入分區到達。在它等待時,對於尚未提供barrier的流分區,它會繼續處理這些流分區中的records。對於已經提供了barrier的流分區,records不會被立即處理,而是被放入緩存。這個等待所有barrier到達的過程稱為分界校准(barrier alignment),如下圖所示:

 

 

一旦一個task從它所有輸入分區中,收到了全部的barriers。它開始在state backend初始化檢查點,並廣播檢查點barrier到它所有的下游tasks,如下圖:

 

 

在所有檢查點barriers已經被釋放后,task開始處理被緩存的記錄。在所有被緩存的記錄被釋放后,task 繼續處理它的輸入流。下圖顯示了應用在這個時間點的運行狀況:

 

 

最終,檢查點barriers 到達一個sink task。當一個sink task 收到一個barrier時,它會做一個barrier 調整(alignment),給它自己的狀態做檢查點,並向JobManager確認(acknowledge)它已收到barrierJobManager在收到一個application的所有task發送的checkpoint acknowledge后,它會記錄:此application的檢查點完成。下圖顯示了檢查點算法的最后一步,完成的檢查點可以用於從故障中恢復一個application

 

 

做檢查點的性能影響

Flink的檢查點算法可以在不停止整個application的情況下,從流應用中生成一致性分布式的檢查點。然而,它會增加application的處理延時(processing latency)。Flink 實現了輕微調整,以在某些特定條件下緩解性能影響。

在一個task對它的狀態做檢查點時,它會阻塞,並緩存它的輸入。因為state可以變的很大,並且檢查點的操作需要通過網絡寫入數據到一個遠端存儲系統,所以做檢查點的操作可能會很容易就花費幾秒到幾分鍾,這對於延時敏感的application來說,延時過長了。在Flink的設計中,做一個檢查點是由state backend負責的。一個taskstate如何精確的被復制,取決於state backend的實現。例如,FileSystem state backendRocksDB state backend支持異步做檢查點。當一個檢查點被觸發時,state backend在本地創建一個檢查點的副本。在本地副本創建完成后,task繼續它的正常處理。一個后端線程會異步地復制本地快照到遠端存儲,並在它完成檢查點后提醒task。異步檢查點可以顯著地降低一個task從暫停到繼續處理數據,這中間的時間。另外,RocksDB state backend也有增量檢查點的功能,可以減少數據的傳輸量。

另一個用於減少檢查點算法對處理延時影響的技術是:微調barrier排列步驟。若是一個應用需要非常短的延時,並且可以容忍at-least-once 狀態保證。Flink可以被配置為在buffer alignment時對所有到達的記錄做處理,而不是將這些記錄為已經到達的barrier緩存下來。對於一個檢查點,在它所有的barriers都到達后,operator為它的狀態做檢查點,現在這里可能也會包括:本應屬於下一個檢查點的recordsstate 做的修改。在錯誤發生時,這些records會被再次處理,也就是說,這里檢查點提供的是at-least-once 一致性保證,而不是excatly-once 一致性保證。

 

保存點(Savepoints

Flink的恢復算法是基於state檢查點。檢查點是定期執行並且會根據配置的策略自動丟棄。因為檢查點的目的是用於確保一個application可以在錯誤發生時,自動恢復並重啟,所以在一個application被明確地取消(終止)后,檢查點也會被刪除。然而,state的一致性快照除了用於錯誤恢復,也可以用於很多其他地方。

Flink其中一個非常有價值並很有特點的功能是保存點(savepoints)。原理上,保存點與檢查點用的是相同的算法創建的,所以保存點其實就是:檢查點加上一些額外的元數據。Flink不會自動做一個保存點,所以一個用戶(或是外部調度器)需要明確地觸發創建保存點。Flink也不會自動清理保存點。更多有關觸發與清除保存點的操作會在之后的章節里詳述。

 

使用保存點

給定一個application與一個與它兼容的保存點,我們可以從此保存點啟動一個application。這個可以將應用的state初始化為保存點的state,並且讓application從保存點被創建時地地方開始運行。這個行為看起來與應用從故障恢復其實是完全一致的,所以故障恢復實際上僅僅只是一個特殊的用法。使用保存點可以在同一個集群上以同樣的配置啟動同樣的application。從一個保存點啟動一個application可以讓你做更多的事情。

·       你可以啟動從保存點啟動一個不同的但是兼容的應用。所以,你可以修復應用邏輯中的bug,並盡可能多的重新處理流的數據源中的事件,以修復(repair)應用的結果。被修改的application也可以用於跑A/B 測試,或是不同的業務邏輯測試。需要注意的是:application與保存點必須是兼容的,也就是說,application必須能夠從保存點中加載state

·       可以啟動同樣一個application,使用不同的並行度並對application做擴展或是縮容

·       可以在一個不同的集群上啟動同一個應用。這個可以允許你遷移一個應用到一個更新的Flink版本,或是一個不同的集群

·       可以使用保存點暫停一個應用,並在之后恢復它。這個可以使得釋放資源給更優先的應用變得可能,或是數據數據並不是持續提供的

·       可以做一個檢查點用於版本,並將一個應用的state歸檔

因為保存點是一個非常強大的功能,許多用戶會定期做檢查點,以讓應用可以及時的還原到之前的某個時間點。我們見過的一個較為有趣的使用方法是,使用保存點,不斷地將一個流應用遷移到更省成本的集群中去。

 

從一個保存點啟動一個應用

所有上面提到的保存點的使用場景,均遵循同樣的模式。首先,為一個運行中的應用做保存點,然后使用它的state啟動一個應用。在這節,我們會描述:從一個保存點啟動一個應用時,Flink如何初始化它的state

一個應用由多個operators組成。每個operator可以定義一個或多個keyed以及operator statesOperators以(一個或多個)task並行的方式執行。所以,一個典型的application包含多個state,這些state的分布跨越多個(可能執行在不同TaskManager進程中的)operator tasks

下圖顯示了一個有三個operators的應用,每個operator 有兩個tasksOperator 1 有一個operator stateOS-1)。另一個operatorOP-2)有兩個keyed statesKS-1 KS-2)。當一個保存點被制作時,所有tasksstates被復制到一個持久性存儲位置。

                   Figure 3-26. Taking a savepoint from an application and restoring an application from a savepoint

在保存點中的state 副本,以operator標識符和state名字的方式,進行組織管理。Operator標識符與state 名字是必須的,它們用於將一個保存點的states數據映射到一個正在啟動應用中的operatorsstates。當一個應用程序從一個保存點啟動后,Flink將保存點的數據重新分布到對應的operatorstasks中。

需要注意的是,保存點並不包含operator tasks 的信息。因為當一個應用以不同的並行度啟動時,它的tasks數量是可能會變的。我們在之前已經討論過Flink擴展有狀態operators時使用的策略。

如果一個修改后的應用是從一個保存點啟動的,則在保存點中的一個state,只能被映射到包含了對應operator標識符和state 名字的一個operator的應用中。默認情況下,Flink會分配獨一無二的operator標識符。然而,一個operator的標志符是基於它上游operators的標志符,確定性地生成的。所以,當一個operators的上游operators有變動時,它本身的標志符也會改變。例如,當一個operator加入或是被移除。事實上,使用默認operator 標志符的application,在如何不丟失狀態的情況下進行更新的場景下,能力是非常有限的。所以我們強烈建議手動為operators分配獨一無二的標志符,而不是依賴於Flink默認的分配。我們會在之后的章節詳細討論此方法。

 

總結

在這章我們討論的Flinkhigh-level架構,以及它內部的網絡棧,事件-時間處理模型,狀態管理,以及故障恢復機制。這些信息在設計高級流模型、建立配置集群、操作流應用、以及推算性能時,都會十分有用的


References

Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM