Flink的CheckPoint


Checkpoint  

checkpoint是Flink容錯的核心機制。它可以定期的將各個Operator處理的數據進行快照存儲(Snapshot)。

如果Flink程序出現宕機,可以重新從這些快照中恢復數據。

 

Flink容錯機制的核心就是持續創建分布式數據流及其狀態的一致快照。Flink的checkpoint是通過分布式快照實現的,

所以在flink中這兩個詞是一個意思。

checkpoint用來保證任務的錯誤恢復。任務失敗可以從最新的checkpoint恢復。

checkpoint機制需要一個可靠的可以回放數據的數據源(kafka,RabbitMQ,HDFS...)和一個存放state的持久存儲(hdfs,S3)

 

1、checkpointConfig

通過調用StreamExecutionEnvironment.enableCheckpointing(internal,mode) 啟用checkpoint 。

internal 默認是 -1,表示checkpoint不開啟,mode默認是EXACTLY_ONCE模式。

可設置checkpoint timeout,超過這個時間 checkpoint 沒有成功,checkpoint 終止。默認 10分鍾。

可設置 chekpoint 失敗任務是否任務也失敗,默認是true

可設置同時進行的checkpoint數量,默認是1.

2、barrier

將barrier插入到數據流中,作為數據流的一部分和數據一起向下流動。Barrier不會干擾正常數據,數據流嚴格有序。

一個barrier把數據流分割成兩部分:一部分進入到當前快照,另一部分進入到下一個快照。

每一個barrier都帶有快照ID,並且barrier之前的數據都進入了此快照。Barrier不會干擾數據流處理,所以非常輕量。

多個不同快照的多個barrier會在流中同時出現,即多個快照可能同時創建。

 

Barrier在數據源端插入,當 snapshot 的barrier 插入后,系統會記錄當前snashot 位置值n (用Sn表示。)

例如:在 Apache Kafka中,這個變量表示某個分區中最后一條數據的偏移量。這個位置值Sn 會被發送到一個稱為 checkpoint coordinator的模塊。

 

然后 barrier 繼續向下移動,當一個 operator 從其輸入流接收到所有標識 snapshot n 的 barrier時,它會向其所有輸入流插入一個標識 snapshot n 的 

barrier。當sink operator (DAG流的終點) 從其輸入流接收到所有 barrier n時,它向 the checkpoint coordinator 確認 snapshot n 已完成。

當所有 sink 都確認了這個快照,快照就被標識為完成。

 

3、

3.1 如何觸發 checkpoint?

3.2 異步儲存快照

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM