Flink 檢查點(checkpoint)


Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性,在出現故障時將系統重置回正確狀態。下面通過簡單的類比來解釋檢查點的作用。

假設你和兩位朋友正在數項鏈上有多少顆珠子,如下圖所示。你捏住珠子,邊數邊撥,每撥過一顆珠子就給總數加一。你的朋友也這樣數他們手中的珠子。當你分神忘記數到哪里時,怎么辦呢? 如果項鏈上有很多珠子,你顯然不想從頭再數一遍,尤其是當三人的速度不一樣卻又試圖合作的時候,更是如此(比如想記錄前一分鍾三人一共數了多少顆珠子,回想一下一分鍾滾動窗口)。

於是,你想了一個更好的辦法: 在項鏈上每隔一段就松松地系上一根有色皮筋,將珠子分隔開; 當珠子被撥動的時候,皮筋也可以被撥動; 然后,你安排一個助手,讓他在你和朋友撥到皮筋時記錄總數。用這種方法,當有人數錯時,就不必從頭開始數。相反,你向其他人發出錯誤警示,然后你們都從上一根皮筋處開始重數,助手則會告訴每個人重數時的起始數值,例如在粉色皮筋處的數值是多少。

Flink檢查點的作用就類似於皮筋標記。數珠子這個類比的關鍵點是: 對於指定的皮筋而言,珠子的相對位置是確定的; 這讓皮筋成為重新計數的參考點。總狀態(珠子的總數)在每顆珠子被撥動之后更新一次,助手則會保存與每根皮筋對應的檢查點狀態,如當遇到粉色皮筋時一共數了多少珠子,當遇到橙色皮筋時又是多少。當問題出現時,這種方法使得重新計數變得簡單。

1、Flink的檢查點算法

Flink檢查點的核心作用是確保狀態正確,即使遇到程序中斷,也要正確。記住這一基本點之后,Flink為用戶提供了用來定義狀態的工具。

 1 val dataDS: DataStream[String] = env.readTextFile("input/data.txt")
 2 
 3 val mapDS: DataStream[(String, String, String)] = dataDS.map(data => {
 4     val datas = data.split(",")
 5     (datas(0), datas(1), datas(2))
 6 })
 7 val keyDS: KeyedStream[(String, String, String), Tuple] = mapDS.keyBy(0)
 8 
 9 keyDS.mapWithState{
10     case ( t, buffer ) => {
11         (t, buffer)
12     }
13 }

 

我們用一個例子來看檢查點是如何運行的:

以下這個Scala程序按照輸入記錄的第一個字段(一個字符串)進行分組並維護第三個字段的計數狀態.

 1 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
 2 
 3 val dataDS: DataStream[String] = env.readTextFile("input/data.txt")
 4 
 5 val mapDS: DataStream[(String, String, Int)] = dataDS.map(data => {
 6     val datas = data.split(",")
 7     (datas(0), datas(1), datas(2).toInt)
 8 })
 9 val keyDS: KeyedStream[(String, String, Int), Tuple] = mapDS.keyBy(0)
10 
11 val mapStateDS = keyDS.mapWithState[(String, String, Int), Int] {
12     case (t:(String, String, Int), buffer:Option[Int]) => {
13         val i: Int = buffer.getOrElse(0)
14         println("buffer>>>" + i)
15         (t, Option(t._3 + i))
16     }
17 }
18 mapStateDS.print()

 

該程序有兩個算子: keyBy算子用來將記錄按照第一個元素(一個字符串)進行分組,根據該key將數據進行重新分區,然后將記錄再發送給下一個算子: 有狀態的map算子(mapWithState)。map算子在接收到每個元素后,將輸入記錄的第三個字段的數據加到現有總數中,再將更新過的元素發送出去。下圖表示程序的初始狀態: 輸入流中的6條記錄被檢查點分割線(checkpoint barrier)隔開,所有的map算子狀態均為0(計數還未開始)。所有key為sensor_1的記錄將被頂層的map算子處理,所有key為b的記錄將被中間層的map算子處理,所有key為c的記錄則將被底層的map算子處理。

上圖是程序的初始狀態。注意,a、b、c三組的初始計數狀態都是0,即三個圓柱上的值。ckpt表示檢查點分割線(checkpoint barriers)。每條記錄在處理順序上嚴格地遵守在檢查點之前或之后的規定,例如["b",2]在檢查點之前被處理,["a",2]則在檢查點之后被處理。

當該程序處理輸入流中的6條記錄時,涉及的操作遍布3個並行實例(節點、CPU內核等)。那么,檢查點該如何保證exactly-once呢?

檢查點分割線和普通數據記錄類似。它們由算子處理,但並不參與計算,而是會觸發與檢查點相關的行為。當讀取輸入流的數據源(在本例中與keyBy算子內聯)遇到檢查點屏障時,它將其在輸入流中的位置保存到持久化存儲中。如果輸入流來自消息傳輸系統(Kafka),這個位置就是偏移量。Flink的存儲機制是插件化的,持久化存儲可以是分布式文件系統,如HDFS。下圖展示了這個過程。

當Flink數據源(在本例中與keyBy算子內聯)遇到檢查點分界線(barrier)時,它會將其在輸入流中的位置保存到持久化存儲中。這讓 Flink可以根據該位置重啟。

檢查點像普通數據記錄一樣在算子之間流動。當map算子處理完前3條數據並收到檢查點分界線時,它們會將狀態以異步的方式寫入持久化存儲,如下圖所示。

位於檢查點之前的所有記錄(["b",2]、["b",3]和["c",1])被map算子處理之后的情況。此時,持久化存儲已經備份了檢查點分界線在輸入流中的位置(備份操作發生在barrier被輸入算子處理的時候)。map算子接着開始處理檢查點分界線,並觸發將狀態異步備份到穩定存儲中這個動作。

當map算子的狀態備份和檢查點分界線的位置備份被確認之后,該檢查點操作就可以被標記為完成,如下圖所示。我們在無須停止或者阻斷計算的條件下,在一個邏輯時間點(對應檢查點屏障在輸入流中的位置)為計算狀態拍了快照。通過確保備份的狀態和位置指向同一個邏輯時間點,后文將解釋如何基於備份恢復計算,從而保證exactly-once。值得注意的是,當沒有出現故障時,Flink檢查點的開銷極小,檢查點操作的速度由持久化存儲的可用帶寬決定。回顧數珠子的例子: 除了因為數錯而需要用到皮筋之外,皮筋會被很快地撥過。

檢查點操作完成,狀態和位置均已備份到穩定存儲中。輸入流中的所有數據記錄都已處理完成。值得注意的是,備份的狀態值與實際的狀態值是不同的。備份反映的是檢查點的狀態。

如果檢查點操作失敗,Flink可以丟棄該檢查點並繼續正常執行,因為之后的某一個檢查點可能會成功。雖然恢復時間可能更長,但是對於狀態的保證依舊很有力。只有在一系列連續的檢查點操作失敗之后,Flink才會拋出錯誤,因為這通常預示着發生了嚴重且持久的錯誤。 現在來看看下圖所示的情況: 檢查點操作已經完成,但故障緊隨其后。

 

在這種情況下,Flink會重新拓撲(可能會獲取新的執行資源),將輸入流倒回到上一個檢查點,然后恢復狀態值並從該處開始繼續計算。在本例中,["a",2]、["a",2]和["c",2]這幾條記錄將被重播。

下圖展示了這一重新處理過程。從上一個檢查點開始重新計算,可以保證在剩下的記錄被處理之后,得到的map算子的狀態值與沒有發生故障時的狀態值一致。

Flink將輸入流倒回到上一個檢查點屏障的位置,同時恢復map算子的狀態值。然后,Flink從此處開始重新處理。這樣做保證了在記錄被處理之后,map算子的狀態值與沒有發生故障時的一致。

Flink檢查點算法的正式名稱是異步分界線快照(asynchronous barrier snapshotting)。該算法大致基於Chandy-Lamport分布式快照算法

檢查點是Flink最有價值的創新之一,因為它使Flink可以保證exactly-once,並且不需要犧牲性能

2、Flink+Kafka如何實現端到端的Exactly-once語義

我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?

  • 內部 —— 利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性
  • source —— kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
  • sink —— kafka producer作為sink,采用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction

Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。

當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。

每個算子會對當前的狀態做個快照,保存到狀態后端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。

每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。

sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier 時,把狀態保存到狀態后端,並開啟新的預提交事務。

當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。

當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為“已確認”,數據就真正可以被消費了。

執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。

具體的兩階段提交步驟總結如下:

(1)第一條數據來了之后,開啟一個Kafka的事務(Transaction),正常寫入Kafka分區日志,但是標記未提交,這就是“預提交”

(2)JobManager觸發checkpoint操作,barrier從source開始向下傳遞,遇到barrier的算子將狀態存入狀態后端,並通知JobManager

(3)Sink連機器收到barrier,保存當前狀態,存入checkpoint,通知JobManager,並開啟下一階段的事務,用於提交下一個檢查點的數據

(4)JobManager收到所有任務的通知,發出確認信息,表示checkpoint完成

(5)sink任務收到JobManager的確認信息,正式提交這段時間的數據

(6)外部Kafka關閉事務,提交的數據可以正常消費

以下來自:官網:end-to-end-exactly-once-apache-flink

Let’s discuss how to extend a TwoPhaseCommitSinkFunction on a simple file-based example. We need to implement only four methods and present their implementations for an exactly-once file sink:

  1. beginTransaction - to begin the transaction, we create a temporary file in a temporary directory on our destination file system. Subsequently, we can write data to this file as we process it.
  2. preCommit - on pre-commit, we flush the file, close it, and never write to it again. We’ll also start a new transaction for any subsequent writes that belong to the next checkpoint.
  3. commit - on commit, we atomically move the pre-committed file to the actual destination directory. Please note that this increases the latency in the visibility of the output data.
  4. abort - on abort, we delete the temporary file.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM