本文是博主閱讀Flink官方文檔以及《Flink基礎教程》后結合自己理解所寫,若有表達有誤的地方歡迎大伙留言指出。
1. 前言
流式計算分為有狀態和無狀態兩種情況,所謂狀態就是計算過程中的中間值。對於無狀態計算,會獨立觀察每個獨立事件,並根據最后一個事件輸出結果。什么意思?大白話舉例:對於一個流式系統,接受到一系列的數字,當數字大於N則輸出,這時候在此之前的數字的值、和等情況,壓根不關心,只和最后這個大於N的數字相關,這就是無狀態計算。什么是有狀態計算了?想求過去一分鍾內所有數字的和或者平均數等,這種需要保存中間結果的情況是有狀態的計算。
當分布式計系統中引入狀態計算時,就無可避免一致性的問題。為什么了?因為若是計算過程中出現故障,中間數據咋辦了?若是不保存,那就只能重新從頭計算了,不然怎么保證計算結果的正確性。這就是要求系統具有容錯性了。
2. 一致性
談到容錯性,就沒法避免一致性這個概念。所謂一致性就是:成功處理故障並恢復之后得到的結果與沒有發生任何故障是得到的結果相比,前者的正確性。換句大白話,就是故障的發生是否影響得到的結果。在流處理過程,一致性分為3個級別[1]:
- at-most-once:至多一次。故障發生之后,計算結果可能丟失,就是無法保證結果的正確性;
- at-least-once:至少一次。計算結果可能大於正確值,但絕不會小於正確值,就是計算程序發生故障后可能多算,但是絕不可能少算;
- exactly-once:精確一次。系統保證發生故障后得到的計算結果的值和正確值一致;
Flink的容錯機制保證了exactly-once,也可以選擇at-least-once。Flink的容錯機制是通過對數據流不停的做快照(snapshot)實現的。針對FLink的容錯機制需要注意的是:要完全保證exactly-once,Flink的數據源系統需要有“重放”功能,什么意思了?且聽下面慢慢道來。
3. 檢查點(Checkpoint)
Flink做快照的過程是基於“輕量級異步快照”的算法,其核心思想就是在計算過程中保存中間狀態和在數據流中對應的位置,至於如何實現的會后續的博客中會詳細說明。這些保存的信息(快照)就相當於是系統的檢查點(checkpoint)(類似於window系統發生死機等問題時恢復系統到某個時間點的恢復點),做snapshot也是做一個checkpoint。在系統故障恢復時,系統會從最新的一個checkpoint開始重新計算,對應的數據源也會在對應的位置“重放“。這里的“重放”可能會導致數據的二次輸出,這點的處理也在后續的博客中說明。
3.1 屏障(Barriers)
在Flink做分布式快照過程中核心一個元素Barriers的使用。這些Barriers是在數據接入到Flink之初就注入到數據流中,並隨着數據流向每個算子(operator,這里所說的算子不是指類似map()等具體意義上個的,指在JobGraph中優化后的“頂點”),這里需要說明的有兩點:
- 算子對Barriers是免疫的,即Barriers是不參與計算的;
- Barriers和數據的相對位置是保持不變的,而且Barriers之間是線性遞增的;
如下圖所示,Barriers將將數據流分成了一個個數據集。值得提醒的是,當barriers流經算子時,會觸發與checkpoint相關的行為,保存的barriers的位置和狀態(中間計算結果)。
Update:checkpoint是由JobManager中的CheckpointCoordinator周期性觸發,然后在Task側生成barrier,具體為:在Source task(TaskManager中)中barrier會根據命令周期性的在原始數據中注入barrier,而對非source task則是遇到barrier做checkpoint,即非source task其做checkpoint的時間間隔也許不是周期的,影響因素較多。此外,每個算子做checkpoint的方式也許不同。
可以打個比方,在河上有個大壩(相當於算子),接上級通知(Flink中的JobManager)要統計水流量等信息,所以有人在上游定期(source task)放一根木頭(barrier)到河中,當第一木頭遇到大壩時,大壩就記下通過大壩木頭的位置、水流量等相關情況,即做checkpoint(實際生活中不太可能),當第二木頭遇到大壩時記下第一個木頭和第二根木頭之間的水流量等情況,不需要重開始計算。這里先不管故障了,不然就不好解釋相同的水的“重放”問題了。
當一個算子有多個數據源時,又如何做checkpoint了?
如下圖,從左往右一共4副圖。當算子收到其中一個數據源的barriers,而未收到另一個數據源的barriers時(如左1圖),會將先到barriers的數據源中的數據先緩沖起來,等待另一個barriers(如左2圖),當收到兩個barriers(如左3圖)即接收到全部數據源的barrier時,會做checkpoint,保存barriers位置和狀態,發射緩沖中的數據,釋放一個對應的barriers。這里需要注意是,當緩存中數據沒有被發射完時,是不會處理后續數據的,這樣是為了保證數據的有序性。
這里其實有一點需要注意的是,因為系統設置checkpoint的方式是通過時間間隔的形式(enableCheckpointing(intervalTime)
),所以會導致一個問題:當一個checkpoint所需時間遠大於兩次checkpoint之間的時間間隔時,就很有可能會導致后續的checkpoint會失敗,若是這樣情況比較嚴重時會導致任務失敗,這樣Flink系統的容錯性的優勢就等不到保證了,所以需要合理設計checkpoint間隔時間。
3.2 狀態(State)
如下圖所示,在一次snapshot中,算子會在接受到其數據源的所有barriers的以后snapshot它們的狀態,然后在發射barriers到輸出流中,直到最后所有的sink算子都完成snapshot才算完成一次snapshot。其中,在准備發射的barriers形成之前,state 形式是可以改變的,之后就不可以了。state的存貯方式是可以配置的,如HDFS,默認是在JobManager的內存中。
3.3 異步快照(asynchronous state snapshot)
上述描述中,需要等待算子接收到所有barriers后,開始做snapshot,存儲對應的狀態后,再進行下一次snapshot,其狀態的存儲是同步的,這樣可能會造成因snapshot引起較大延時。可以讓算子在存儲快照時繼續處理數據,讓快照存儲異步在后台運行。為此,算子必須能生成一個 state 對象,保證后續狀態的修改不會改變這個 state 對象。例如 RocksDB 中使用的 copy-on-write(寫時復制)類型的數據結構,即異步狀態快照。對異步狀態快照,其可以讓算子接受到barriers后開始在后台異步拷貝其狀態,而不必等待所有的barriers的到來。一旦后台的拷貝完成,將會通知JobManager。只有當所有的sink接收到這個barriers,和所有的有狀態的算子都確認完成狀態的備份時,一次snapshot才算完成。如何實現的,這點后續博客將仔細分析。
Ref:
[1]《Flink基礎教程》
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html