當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之后得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之后,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數?
一致性級別
在流處理中,一致性可以分為3個級別:
- at-most-once: 這其實是沒有正確性保障的委婉說法——故障發生之后,計數結果可能丟失。同樣的還有udp。
- at-least-once: 這表示計數結果可能大於正確值,但絕不會小於正確值。也就是說,計數程序在發生故障后可能多算,但是絕不會少算。
- exactly-once: 這指的是系統保證在發生故障后得到的計數結果與正確值一致。
Flink的一個重大價值在於,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力。
端到端(end-to-end)狀態一致性
目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在 Flink 流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如 Kafka)和輸出到持久化系統。
端到端的一致性保證,意味着結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決於所有組件中一致性最弱的組件。具體可以划分如下:
- 內部保證 —— 依賴checkpoint
- source 端 —— 需要外部源可重設數據的讀取位置
- sink 端 —— 需要保證從故障恢復時,數據不會重復寫入外部系統
而對於sink端,又有兩種具體的實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入。
- 冪等寫入
所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,后面再重復執行就不起作用了。 - 事務寫入
需要構建事務來寫入外部系統,構建的事務對應着 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。
不同Source和Sink的一致性保證可用下表說明:
檢查點
檢查點的代碼實踐
public class CheckpointApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟checkpoint
/**
* 不開啟checkpoint: 不重啟
* 配置了重啟策略: 使用配置的重啟策略
* 1. 使用默認的重啟策略: Integer.MAX_VALUE
* 2. 配置了重啟策略, 使用配置的重啟策略覆蓋默認的
*
* 重啟策略的配置:
* 1. code
* 2. yaml
*/
env.enableCheckpointing(5000);
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 作業完成后是否保留
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 設置狀態后端
config.setCheckpointStorage("file:////Users/carves/workspace/imook-flink");
// 自定義設置我們需要的重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts, 正常運行之后,進入錯誤再運行的次數
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
source.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value.contains("pk")) {
throw new RuntimeException("PK pk test!");
} else {
return value.toLowerCase();
}
}
}).flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] splits = value.split(",");
for (String split:
splits) {
out.collect(split);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}).keyBy(value -> value.f0)
.sum(1)
.print();
env.execute("CheckpointApp");
}
}
檢查點算法:
Flink檢查點算法的正式名稱是異步分界線快照(asynchronous barrier snapshotting)。該算法大致基於Chandy-Lamport分布式快照算法。
檢查點是Flink最有價值的創新之一,因為它使Flink可以保證exactly-once,並且不需要犧牲性能。
Flink + Kafka 實現exactly once 語義
我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性
- source —— kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
- sink —— kafka producer作為sink,采用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
內部的checkpoint機制我們已經有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析。
我們知道Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
2階段提交
執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。
每個算子會對當前的狀態做個快照,保存到狀態后端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。
具體的兩階段提交步驟總結如下:第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”。jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager。sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據。jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成。sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據。外部kafka關閉事務,提交的數據可以正常消費了。
2階段提交步驟
- 第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager
- sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
- jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
- sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
- 外部kafka關閉事務,提交的數據可以正常消費了。