
一、前言
有狀態的計算是流處理框架要實現的重要功能,因為稍復雜的流處理場景都需要記錄狀態,然后在新流入數據的基礎上不斷更新狀態。下面的幾個場景都需要使用流處理的狀態功能:
數據流中的數據有重復,想對重復數據去重,需要記錄哪些數據已經流入過應用,當新數據流入時,根據已流入過的數據來判斷去重。
檢查輸入流是否符合某個特定的模式,需要將之前流入的元素以狀態的形式緩存下來。比如,判斷一個溫度傳感器數據流中的溫度是否在持續上升。
對一個時間窗口內的數據進行聚合分析,分析一個小時內某項指標的75分位或99分位的數值。
一個狀態更新和獲取的流程如下圖所示,一個算子子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。一個簡單的例子是對一個時間窗口內輸入流的某個整數字段求和,那么當算子子任務接收到新元素時,會獲取已經存儲在狀態中的數值,然后將當前輸入加到狀態上,並將狀態數據更新。
二、狀態類型
Flink有兩種基本類型的狀態:托管狀態(Managed State)和原生狀態(Raw State)。
兩者的區別:Managed State是由Flink管理的,Flink幫忙存儲、恢復和優化,Raw State是開發者自己管理的,需要自己序列化。
具體區別有:
- 從狀態管理的方式上來說,Managed State由Flink Runtime托管,狀態是自動存儲、自動恢復的,Flink在存儲管理和持久化上做了一些優化。當橫向伸縮,或者說修改Flink應用的並行度時,狀態也能自動重新分布到多個並行實例上。Raw State是用戶自定義的狀態。
- 從狀態的數據結構上來說,Managed State支持了一系列常見的數據結構,如ValueState、ListState、MapState等。Raw State只支持字節,任何上層數據結構需要序列化為字節數組。使用時,需要用戶自己序列化,以非常底層的字節數組形式存儲,Flink並不知道存儲的是什么樣的數據結構。
- 從具體使用場景來說,絕大多數的算子都可以通過繼承Rich函數類或其他提供好的接口類,在里面使用Managed State。Raw State是在已有算子和Managed State不夠用時,用戶自定義算子時使用。
對Managed State繼續細分,它又有兩種類型:Keyed State和Operator State。
為了自定義Flink的算子,可以重寫Rich Function接口類,比如RichFlatMapFunction。使用Keyed State時,通過重寫Rich Function接口類,在里面創建和訪問狀態。對於Operator State,還需進一步實現CheckpointedFunction接口。
2.1、Keyed State
Flink 為每個鍵值維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護和處理這個key對應的狀態。當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。因此,具有相同key的所有數據都會訪問相同的狀態。
需要注意的是鍵控狀態只能在 KeyedStream 上進行使用,可以通過 stream.keyBy(...) 來得到 KeyedStream 。
Flink 提供了以下數據格式來管理和存儲鍵控狀態 (Keyed State):
- ValueState:存儲單值類型的狀態。可以使用 update(T) 進行更新,並通過 T value() 進行檢索。
- ListState:存儲列表類型的狀態。可以使用 add(T) 或 addAll(List) 添加元素,update(T)進行更新;並通過 get() 獲得整個列表。
- ReducingState:用於存儲經過 ReduceFunction 計算后的結果,使用 add(T) 增加元素。
- AggregatingState:用於存儲經過 AggregatingState 計算后的結果,使用 add(IN) 添加元素。
- FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。
- MapState:維護 Map 類型的狀態,get獲取,put更新,contains判斷包含,remove移除元素。
public class ListStateDemo extends RichFlatMapFunction<Tuple2<String, Long>,List<Tuple2<String, Long>>>{
private transient ListState<Tuple2<String, Long>> listState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
"listState",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
);
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
List<Tuple2<String, Long>> currentListState = Lists.newArrayList(listState.get().iterator());
currentListState.add(value);
listState.update(currentListState);
out.collect(currentListState);
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
Tuple2.of("a", 50L),Tuple2.of("a", 60L),Tuple2.of("a", 70L),
Tuple2.of("b", 50L),Tuple2.of("b", 60L),Tuple2.of("b", 70L),
Tuple2.of("c", 50L),Tuple2.of("c", 60L),Tuple2.of("c", 70L)
);
dataStream
.keyBy(0)
.flatMap(new ListStateDemo())
.print();
senv.execute(ListStateDemo.class.getSimpleName());
}
}
2.2、Operator State
Operator State可以用在所有算子上,每個算子子任務或者說每個算子實例共享一個狀態,流入這個算子子任務的數據可以訪問和更新這個狀態。
算子狀態不能由相同或不同算子的另一個實例訪問。
Flink為算子狀態提供三種基本數據結構:
- ListState:存儲列表類型的狀態。
- UnionListState:存儲列表類型的狀態,與 ListState 的區別在於:如果並行度發生變化,ListState 會將該算子的所有並發的狀態實例進行匯總,然后均分給新的 Task;而 UnionListState 只是將所有並發的狀態實例匯總起來,具體的划分行為則由用戶進行定義。
- BroadcastState:用於廣播的算子狀態。如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態。
假設此時不需要區分監控數據的類型,只要有監控數據超過閾值並達到指定的次數后,就進行報警:
public class OperateStateDemo extends RichFlatMapFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>>
implements CheckpointedFunction{
private final int threshold;
private transient ListState<Tuple2<String, Long>> checkpointedState;
private List<Tuple2<String, Long>> bufferedElements;
public OperateStateDemo(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
bufferedElements.add(value);
if(bufferedElements.size() == threshold) {
out.collect(bufferedElements);
bufferedElements.clear();
}
}
/**
* 進行checkpoint快照
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for(Tuple2<String, Long> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor(
"listState",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
);
checkpointedState = context.getOperatorStateStore().getListState(listStateDescriptor);
// 如果是故障恢復
if(context.isRestored()) {
for(Tuple2<String, Long> element : checkpointedState.get()) {
bufferedElements.add(element);
}
checkpointedState.clear();
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.getCheckpointConfig().setCheckpointInterval(500);
DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
Tuple2.of("a", 50L),Tuple2.of("a", 60L),Tuple2.of("a", 70L),
Tuple2.of("b", 50L),Tuple2.of("b", 60L),Tuple2.of("b", 70L),
Tuple2.of("c", 50L),Tuple2.of("c", 60L),Tuple2.of("c", 70L)
);
dataStream
.flatMap(new OperateStateDemo(2))
.print();
senv.execute(OperateStateDemo.class.getSimpleName());
}
}
三、狀態橫向擴展
狀態的橫向擴展問題主要是指修改Flink應用的並行度,確切的說,每個算子的並行實例數或算子子任務數發生了變化,應用需要關停或啟動一些算子子任務,某份在原來某個算子子任務上的狀態數據需要平滑更新到新的算子子任務上。
Flink的Checkpoint就是一個非常好的在各算子間遷移狀態數據的機制。算子的本地狀態將數據生成快照(snapshot),保存到分布式存儲(如HDFS)上。橫向伸縮后,算子子任務個數變化,子任務重啟,相應的狀態從分布式存儲上重建(restore)。
對於Keyed State和Operator State這兩種狀態,他們的橫向伸縮機制不太相同。由於每個Keyed State總是與某個Key相對應,當橫向伸縮時,Key總會被自動分配到某個算子子任務上,因此Keyed State會自動在多個並行子任務之間遷移。對於一個非KeyedStream,流入算子子任務的數據可能會隨着並行度的改變而改變。如上圖所示,假如一個應用的並行度原來為2,那么數據會被分成兩份並行地流入兩個算子子任務,每個算子子任務有一份自己的狀態,當並行度改為3時,數據流被拆成3支,或者並行度改為1,數據流合並為1支,此時狀態的存儲也相應發生了變化。對於橫向伸縮問題,Operator State有兩種狀態分配方式:一種是均勻分配,另一種是將所有狀態合並,再分發給每個實例上。
四、檢查點機制
為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在數據流上生成 checkpoint barrier ,當某個算子收到 barrier 時,即會基於當前狀態生成一份快照,然后再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 后,也基於當前狀態生成一份快照,依次傳遞直至到最后的 Sink 算子上。當出現異常后,Flink 就可以根據最近的一次的快照數據將所有算子恢復到先前的狀態。
4.1、開啟檢查點 (checkpoint)
默認情況下 checkpoint 是禁用的。通過調用 StreamExecutionEnvironment 的 enableCheckpointing(n) 來啟用 checkpoint,里面的 n 是進行 checkpoint 的間隔,單位毫秒。
Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置周期性地基於Stream中各個Operator的狀態來生成Snapshot,從而將這些狀態數據定期持久化存儲下來,當Flink程序一旦意外崩潰時,重新運行程序時可以有選擇地從這些Snapshot進行恢復,從而修正因為故障帶來的程序數據狀態中斷。這里,我們簡單理解一下Flink Checkpoint機制,如官網下圖所示:
Checkpoint指定觸發生成時間間隔后,每當需要觸發Checkpoint時,會向Flink程序運行時的多個分布式的Stream Source中插入一個Barrier標記,這些Barrier會根據Stream中的數據記錄一起流向下游的各個Operator。當一個Operator接收到一個Barrier時,它會暫停處理Steam中新接收到的數據記錄。因為一個Operator可能存在多個輸入的Stream,而每個Stream中都會存在對應的Barrier,該Operator要等到所有的輸入Stream中的Barrier都到達。當所有Stream中的Barrier都已經到達該Operator,這時所有的Barrier在時間上看來是同一個時刻點(表示已經對齊),在等待所有Barrier到達的過程中,Operator的Buffer中可能已經緩存了一些比Barrier早到達Operator的數據記錄(Outgoing Records),這時該Operator會將數據記錄(Outgoing Records)發射(Emit)出去,作為下游Operator的輸入,最后將Barrier對應Snapshot發射(Emit)出去作為此次Checkpoint的結果數據。
Checkpoint 其他的屬性包括:
- 精確一次(exactly-once)對比至少一次(at-least-once):你可以選擇向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中傳入一個模式來選擇使用兩種保證等級中的哪一種。對於大多數應用來說,精確一次是較好的選擇。至少一次可能與某些延遲超低(始終只有幾毫秒)的應用的關聯較大。
- checkpoint 超時:如果 checkpoint 執行的時間超過了該配置的閾值,還在進行中的 checkpoint 操作就會被拋棄。
- checkpoints 之間的最小時間:該屬性定義在 checkpoint 之間需要多久的時間,以確保流應用在 checkpoint 之間有足夠的進展。如果值設置為了 5000,無論 checkpoint 持續時間與間隔是多久,在前一個 checkpoint 完成時的至少五秒后會才開始下一個 checkpoint。
- 並發 checkpoint 的數目: 默認情況下,在上一個 checkpoint 未完成(失敗或者成功)的情況下,系統不會觸發另一個 checkpoint。這確保了拓撲不會在 checkpoint 上花費太多時間,從而影響正常的處理流程。不過允許多個 checkpoint 並行進行是可行的,對於有確定的處理延遲(例如某方法所調用比較耗時的外部服務),但是仍然想進行頻繁的 checkpoint 去最小化故障后重跑的 pipelines 來說,是有意義的。
- externalized checkpoints: 你可以配置周期存儲 checkpoint 到外部系統中。Externalized checkpoints 將他們的元數據寫到持久化存儲上並且在 job 失敗的時候不會被自動刪除。這種方式下,如果你的 job 失敗,你將會有一個現有的 checkpoint 去恢復。更多的細節請看 Externalized checkpoints 的部署文檔。
- 在 checkpoint 出錯時使 task 失敗或者繼續進行 task:他決定了在 task checkpoint 的過程中發生錯誤時,是否使 task 也失敗,使失敗是默認的行為。 或者禁用它時,這個任務將會簡單的把 checkpoint 錯誤信息報告給 checkpoint coordinator 並繼續運行。
- 優先從 checkpoint 恢復(prefer checkpoint for recovery):該屬性確定 job 是否在最新的 checkpoint 回退,即使有更近的 savepoint 可用,這可以潛在地減少恢復時間(checkpoint 恢復比 savepoint 恢復更快)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);
// 高級選項:
// 設置模式為精確一次 (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必須在一分鍾內完成,否則就會被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 開啟在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 允許在有更近 savepoint 時回退到 checkpoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
保存多個Checkpoint
默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,並能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前。
Flink可以支持保留多個Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數:
state.checkpoints.num-retained: 20
保留了最近的20個Checkpoint。如果希望會退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現。
從Checkpoint進行恢復
從指定的checkpoint處啟動,最近的一個/flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584啟動,通常需要先停掉當前運行的flink-session,然后通過命令啟動:
../bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_wink-1.0-SNAPSHOT.jar
可以把命令放到腳本里面,每次直接執行checkpoint
恢復腳本即可:
4.2、保存點機制 (Savepoints)
保存點機制 (Savepoints)是檢查點機制的一種特殊的實現,它允許通過手工的方式來觸發 Checkpoint,並將結果持久化存儲到指定路徑中,主要用於避免 Flink 集群在重啟或升級時導致狀態丟失。示例如下:
# 觸發指定id的作業的Savepoint,並將結果存儲到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
手動savepoint
/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219 [hdfs://bigdata/path]
成功觸發savepoint通常會提示:Savepoint completed. Path: hdfs://path...
:
手動取消任務
與checkpoint
異常停止或者手動Kill
掉不一樣,對於savepoint
通常是我們想要手動停止任務,然后更新代碼,可以使用flink cancel ...
命令:
/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219
從指定savepoint啟動job
bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar
五、狀態后端
Flink 提供了多種 state backends,它用於指定狀態的存儲方式和位置。
狀態可以位於 Java 的堆或堆外內存。取決於 state backend,Flink 也可以自己管理應用程序的狀態。為了讓應用程序可以維護非常大的狀態,Flink 可以自己管理內存(如果有必要可以溢寫到磁盤)。默認情況下,所有 Flink Job 會使用配置文件 flink-conf.yaml 中指定的 state backend。
但是,配置文件中指定的默認 state backend 會被 Job 中指定的 state backend 覆蓋。
5.1、狀態管理器分類
MemoryStateBackend
默認的方式,即基於 JVM 的堆內存進行存儲,主要適用於本地開發和調試。
FsStateBackend
基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分布式文件系統。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的內存中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。
RocksDBStateBackend
RocksDBStateBackend 是 Flink 內置的第三方狀態管理器,采用嵌入式的 key-value 型數據庫 RocksDB 來存儲正在進行的數據。等到 checkpoint 時,再將其中的數據持久化到指定的文件系統中,所以采用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統。之所以這樣做是因為 RocksDB 作為嵌入式數據庫安全性比較低,但比起全文件系統的方式,其讀取速率更快;比起全內存的方式,其存儲空間更大,因此它是一種比較均衡的方案。
5.2、配置方式
Flink 支持使用兩種方式來配置后端管理器:
第一種方式:基於代碼方式進行配置,只對當前作業生效:
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
配置 RocksDBStateBackend 時,需要額外導入下面的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.0</version>
</dependency>
第二種方式:基於 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
六、狀態一致性
6.1、端到端(end-to-end)
在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如 Kafka)和輸出到持久化系統。
端到端的一致性保證,意味着結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決於所有組件中一致性最弱的組件。具體可以划分如下:
- 內部保證:依賴checkpoint
- source 端:需要外部源可重設數據的讀取位置
- sink 端:需要保證從故障恢復時,數據不會重復寫入外部系統。
而對於sink端,又有兩種具體的實現方式: - 冪等(Idempotent)寫入:所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,后面再重復執行就不起作用了。
- 事務性(Transactional)寫入:需要構建事務來寫入外部系統,構建的事務對應着 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。
對於事務性寫入,具體又有兩種實現方式:預寫日志(WAL)和兩階段提交(2PC)。Flink DataStream API 提供了GenericWriteAheadSink 模板類和 TwoPhaseCommitSinkFunction 接口,可以方便地實現這兩種方式的事務性寫入。
6.2、Flink+Kafka 實現端到端的 exactly-once語義
端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?
- 內部:利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復,保證內部的狀態一致性
- source:kafka consumer作為source,可以將偏移量保存下來,如果后續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
- sink:kafka producer作為sink,采用兩階段提交 sink,需要實現一個TwoPhaseCommitSinkFunction內部的checkpoint機制。
EXACTLY_ONCE
語義簡稱EOS,指的是每條輸入消息只會影響最終結果一次,注意這里是影響一次,而非處理一次,Flink一直宣稱自己支持EOS,實際上主要是對於Flink應用內部來說的,對於外部系統(端到端)則有比較強的限制
- 外部系統寫入支持冪等性
- 外部系統支持以事務的方式寫入
Kafka在0.11版本之前只能保證At-Least-Once
或At-Most-Once
語義,從0.11版本開始,引入了冪等發送和事務,從而開始保證EXACTLY_ONCE
語義。
Maven依賴 | 開始支持的版本 | 生產/消費 類名 | kafka版本 | 注意 |
---|---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 |
0.8.x | 使用Kafka內部SimpleConsumer API. Flink把Offsets提交到ZK |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 |
0.9.x | 使用新版Kafka Consumer API. |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
0.10.x | 支持Kafka生產/消費消息帶時間戳 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 |
0.11.x | 由於0.11.x Kafka不支持scala 2.10。此連接器支持Kafka事務消息傳遞,以便為生產者提供exactly once語義。 |
flink-connector-kafka_2.11 |
1.7.0 | FlinkKafkaConsumer FlinkKafkaProducer |
>=1.0.0 | 高版本向后兼容。但是,對於Kafka 0.11.x和0.10.x版本,我們建議分別使用專用的flink-connector-Kafka-0.11_2.11 和link-connector-Kafka-0.10_2.11 |
Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,封裝了兩階段提交邏輯,並在Kafka Sink connector中實現了TwoPhaseCommitSinkFunction,依賴Kafka版本為0.11+
public class FlinkKafkaDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.enableCheckpointing(1000);
senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// kafka 數據源
Map<String, String> config = Configuration.initConfig("commons.xml");
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
kafkaProps.setProperty("group.id", config.get("kafka-groupid"));
SingleOutputStreamOperator<String> dataStream = senv.addSource(
new FlinkKafkaConsumer011(
config.get("kafka-topic"),
new SimpleStringSchema(),
kafkaProps
));
// sink 到 kafka
FlinkKafkaProducer011<String> producer011 = new FlinkKafkaProducer011<>(
config.get("kafka-ipport"),
"test-kafka-producer",
new SimpleStringSchema());
producer011.setWriteTimestampToKafka(true);
dataStream.map(x -> {
// 拋出異常
if("name4".equals(JSON.parseObject(x).get("name"))){
System.out.println("name4 exception test...");
// throw new RuntimeException("name4 exception test...");
}
return x;
}).addSink(producer011);
senv.execute(FlinkKafkaDemo.class.getSimpleName());
}
}
Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在算子間傳遞下去。
每個算子會對當前的狀態做個快照,保存到狀態后端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。
每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。
sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier時,把狀態保存到狀態后端,並開啟新的預提交事務。
當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為“已確認”,數據就真正可以被消費了。
所以看到,執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
具體的兩階段提交步驟總結如下:
- 第一條數據來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日志但標記為未提交,這就是“預提交”, jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態存入狀態后端,並通知 jobmanager
- sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
- jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
- sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
- 外部kafka關閉事務,提交的數據可以正常消費了。
所以也可以看到,如果宕機需要通過StateBackend進行恢復,只能恢復所有確認提交的操作。
6.3、Kafka冪等性和事務
前面表格總結的可以看出,Kafka在0.11版本之前只能保證At-Least-Once
或At-Most-Once
語義,從0.11版本開始,引入了冪等發送和事務,從而開始保證EXACTLY_ONCE
語義。
冪等性
在未引入冪等性時,Kafka正常發送和重試發送消息流程圖如下:
為了實現Producer的冪等語義,Kafka引入了Producer ID(即PID)和Sequence Number。每個新的Producer在初始化的時候會被分配一個唯一的PID,該PID對用戶完全透明而不會暴露給用戶。
Producer發送每條消息<Topic, Partition>對於Sequence Number會從0開始單調遞增,broker端會為每個<PID, Topic, Partition>維護一個序號,每次commit一條消息此序號加一,對於接收的每條消息,如果其序號比Broker維護的序號(即最后一次Commit的消息的序號)大1以上,則Broker會接受它,否則將其丟棄:
- 序號比Broker維護的序號大1以上,說明存在亂序。
- 序號比Broker維護的序號小,說明此消息以及被保存,為重復數據。
有了冪等性,Kafka正常發送和重試發送消息流程圖如下:
事務
事務是指所有的操作作為一個原子,要么都成功,要么都失敗,而不會出現部分成功或部分失敗的可能。舉個例子,比如小明給小王轉賬1000元,那首先小明的賬戶會減去1000,然后小王的賬戶會增加1000,這兩個操作就必須作為一個事務,否則就會出現只減不增或只增不減的問題,因此要么都失敗,表示此次轉賬失敗。要么都成功,表示此次轉賬成功。分布式下為了保證事務,一般采用兩階段提交協議。
為了解決跨session和所有分區不能EXACTLY-ONCE問題,Kafka從0.11開始引入了事務。
為了支持事務,Kafka引入了Transacation Coordinator來協調整個事務的進行,並可將事務持久化到內部topic里,類似於offset和group的保存。
用戶為應用提供一個全局的Transacation ID,應用重啟后Transacation ID不會改變。為了保證新的Producer啟動后,舊的具有相同Transaction ID的Producer即失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。由於舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該Producer是老的Producer並拒絕其請求。有了Transaction ID后,Kafka可保證:
- 跨Session的數據冪等發送。當具有相同Transaction ID的新的Producer實例被創建且工作時,舊的Producer停止工作。
- 跨Session的事務恢復。如果某個應用實例宕機,新的實例可以保證任何未完成的舊的事務要么Commit要么Abort,使得新實例從一個正常狀態開始工作。
KIP-98 對Kafka
事務原理進行了詳細介紹,完整的流程圖如下:
- Producer向任意一個brokers發送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址;
- 找到Transaction Coordinator后,具有冪等特性的Producer必須發起InitPidRequest請求以獲取PID。
- 調用beginTransaction()方法開啟一個事務,Producer本地會記錄已經開啟了事務,但Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。
- Consume-Transform-Produce這一階段,包含了整個事務的數據處理過程,並且包含了多種請求。
- 提交或回滾事務 一旦上述數據寫入操作完成,應用程序必須調用KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。
6.4 兩階段提交協議
兩階段提交指的是一種協議,經常用來實現分布式事務,可以簡單理解為預提交+實際提交,一般分為協調器Coordinator(以下簡稱C)和若干事務參與者Participant(以下簡稱P)兩種角色。
- C先將prepare請求寫入本地日志,然后發送一個prepare的請求給P
- P收到prepare請求后,開始執行事務,如果執行成功返回一個Yes或OK狀態給C,否則返回No,並將狀態存到本地日志。
- C收到P返回的狀態,如果每個P的狀態都是Yes,則開始執行事務Commit操作,發Commit請求給每個P,P收到Commit請求后各自執行Commit事務操作。如果至少一個P的狀態為No,則會執行Abort操作,發Abort請求給每個P,P收到Abort請求后各自執行Abort事務操作。
注:C或P把發送或接收到的消息先寫到日志里,主要是為了故障后恢復用,類似WAL
七、鏈接文檔
橫向擴展相關來於:Flink狀態管理詳解:Keyed State和Operator List State深度解析
checkpoint 相關來於:Apache Flink v1.10 官方中文文檔
狀態一致性相關來於:再忙也需要看的Flink狀態管理