Checkpoint
checkpoint是Flink容錯的核心機制。它可以定期的將各個Operator處理的數據進行快照存儲(Snapshot)。
如果Flink程序出現宕機,可以重新從這些快照中恢復數據。
Flink容錯機制的核心就是持續創建分布式數據流及其狀態的一致快照。Flink的checkpoint是通過分布式快照實現的,
所以在flink中這兩個詞是一個意思。
checkpoint用來保證任務的錯誤恢復。任務失敗可以從最新的checkpoint恢復。
checkpoint機制需要一個可靠的可以回放數據的數據源(kafka,RabbitMQ,HDFS...)和一個存放state的持久存儲(hdfs,S3)
1、checkpointConfig
通過調用StreamExecutionEnvironment.enableCheckpointing(internal,mode) 啟用checkpoint 。
internal 默認是 -1,表示checkpoint不開啟,mode默認是EXACTLY_ONCE模式。
可設置checkpoint timeout,超過這個時間 checkpoint 沒有成功,checkpoint 終止。默認 10分鍾。
可設置 chekpoint 失敗任務是否任務也失敗,默認是true
可設置同時進行的checkpoint數量,默認是1.
2、barrier
將barrier插入到數據流中,作為數據流的一部分和數據一起向下流動。Barrier不會干擾正常數據,數據流嚴格有序。
一個barrier把數據流分割成兩部分:一部分進入到當前快照,另一部分進入到下一個快照。
每一個barrier都帶有快照ID,並且barrier之前的數據都進入了此快照。Barrier不會干擾數據流處理,所以非常輕量。
多個不同快照的多個barrier會在流中同時出現,即多個快照可能同時創建。
Barrier在數據源端插入,當 snapshot 的barrier 插入后,系統會記錄當前snashot 位置值n (用Sn表示。)
例如:在 Apache Kafka中,這個變量表示某個分區中最后一條數據的偏移量。這個位置值Sn 會被發送到一個稱為 checkpoint coordinator的模塊。
然后 barrier 繼續向下移動,當一個 operator 從其輸入流接收到所有標識 snapshot n 的 barrier時,它會向其所有輸入流插入一個標識 snapshot n 的
barrier。當sink operator (DAG流的終點) 從其輸入流接收到所有 barrier n時,它向 the checkpoint coordinator 確認 snapshot n 已完成。
當所有 sink 都確認了這個快照,快照就被標識為完成。
3、
3.1 如何觸發 checkpoint?
3.2 異步儲存快照