Flink如何保證數據的一致性


當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之后得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之后,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數?

一致性級別

在流處理中,一致性可以分為3個級別:

  • at-most-once: 這其實是沒有正確性保障的委婉說法——故障發生之后,計數結果可能丟失。同樣的還有udp。
  • at-least-once: 這表示計數結果可能大於正確值,但絕不會小於正確值。也就是說,計數程序在發生故障后可能多算,但是絕不會少算。
  • exactly-once: 這指的是系統保證在發生故障后得到的計數結果與正確值一致。

Flink的一個重大價值在於,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力

端到端(end-to-end)狀態一致性

目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在 Flink 流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如 Kafka)和輸出到持久化系統。

端到端的一致性保證,意味着結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決於所有組件中一致性最弱的組件。具體可以划分如下:

  • 內部保證 —— 依賴checkpoint
  • source 端 —— 需要外部源可重設數據的讀取位置
  • sink 端 —— 需要保證從故障恢復時,數據不會重復寫入外部系統

而對於sink端,又有兩種具體的實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入。

  • 冪等寫入
    所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,后面再重復執行就不起作用了。
  • 事務寫入
    需要構建事務來寫入外部系統,構建的事務對應着 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。

不同Source和Sink的一致性保證可用下表說明:
不同數據流的一致性

檢查點

檢查點的代碼實踐


public class CheckpointApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 開啟checkpoint
        /**
         * 不開啟checkpoint: 不重啟
         * 配置了重啟策略: 使用配置的重啟策略
         * 1. 使用默認的重啟策略: Integer.MAX_VALUE
         * 2. 配置了重啟策略, 使用配置的重啟策略覆蓋默認的
         *
         * 重啟策略的配置:
         * 1. code
         * 2. yaml
         */
        env.enableCheckpointing(5000);
		// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // 作業完成后是否保留
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 設置狀態后端
        config.setCheckpointStorage("file:////Users/carves/workspace/imook-flink");

        // 自定義設置我們需要的重啟策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts, 正常運行之后,進入錯誤再運行的次數
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
        source.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value.contains("pk")) {
                    throw new RuntimeException("PK pk test!");
                } else {
                    return value.toLowerCase();
                }
            }
        }).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] splits = value.split(",");
                for (String split:
                     splits) {
                    out.collect(split);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        }).keyBy(value -> value.f0)
                .sum(1)
                .print();
        env.execute("CheckpointApp");
    }
}

檢查點算法:
Flink檢查點算法的正式名稱是異步分界線快照(asynchronous barrier snapshotting)。該算法大致基於Chandy-Lamport分布式快照算法。
檢查點是Flink最有價值的創新之一,因為它使Flink可以保證exactly-once,並且不需要犧牲性能

我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性

  • source —— kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
  • sink —— kafka producer作為sink,采用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
    內部的checkpoint機制我們已經有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析。
    我們知道Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。

2階段提交

執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。
2階段提交流程

每個算子會對當前的狀態做個快照,保存到狀態后端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。

具體的兩階段提交步驟總結如下:第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”。jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager。sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據。jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成。sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據。外部kafka關閉事務,提交的數據可以正常消費了。

2階段提交步驟

  1. 第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager
  2. sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
  3. jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
  4. sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
  5. 外部kafka關閉事務,提交的數據可以正常消費了。

state 
checkpointing 
狀態后端
流式數據的處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM