Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性,在出現故障時將系統重置回正確狀態。下面通過簡單的類比來解釋檢查點的作用。
假設你和兩位朋友正在數項鏈上有多少顆珠子,如下圖所示。你捏住珠子,邊數邊撥,每撥過一顆珠子就給總數加一。你的朋友也這樣數他們手中的珠子。當你分神忘記數到哪里時,怎么辦呢? 如果項鏈上有很多珠子,你顯然不想從頭再數一遍,尤其是當三人的速度不一樣卻又試圖合作的時候,更是如此(比如想記錄前一分鍾三人一共數了多少顆珠子,回想一下一分鍾滾動窗口)。
於是,你想了一個更好的辦法: 在項鏈上每隔一段就松松地系上一根有色皮筋,將珠子分隔開; 當珠子被撥動的時候,皮筋也可以被撥動; 然后,你安排一個助手,讓他在你和朋友撥到皮筋時記錄總數。用這種方法,當有人數錯時,就不必從頭開始數。相反,你向其他人發出錯誤警示,然后你們都從上一根皮筋處開始重數,助手則會告訴每個人重數時的起始數值,例如在粉色皮筋處的數值是多少。
Flink檢查點的作用就類似於皮筋標記。數珠子這個類比的關鍵點是: 對於指定的皮筋而言,珠子的相對位置是確定的; 這讓皮筋成為重新計數的參考點。總狀態(珠子的總數)在每顆珠子被撥動之后更新一次,助手則會保存與每根皮筋對應的檢查點狀態,如當遇到粉色皮筋時一共數了多少珠子,當遇到橙色皮筋時又是多少。當問題出現時,這種方法使得重新計數變得簡單。
1、Flink的檢查點算法
Flink檢查點的核心作用是確保狀態正確,即使遇到程序中斷,也要正確。記住這一基本點之后,Flink為用戶提供了用來定義狀態的工具。
1 val dataDS: DataStream[String] = env.readTextFile("input/data.txt") 2 3 val mapDS: DataStream[(String, String, String)] = dataDS.map(data => { 4 val datas = data.split(",") 5 (datas(0), datas(1), datas(2)) 6 }) 7 val keyDS: KeyedStream[(String, String, String), Tuple] = mapDS.keyBy(0) 8 9 keyDS.mapWithState{ 10 case ( t, buffer ) => { 11 (t, buffer) 12 } 13 }
我們用一個例子來看檢查點是如何運行的:
以下這個Scala程序按照輸入記錄的第一個字段(一個字符串)進行分組並維護第三個字段的計數狀態.
1 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE) 2 3 val dataDS: DataStream[String] = env.readTextFile("input/data.txt") 4 5 val mapDS: DataStream[(String, String, Int)] = dataDS.map(data => { 6 val datas = data.split(",") 7 (datas(0), datas(1), datas(2).toInt) 8 }) 9 val keyDS: KeyedStream[(String, String, Int), Tuple] = mapDS.keyBy(0) 10 11 val mapStateDS = keyDS.mapWithState[(String, String, Int), Int] { 12 case (t:(String, String, Int), buffer:Option[Int]) => { 13 val i: Int = buffer.getOrElse(0) 14 println("buffer>>>" + i) 15 (t, Option(t._3 + i)) 16 } 17 } 18 mapStateDS.print()
該程序有兩個算子: keyBy算子用來將記錄按照第一個元素(一個字符串)進行分組,根據該key將數據進行重新分區,然后將記錄再發送給下一個算子: 有狀態的map算子(mapWithState)。map算子在接收到每個元素后,將輸入記錄的第三個字段的數據加到現有總數中,再將更新過的元素發送出去。下圖表示程序的初始狀態: 輸入流中的6條記錄被檢查點分割線(checkpoint barrier)隔開,所有的map算子狀態均為0(計數還未開始)。所有key為sensor_1的記錄將被頂層的map算子處理,所有key為b的記錄將被中間層的map算子處理,所有key為c的記錄則將被底層的map算子處理。
上圖是程序的初始狀態。注意,a、b、c三組的初始計數狀態都是0,即三個圓柱上的值。ckpt表示檢查點分割線(checkpoint barriers)。每條記錄在處理順序上嚴格地遵守在檢查點之前或之后的規定,例如["b",2]在檢查點之前被處理,["a",2]則在檢查點之后被處理。
當該程序處理輸入流中的6條記錄時,涉及的操作遍布3個並行實例(節點、CPU內核等)。那么,檢查點該如何保證exactly-once呢?
檢查點分割線和普通數據記錄類似。它們由算子處理,但並不參與計算,而是會觸發與檢查點相關的行為。當讀取輸入流的數據源(在本例中與keyBy算子內聯)遇到檢查點屏障時,它將其在輸入流中的位置保存到持久化存儲中。如果輸入流來自消息傳輸系統(Kafka),這個位置就是偏移量。Flink的存儲機制是插件化的,持久化存儲可以是分布式文件系統,如HDFS。下圖展示了這個過程。
當Flink數據源(在本例中與keyBy算子內聯)遇到檢查點分界線(barrier)時,它會將其在輸入流中的位置保存到持久化存儲中。這讓 Flink可以根據該位置重啟。
檢查點像普通數據記錄一樣在算子之間流動。當map算子處理完前3條數據並收到檢查點分界線時,它們會將狀態以異步的方式寫入持久化存儲,如下圖所示。
位於檢查點之前的所有記錄(["b",2]、["b",3]和["c",1])被map算子處理之后的情況。此時,持久化存儲已經備份了檢查點分界線在輸入流中的位置(備份操作發生在barrier被輸入算子處理的時候)。map算子接着開始處理檢查點分界線,並觸發將狀態異步備份到穩定存儲中這個動作。
當map算子的狀態備份和檢查點分界線的位置備份被確認之后,該檢查點操作就可以被標記為完成,如下圖所示。我們在無須停止或者阻斷計算的條件下,在一個邏輯時間點(對應檢查點屏障在輸入流中的位置)為計算狀態拍了快照。通過確保備份的狀態和位置指向同一個邏輯時間點,后文將解釋如何基於備份恢復計算,從而保證exactly-once。值得注意的是,當沒有出現故障時,Flink檢查點的開銷極小,檢查點操作的速度由持久化存儲的可用帶寬決定。回顧數珠子的例子: 除了因為數錯而需要用到皮筋之外,皮筋會被很快地撥過。
檢查點操作完成,狀態和位置均已備份到穩定存儲中。輸入流中的所有數據記錄都已處理完成。值得注意的是,備份的狀態值與實際的狀態值是不同的。備份反映的是檢查點的狀態。
如果檢查點操作失敗,Flink可以丟棄該檢查點並繼續正常執行,因為之后的某一個檢查點可能會成功。雖然恢復時間可能更長,但是對於狀態的保證依舊很有力。只有在一系列連續的檢查點操作失敗之后,Flink才會拋出錯誤,因為這通常預示着發生了嚴重且持久的錯誤。 現在來看看下圖所示的情況: 檢查點操作已經完成,但故障緊隨其后。
在這種情況下,Flink會重新拓撲(可能會獲取新的執行資源),將輸入流倒回到上一個檢查點,然后恢復狀態值並從該處開始繼續計算。在本例中,["a",2]、["a",2]和["c",2]這幾條記錄將被重播。
下圖展示了這一重新處理過程。從上一個檢查點開始重新計算,可以保證在剩下的記錄被處理之后,得到的map算子的狀態值與沒有發生故障時的狀態值一致。
Flink將輸入流倒回到上一個檢查點屏障的位置,同時恢復map算子的狀態值。然后,Flink從此處開始重新處理。這樣做保證了在記錄被處理之后,map算子的狀態值與沒有發生故障時的一致。
Flink檢查點算法的正式名稱是異步分界線快照(asynchronous barrier snapshotting)
。該算法大致基於Chandy-Lamport分布式快照算法。
檢查點是Flink最有價值的創新之一,因為它使Flink可以保證exactly-once,並且不需要犧牲性能。
2、Flink+Kafka如何實現端到端的Exactly-once語義
我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?
- 內部 —— 利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性
- source —— kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
- sink —— kafka producer作為sink,采用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。
每個算子會對當前的狀態做個快照,保存到狀態后端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。
每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。
sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier 時,把狀態保存到狀態后端,並開啟新的預提交事務。
當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。
當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為“已確認”,數據就真正可以被消費了。
執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
具體的兩階段提交步驟總結如下:
(1)第一條數據來了之后,開啟一個Kafka的事務(Transaction),正常寫入Kafka分區日志,但是標記未提交,這就是“預提交”
(2)JobManager觸發checkpoint操作,barrier從source開始向下傳遞,遇到barrier的算子將狀態存入狀態后端,並通知JobManager
(3)Sink連機器收到barrier,保存當前狀態,存入checkpoint,通知JobManager,並開啟下一階段的事務,用於提交下一個檢查點的數據
(4)JobManager收到所有任務的通知,發出確認信息,表示checkpoint完成
(5)sink任務收到JobManager的確認信息,正式提交這段時間的數據
(6)外部Kafka關閉事務,提交的數據可以正常消費
以下來自:官網:end-to-end-exactly-once-apache-flink
Let’s discuss how to extend a TwoPhaseCommitSinkFunction
on a simple file-based example. We need to implement only four methods and present their implementations for an exactly-once file sink:
beginTransaction -
to begin the transaction, we create a temporary file in a temporary directory on our destination file system. Subsequently, we can write data to this file as we process it.preCommit -
on pre-commit, we flush the file, close it, and never write to it again. We’ll also start a new transaction for any subsequent writes that belong to the next checkpoint.commit -
on commit, we atomically move the pre-committed file to the actual destination directory. Please note that this increases the latency in the visibility of the output data.abort -
on abort, we delete the temporary file.