Apache Flink提供了一種容錯機制,可以持續恢復數據流應用程序的狀態。該機制確保即使出現故障,程序的狀態最終也會反映來自數據流的每條記錄(只有一次)。
從容錯和消息處理的語義上(at least once, exactly once),Flink引入了state和checkpoint。
state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。
Flink通過定期地做checkpoint來實現容錯和恢復,容錯機制連續繪制了分布式流數據流的快照。對於小狀態的流應用程序,這些快照非常輕量級並且可以經常繪制,而不會對性能產生太大的影響。流應用程序的狀態存儲在一個可配置的地方(例如主節點或HDFS)。
如果出現程序故障(由於機器、網絡或軟件故障),Flink將停止分布式流數據流。然后系統重新啟動操作符並將其重新設置為最新成功的檢查點。輸入流被重置到狀態快照的點。默認情況下,檢查點是禁用的。
要使此機制實現其全部的保證,數據流源(如消息隊列或代理)需要能夠將流倒回到其定義的最近點。Apache Kafka可以做到,而Flink的Kafka連接器可以利用這些。
因為Flink通過分布式檢查點實現快照,我們使用快照和檢查點互換。
checkpointing:
- 檢查點默認情況下不被保留,並且僅用於從失敗中恢復作業。當程序被取消時,檢查點被刪除,你可以配置定期的檢查點使他們得以保留。
- Flink容錯機制的核心部分是繪制分布式數據流和操作符狀態的一致的快照。這些快照充當一致的檢查點,在出現故障時系統可以退回到檢查點。
- Barriers:Flink的分布式快照的核心元素是stream barriers。這些barriers被注入到數據流中和記錄一樣作為數據流的一部分流動。Barriers從不會超過記錄。Barriers將數據流中的記錄分為進入當前快照的記錄集和進入下一個快照的記錄。每個barriers都帶有快照的ID,該快照的記錄在其前面推送。Barriers不會阻斷流的流動。
流barriers被注入到流數據源的並行數據流中,快照n的barriers(我們稱之為Sn)被注入的點是源流中快照覆蓋數據的位置。例如,在Apache Kafka中,此位置是分區中最后一條記錄的偏移量。該位置Sn被報告給Flink的JobManager。然后barriers繼續流動,當中間操作符從其所有輸入流都收到快照n的barriers時,他會向所有輸出流發出(emit)快照n的barriers。一旦操作符接收器(流DAG的末端)從它的所有輸入流接收到barrier n,它就向快照n確認檢查點協調器。在所有接收器確認快照后,它被視為已完成。一旦完成快照n,作業將永遠不再向源請求來自Sn之前的記錄,因為此時這些記錄(及其后代記錄)將通過整個數據流拓撲。
接收多個輸入流的運算符需要在快照barriers上對齊輸入流。上圖說明了這一點:
- 一旦操作員從輸入流接收到快照barriers n,它就不能處理來自該流的任何其他記錄(而是緩存),直到它從其他輸入接收到barrier n為止。否則它會混合屬於快照n和屬於快照n + 1的記錄。(begin aligning - aligning)
- 報告barrier n的流暫時被擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區。(aligning)
- 一旦最后一個輸入流接收到barrier n,操作符就會發出所有掛起的傳出記錄,然后自己發出快照n的barriers。(checkpoint - continue)
- 之后,它恢復處理來自所有輸入流的記錄,在處理來自流的記錄之前處理來自輸入緩沖區的記錄。(continue)
- State:當運算符包含任何形式的狀態時,此狀態也必須是快照的一部分。運算符狀態有不同的形式:
- 用戶定義的狀態:這是由轉換函數(如
map()
或filter()
)直接創建和修改的狀態。 - 系統狀態:此狀態是指作為運算符計算一部分的數據緩沖區。此狀態的典型示例是窗口緩沖區,系統在其中收集(和聚合)窗口記錄,直到窗口被評估和逐出。 運算符在他們從輸入流接收到所有快照barriers時,在向其輸出流發出barriers之前立即對其狀態進行快照。此時,將根據barriers之前的記錄對狀態進行所有更新,並且在應用barriers之后不依賴於記錄的更新。由於快照的狀態可能很大,因此它存儲在可配置的狀態后端(state backend)中。默認情況下,這是JobManager的內存,但對於生產使用,應配置分布式可靠存儲(例如HDFS)。在存儲狀態之后,運算符確認檢查點,將快照barriers發送到輸出流中,然后繼續。 生成的快照現在包含:
- 對於每個並行流數據源,啟動快照時流中的偏移/位置。
- 對於每個運算符,指向作為快照的一部分存儲的狀態的指針。
- 僅有一次或至少一次:對齊(alignment)步驟可以增加流式傳輸程序的等待時間。Flink可以在檢查點期間跳過流對齊。一旦運算符看到每個輸入的檢查點barrier,仍然會繪制檢查點快照。當跳過對齊時,即使在檢查點n的某些檢查點barrier到達之后,運算符仍繼續處理所有輸入。這樣,操作員還可以在獲取檢查點n的狀態快照之前處理屬於檢查點n + 1的元素。在還原時,這些記錄將作為重復記錄出現,因為它們都包含在檢查點n的狀態快照中,並將在檢查點n之后作為數據的一部分進行重放。對齊僅適用於具有多個前驅(連接)的運算符以及具有多個發送方的運算符(在流重新分區/隨機播放之后)。正因為如此,即使在至少一次(at least once)模式中,數據流實際上在尷尬的並行流操作(
map()
,flatMap()
,filter()
,...)中給了正好一次(exactly once)保證。
- 異步狀態快照:上述機制意味着運算符在將狀態的快照存儲在狀態后端時停止處理輸入記錄。每次拍攝快照時,此同步狀態快照都會引入延遲。可以讓運算符在存儲狀態快照時繼續處理,有效地讓狀態快照在后台異步發生。為此,運算符必須能夠生成一個狀態對象,該狀態對象應以某種方式存儲,以便對運算符狀態的進一步修改不會影響該狀態對象。 在接收到輸入的檢查點barriers后,運算符啟動其狀態的異步快照復制。它立即釋放其輸出的barriers,並繼續進行常規流處理。后台復制過程完成后,它會向檢查點協調者(JobManager)確認檢查點。檢查點現在僅在所有接收器都已收到barriers並且所有有狀態運算符已確認其完成備份(可能在barriers到達接收器之后)之后才完成。
- 恢復:當失敗時,Flink選擇最新完成的檢查點k。然后,系統重新部署整個分布式數據流,並為每個操作符提供作為檢查點k的一部分的快照的狀態。設置源從位置Sk開始讀取流。例如,在Apache Kafka中,這意味着告訴消費者從偏移量Sk開始提取。如果狀態以遞增方式快照,則運算符從最新完整快照的狀態開始,然后對該狀態應用一系列增量快照進行更新。
- 運算符快照實現:在執行運算符快照時,有同步和異步兩部分。運算符和狀態后端將他們的快照作為一個Java FutureTask。該任務包含已完成的同步部分且處於掛起狀態的異步部分。然后異步部分由該檢查點的后台線程執行。檢查點純粹同步地返回已經完成的運算符
FutureTask,如果需要執行異步操作,則以該
run()
方法執行FutureTask
。任務是可取消的,因此流和其他消耗句柄的資源是可以被釋放的。