date: 2020-10-20 16:20:00
updated: 2020-11-12 17:57:00
Flink
是一個框架和分布式處理引擎,用於對無界和有界數據流進行狀態計算
lambda架構
storm
- 低延遲 毫秒級
- 消息保障能力弱,消息傳輸可能重復但不會丟失
- 吞吐量低
spark streaming
- 以固定時間間隔(幾秒鍾)處理一段段的批處理作業(微批)
- 高延遲(秒級)
- 能夠保證消息傳輸不會丟失也不會重復
- 高吞吐
flink
- 支持原生流處理,即數據可以一條一條的進行處理
- 低延遲 毫秒級
- 能夠保證消息傳輸不會丟失也不會重復
- 高吞吐
為什么是flink?1. 低延遲(毫秒級) 2. 高吞吐(每秒千萬級) 3. 數據准確性(exactly-once) 4. 易用性(SQL/Table API/DataStream API)
\ | spark streaming | flink |
---|---|---|
流處理 | 數據需要打包成batch,這就會導致有延遲(秒級別),相當於一個偽實時 | 將數據全部當成流處理 |
數據模型 | RDD,DStream實際上也是一組組小批數據RDD的集合 | 數據流,以及事件序列 |
運行時架構 | 批計算,將DAG划分為不同的stage,一個完成后才可以計算下一個 | 標准的流執行模式,一個事件在一個節點處理完后才可以直接發往下一個節點進行處理 |
流處理和批處理的區別?比如同樣是max(溫度),spark streaming返回的是當前這一批有界數據的最大溫度,而flink會返回從程序開始一直到此刻最大的溫度 => 批是一種有界數據的概念,而流數據的話無界,最大最小值的話會考慮從頭開始到現在的整個數據。同樣,flink可以通過window api開窗,來做批處理
SQL/Table API(dynamic tables)
DataStream API(streams, windows)
ProcessFunction(events, state, time)
DataFlow 模型
- 數據從上一個 Operation 節點直接 Push 到下一個 Operation 節點。
- 各節點可以分布在不同的 Task 線程中運行,數據在 Operation 之間傳遞。
- 同樣具有 Shuffle 過程,但是數據不像 MapReduce 模型,Reduce 從 Map 端拉取數據,而是由上游把數據推給下游。
- 實現框架有 Apache Storm 和 Apache Flink。
watermark && window && allowedLatest
- watermark:由eventTime - 允許數據亂序的時間M秒得到,只增不減,即只有當當前數據對應的watermark大於之前的,才會更新watermark
- window:每N秒一段時間間隔
- allowedLatest:設置窗口銷毀延遲時間,及到時間了,但是還允許一定時間內的數據遲到
keyby(int... fields) // 0 代表第一個元素
keyby(String... fields) // 聲明為public的字段名或類的get方法,主要是方便了datastream嵌套復合類型比如tuple或者pojo類的時候
keyby(new KeySelector<T,K>()) // 覆寫getKey方法,來自定義返回key分組
除了env可以設置並行度,每一個算子也都可以單獨設置並行度,包括 print().setParallelism()
。默認並行度是cpu核數。一般來說,一個流的並行度,可以認為是其所有算子的並行度里最大的那個並行度。
一個流所需要的slot數量不一定就是流中所有算子的並行度的加和。事實上,flink 允許先后操作的算子放入到同一個slot里面(子任務共享slot),這樣可以減少數據的shuffle;在共享slot的情況下,可能會出現一個slot實現了source、transformation到sink的所有操作,即保留了整個流的pipeline過程,這個在 flink-yarn.xml 里有注釋說是允許的。
flink運行時組件:
- jobmanager
- 拿到客戶端提交的jar包,這個jar包包括:作業圖(jobGraph)、邏輯數據流圖(logical dataflow graph)和打包了所有的類、庫等等;把jobgraph轉換成一個物理層面的數據流圖--執行圖(executionGraph),包含了所有可以並發執行的任務;向rm申請slot資源,並分發到taskmanager上運行
- 還負責維護類似檢查點(checkpoint)這樣的操作
- taskmanager
- taskmanager數量 * 每個taskmanager下面對應的slot數量 = 整個集群最大的能夠運行任務數量
- 在內存中划分出一部分,稱為slot,用來運行task。宏觀來理解的話,可以把taskmanager想成是一個jvm進程,每一個slot是運行在上面的線程,只是對內存進行隔離,每個slot有自己的內存資源
- resourcemanager
- 主要是管理slot資源,每個taskmanager下面的slot都會在rm里進行注冊
- dispatcher
- 提供rest接口,提交app的時候會自動啟動,把app交給jobmanager;
- webUI界面。並不是必需的
flink執行圖可以分為四層,或者四個過程
- streamgraph:用戶通過stream api編寫的代碼生成的最初的圖,包含source、transformation、sink的一個拓撲結構
- jobgraph:客戶端在提交作業到jobmanager之前,會自動根據streamgraph進行合並優化,將符合條件的多個任務合並在一起作為一個任務,減少數據傳輸
- executiongraph:jobmanager根據jobgraph來生成executiongraph,將任務拆成可並行化的過程,交給taskmanager;調度層最核心的數據結構
- 物理執行圖:taskmanager上部署task后形成的圖,並不是一個具體的數據結構
數據傳輸形式
- one-to-one(forwarding)
- map、filter、flatmap等算子都是one-to-one的關系,即可以直接在同一個slot上執行計算
- 類似於spark的窄依賴
- redistributing
- stream的分區發生改變。每一個算子的子任務根據所選擇的transformation然后發送數據到不同的分區
- 比如keyby是基於hashcode重分區,而broadcast和rebalance會隨機重新分區
- 類似於spark的寬依賴,shuffle過程
什么樣的任務可以被合並在一起?相同並行度的one-to-one操作,滿足這兩個條件,flink會把相鄰的算子合並在一起,放在同一個slot里面進行計算,減少網絡傳輸。通過在算子后面調用 .filter(..).disableChaing()
可以斷開合並的任務鏈,.filter(..).startNewChain()
開始合並新的任務鏈。
針對某一個算子過程,可能會很復雜或者有特殊需求,需要單獨放在一個slot里運行?在算子后面調用 slotSharingGroup("key")
,表示從當前算子開始,之后的所有操作都會放在一個slot里面,通過 key 來區分多個slot共享組。默認slot共享組的key是"dafault"。
DataStream API
- source
- env.addSource()
- transformation
- map
- flatmap
- filter
- keyby
- 同一個key肯定在同一個分區,但是同一個分區不一定只有一個key,2個key的數據經過hash可能被分到同一個分區里
- 滾動聚合算子(rollingAggregation)
- 針對KeyedStream每一個支流做聚合
- 以min和minby為例,如果是min()只會返回指定字段的最小值,如果是minby則會返回指定字段的最小值對應的那一整個對象
- 聚合算子(sum、max、min、maxBy、minBy)底層實現是調用 keyedStream.aggregate() 方法,只是創建的ComparableAggregator的AggregationType不一樣,分別是SUM, MAX, MIN, MAXBY, MINBY。ComparableAggregator類繼承了AggregationFunction,而AggregationFunction則實現了ReduceFunction接口,所以ComparableAggregator類實現了reduce方法,首先是通過Comparator來比較兩個對象,然后會判斷byAggregate是否為真,即是否是minby或maxby操作,如果是的話,再判斷isfirst是否為真,即當出現多個同樣值的時候,是返回第一個還是返回最后一個
- reduce
- 自定義 reduce() 方法需要繼承 ReduceFunction 類
- split和select
- split 將 DataStream 會轉換成 SplitStream,select 從一個splitStream里通過tag來獲取一個或多個DataStream
- 被遺棄,使用 sideoutput 替代
- connect和map
- connect 將兩個DataStream合並為一個ConnectedStreams。此時數據只是放在了一個流里,數據本身和形式並不發生任何的變化
- map、flatmap、keyby等算子實現的function,會單獨作用於每一個datastream
- union
- sink
- kafka
- 初始化FlinkKafkaProducer的時候有三個構造函數,不加kafkaProducersPoolSize、加kafkaProducersPoolSize和一個帶着自定義分區的函數。一般用前兩個就行,如果說業務數據需要根據某種條件將數據寫入到N多個topic中,可以用第三個,實現KeyedSerializationSchema類getTargetTopic()方法,參考地址
- FlinkKafkaConsumer 消費過程:
父類FlinkKafkaConsumerBase- initializeState():從最后一個成功的checkpoint中獲取各個partition的offset到restoredState中。
2.open():從restoredState中獲取這個subTask所消費的topic的partition的起始offset,保存到subscribedPartitionsToStartOffsets中;如果這是一個第一次向topic消費的job的subTask,那么Flink根據job的並行度以及這個subTask的index均勻的分配partition給這個subTask消費。此時,partition的起始offset就由我們在上文中介紹的配置來決定。
3.run(): 如果subscribedPartitionsToStartOffsets不為空,創建KafkaFetcher,執行其runFetchLoop()。
- initializeState():從最后一個成功的checkpoint中獲取各個partition的offset到restoredState中。
- redis
- kafka
Window類型
- 時間窗口
- 滾動時間窗口 Tumbling Window
- 參數只有一個 window size,沒有重疊;區間范圍是左閉右開
- 滑動時間窗口 Sliding Window
- 參數有兩個 window size 和 slide step,可以有重疊
- 會話窗口
- 設置一個timeout時間,如果一段時間沒有接收到新的數據,就會生成一個新的窗口
- 滾動時間窗口 Tumbling Window
- 計數窗口
- 滾動計數窗口
- 滑動計數窗口
Window API
在keyby()
之后調用.window()
方法,或者dataStream.windowAll()
。一般是前者。也可以直接調用 .timeWindow()
,傳一個參數就是滾動事件窗口,傳兩個參數就是滑動時間窗口。如果要用到offset,那就只能用window()。
.countWindow()
底層調用的是 GlobalWindows 方法,全局窗口是把所有數據都放在一個窗口里,需要設置 trigger 觸發器和 evictor 移除器,來保證窗口是什么時候觸發什么情況下移除數據
org.apache.flink.streaming.api.windowing.assigners 包下面有各個窗口分配器的類,比如 TumblingEventTimeWindows、TumblingProcessingTimeWindows 等,在 window() 方法中,需要指定窗口分配器,比如 window(TumblingProcessingTimeWindows.of(windowSize, offset))
,windowSize 就是窗口大小,offset是指和整點的偏移量,比如8點05到9點05,那就是偏移5分鍾,Time.minutes(5)。offset的主要作用是時區
窗口的意義:把無限的數據流進行切分,得到有限的數據集進行處理
窗口函數 WindowedStream 函數
- 增量聚合函數
- 每條數據到來都會進行計算,保持一個簡單的狀態
- ReduceFunction, AggregateFunction
- 全量窗口函數
-先把窗口所有數據收集起來,等到計算的時候再遍歷所有的數據- ProcessWindowFunction
窗口相關的其他可選API
- trigger(): 觸發器,定義window什么時候關閉,觸發計算並輸出結果
- evictor(): 移除器,定義移除某些數據的邏輯
- allowedLateness(): 允許一定時間內遲到的數據也划分在上一個窗口里進行計算,這個時間是以watermark為准,不是eventTime
- sideOutPutLateData(): 將遲到的數據放入側輸出流
- getSideOutPut(): 獲取側輸出流,在所有計算完之后,dataStream.getSideOutPut(tag).print() 輸出
時間語義
- eventTime: 事件創建的時間
- ingestionTime: 數據進入Flink的時間
- processTime: 執行操作算子的本地系統時間,與機器相關
使用 eventTime
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- 然后和watermark一起搭配使用,來處理亂序數據
waterMark 的意義:解決亂序問題
waterMark 的傳遞:上游向下游傳遞是通過廣播傳遞給它分區的所有下游,而下游會保存所有上游的watermark然后取最小的那個來計算。
watermark 的引用
.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks) // 隔一段時間,周期性生成waterMark,這個周期時間在env.setStreamTimeCharacteristic() 默認值是200毫秒,也可以自定義設置
eg:
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(maxOutOfOrderness)) 去重寫extractTimestamp()方法,提取eventTime,在BoundedOutOfOrdernessTimestampExtractor類下getCurrentWatermark()方法里,會先通過eventTime-maxOutOfOrderness得到一個時間戳,會和已有的watermark比較,取最大值。maxOutOfOrderness指的是一個窗口延遲時間,maxOutOfOrderness設置的太大,窗口計算的結果就太慢,太小的話計算的結果准確性就下降了
.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) // 每來一條數據都會生成一個watermark
窗口起始時間的確定
以 TumblingEventTimeWindows 為例,有一個 assignWindows() 方法
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
// offset 默認是 0
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
// offset默認是0,+windowSize再取余,相當於沒加,所以相當於 timestamp - timestam%windowSize
// 相當於取了一個windowSize的整數倍
}
flink 有狀態的數據流
狀態
比如聚合計算的一些結果,需要保存下來,這個就算是任務的狀態。可以認為是一個本地變量,flink會進行狀態管理,包括狀態一致性、故障處理以及高效存儲和訪問
- 算子狀態 Operatior State
- 作用范圍限定為算子任務,由同一並行任務所處理的所有數據都可以訪問到相同的狀態,比如最小值,同一個task下的所有數據都可以訪問到,但是不同的task訪問不到,因為不在同一個內存下
- 算子狀態的數據結構
- 列表狀態 list state: 將狀態表示為一組數據的列表
- 聯合列表狀態 union list state
- 廣播狀態 broadcast state
- 鍵控狀態 keyed State
- 同一個分區下,可能有不同的key,針對這些key,會保存每一個key自己的一個狀態實例
- 鍵控狀態的數據結構
- 值狀態
- 列表狀態
- 映射狀態
- 聚合狀態
- 狀態后端 State Backends
- 主要負責本地的狀態管理,以及將檢查點狀態寫入遠程存儲
- 主要有3種
- MemoryStateBackend
- 內存級的狀態后端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在taskmanager的jvm堆上,而將checkpoint存儲在jobmanager中
- 特點:快速,低延遲,但不穩定
- FsStateBackend
- 將checkpoint存到遠程的持久化文件系統,而對於本地狀態,跟MemoryStateBackend一樣,也會存在taskmanager的jvm堆上
- 特點:同時擁有內存級的本地訪問速度,和更好的容錯保證。但是如果系統特別龐大,taskmanager堆上的內存都不足以存儲下,就有第三種狀態后端
// 狀態后端 // env.setStateBackend(new FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots));
- RocksDBStateBackend
- 將所有狀態序列化后,存入本地的RocksDB中存儲。相當於落入磁盤,不會丟數,但是會影響速度
- 需要再引入 flink-statebackend-rocksdb 依賴
- MemoryStateBackend
從運行時上下文拿到狀態 => 需要在 richFunction 里面去拿到上下文
比如溫度監控,同一個傳感器如果這一次的溫度和上一次的溫度相差十度,就預警。需要把上一條數據的溫度不斷更新在狀態里,然后進行比較。
dataStream.keyBy().flatmap(new TempAlert(溫度閾值))
class TempAlert extends RichFlatMapFunction{
private int threshold = 0;
private ValueState lastTempValueState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
public TempAlert(int i) {
this.threshold = i;
}
@Override
public void flatMap(Object value, Collector out) throws Exception {
// 先獲取上一條數據的溫度值
double lastTemp = (double) lastTempValueState.value();
if(Math.abs(value.getTemp() - lastTemp) >= threshold){
out.collect("預警信息");
}
// 更新溫度值
lastTempValueState.update(value.getTemp());
}
}
ProcessFunction
本身也繼承了 AbstractRichFunction, 即實現了 RichFunction,即加強版的富函數,可以拿到各種上下文、變量、狀態,還可以實現分流輸出的效果
context.timeService().registerEventTimeTimer(long time) // 注冊一個定時器,然后重寫 onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
方法; 可以注冊多個定時器,只要參數time不一樣,就是不一樣的定時器,但是實現的話都是 onTimer() 方法,通過判斷timestamp(onTimer()方法被激活時的時間戳)的不一樣,來執行不同的操作。類似於在同一個鬧鍾app設置定時,根據時間來區分,激活方法都是一樣的,只是去判斷當前timestamp是哪一個,來執行不同的操作
比如連續N秒鍾,溫度一直上升,就發送預警信息。如果用滾動窗口或者滑動窗口的話,有一定問題:第一個窗口前1秒是下降,后面的N-1秒是上升;第二個窗口,前面N-1秒是上升,最后1秒是下降。這樣兩個窗口都不會報警,但是實際上是應該報警的。
dataStream.keyBy().process(new TempWarning(連續時長))
class TempAlert2 extends KeyedProcessFunction {
// 保存上一個溫度值進行比較
private ValueState lastTempValueState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
// 保存上一個注冊定時器的時間戳,用於刪除
private ValueState timerTsValueState = getRuntimeContext().getState(new ValueStateDescriptor("timerTs", Long.class));
private int timeSpan = 0;
public TempAlert2(int i) {
this.timeSpan = i;
}
@Override
public void processElement(Object value, Context ctx, Collector out) throws Exception {
// 取出狀態
double lastTemp = (double) lastTempValueState.value();
long timerTs = (long) timerTsValueState.value();
// 更新上一次的溫度值
lastTempValueState.update(lastTemp);
// 溫度上升並且沒有注冊過定時器,那就注冊一個以當前時間戳開始的一個定時器
// timerTs 默認值是0,所以等於0的時候說明是第一次進行判斷
if(value.getTemp() > lastTemp && timerTs == 0){
// 按照當前處理時間 + timeSpan 作為時間戳來注冊定時器,也可以按照eventTime來設置時間戳
long ts = ctx.timerService().currentProcessingTime() + this.timeSpan*1000;
ctx.timerService().registerProcessingTimeTimer(ts);
timerTsValueState.update(ts);
}else if(value.getTemp() < lastTemp){
// 如果溫度下降,那么需要刪除定時器
ctx.timerService().deleteProcessingTimeTimer(timerTs);
timerTsValueState.getClass();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
out.collect("傳感器 " + ctx.getCurrentKey() + " 的溫度在 " + this.timeSpan/1000 + "秒內連續上升");
// 清空這一次的定時器,至於溫度狀態是否清空,如果清空了,就相當於再重新走一遍流程
timerTsValueState.clear();
}
}
實現分流輸出的時候,通過 ctx.output(OutputTag<X> outputTag, X value);
方法來實現
容錯機制
一致性檢查點 checkpoints: 在某個時間點對所有任務的狀態進行一次快照(一個任務的狀態可能很快就可以被更新,但是所有任務都做完,所有狀態合並出來的快照的時間可能就會比較慢);這個時間點應該是所有任務都恰好處理完一個相同的輸入數據的時候,比如針對偏移量5做快照,進行了keyby分區,那么應該保存所有分區在執行完偏移量5的數據之后的那個狀態。當恢復的時候是恢復最近一次成功保存的檢查點,然后會重新提交偏移量,這個就提供了 exactly-once 的一致性保證
類似於 jvm 的 safepoint
檢查點的實現算法
- 一種簡單的想法
- 暫停應用,保存狀態到檢查點,再重新恢復應用
- flink 的改進和實現
- 基於 Chandy-Lamport 算法的分布式快照
- 將檢查點的保存和數據處理分離開,不暫停整個應用,哪一個分區做完了就先做一個合照,等所有分區都做了合照之后再拼起來就可以了
檢查點屏障 checkpoint barrier: 類似於watermark,在處理數據的時候會打上一個barrier,就可以把一條流上的數據按照不同的檢查點分開。當前 barrier 前面到來的數據導致的狀態更改,都會包含在當前 barrier 所屬的檢查點中;當前 barrier 后面到來的數據導致的狀態更改,都會包含在后面的檢查點
jobmanager 會發送一個命令,告訴source,然后source會在數據里插入一個barrier,當task執行到對應的數據時,就知道這里需要做一次檢查點保存。
barrier 對齊: 類似於watermark會從上游廣播到所有的下游,而下游會分區來保存所有的watermark,然后取最小值來計算。barrier也會從上游廣播到所有的下游,對於下游來說,如果其中一個流source1的barrier先到,意味着這個流的數據已經計算完了,而其他流的barrier還沒到,此時source1的數據如果繼續到的話,會先緩存起來,要等其他流的barrier到,計算完了之后保存好狀態,再從緩存的數據開始陸續計算。
如果barrier不對齊,其中快的流的數據不斷計算,狀態就會不斷更新,慢的流快照保存,再次恢復上一次成功檢查點快照的時候,快的流那邊就會重復消費數據,就變成了 At Least Once。
前面說的都是 flink 內部的 Exactly-Once 和 At Least Once,如果每1分鍾快照一次,處理數據之后提交給mysql,在chk-100成功快照一次之后,過了30秒,程序down了,恢復到chk-100時的狀態,那么就會有30秒的數據會被重復提交到mysql,也就是說還需要一個端對端的精確一次實現
// checkpoint 默認時間間隔是500L
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
// checkpoint 超時時間,快的流等慢的流,超過這個時間就作廢
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 最大同時checkpoint個數 默認是1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
// 兩個checkpoint執行的最小間隔,如果配了這一個,上面的最大同時執行個數就不會生效
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
// 能夠允許checkpoint失敗的次數
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
env.setStateBackend(new FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots));
在設置狀態后端這里,FsStateBackend 還有第二個參數,是否異步快照,即如果為true,當快的流計算完了,會把自己的狀態先緩存到文件里,然后繼續執行下面的計算,當慢的流計算完了之后再去合並快照
重啟策略
// 重啟3次,中間要間隔10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
保存點 SavePoint
類似於checkpoint的實現,savepoint是自定義設置的保存功能,需要寫出來觸發創建操作,會同時保存一些額外的元數據上下文的信息
作用:有計划的手動備份;暫停和重啟應用;版本遷移等等
注意:要恢復savepoint的話需要保證計算流的拓撲結構是一樣的,也就是算子應該是不變的,最好在算子后面添加 .uid(String)
,這樣在恢復的時候可以更有針對性
狀態一致性
每個算子任務都有自己的計算狀態,一條數據不應該丟失,也不應該重復計算(重復計算指的是不能疊加計算)
狀態一致性分類
- at-most-once: 任務故障時,什么也不做,丟數就丟數。這樣的話會沒有快照的開銷,速度會變快,准確性會下降。比如直播視頻,因為網絡問題,丟幀也是可以接受的,udp協議
- at-least-once: 數據不會丟,數據可能被處理多次,即計算結果可能會進行疊加計算
- exactly-onde: 數據不會丟,只會處理一次
端到端的 exactly-once
- 內部保證:checkpoint
- source:可重設數據的讀取位置
- sink:從故障恢復時,數據不會重復寫入到外部系統
- 冪等寫入
- 事務寫入
冪等寫入 Idempotent Writes
思路:e的導數還是e。
含義:一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,后面再重復執行的操作就不起作用了。
實現:類似於hashmap,數據修改是針對於同一個key的,修改再多次也只相當於是一次。比如redis、mysql提交的時候,按照key來寫入數據,那重復寫入的話也不會影響到數據的變化,相當於是一次更新
缺點:1->5->10->1->5->10 在第一個10的位置發生故障,導致數據重復發送,產生數據跳變
事務寫入 Transactional Writes
思路:事務對應這checkpoint,等到checkpoint真正完成的時候,才把所有對應的結果放入到sink端中
實現:1. 預寫日志 2. 兩階段提交
預寫日志 Write-Ahead-Log WAL
實現:把結果數據先當成狀態保存,然后在收到checkpoint完成的通知后,一次性批量寫入sink端。DataStream API提供了一個模板類 GenericWriteAheadSink 來實現
缺點:由於checkpoint不能設置的太小(間隔太小的話,整個流就不斷在做快照了,都沒時間處理數據了),所以這一批數據到sink端會需要一定的時間,延遲性會比較高;另外從日志中批量寫入到sink端時,如果寫到一半sink端故障了,恢復的時候針對另一半沒寫入的日志數據如何處理也是一個問題
兩階段提交 Two-Phase-Commit 2PC
實現:對於每一個checkpoint,sink端會啟動一個事務,將所有計算得到的數據都放入到事務里,然后寫入到外部系統,但是並不提交,只是預提交(此時如果checkpoint掛了可以回滾事務)。當收到checkpoint完成的通知后,再提交事務,數據就會真正寫入。Flink提供了 TwoPhaseCommitSinkFunction 抽象類,eg: kafkaProducer
要求:對外部 sink 端的要求會比較高————需要支持事務;支持預寫入;可以回滾;提交事務必須是冪等操作
也許狀態很多,需要等所有狀態都合並成快照之后才能提交事務。而不是看到下一個barrier的時候就提交。看到新的barrier會繼續新的計算,放在新的事務里,當checkpoint完成之后,才會提交上一個事務
sink\source | 不可重置 | 可重置 |
---|---|---|
任意 | At-most-once | At-least-once(故障恢復時會出現暫時的不一致,數據跳變) |
冪等 | At-most-once | Exactly-once |
預寫日志 | At-most-once | At-least-once |
兩階段提交 | At-most-once | Exactly-once |
Flink + Kafka 端到端狀態一致性的保證
- 內部:利用checkpoint機制,把狀態存盤,發生故障的時候可以恢復
- source:kafka Consumer 作為source,可以把偏移量保存下來,故障恢復時可以重置偏移量,重新消費數據
- sink:FlinkKafkaProducer 底層繼承了 TwoPhaseCommitSinkFunction 類
綜上:
- jobmanager在source數據流插入barrier
- task看到barrier,就開始保存自己的狀態,把數據寫入到sink的事務里
- sink看到barrier,創建新的事務,當上一個barrier完成的時候,提交上一個事務
Table API 和 Flink SQL
需要引入 flink-table-planner 依賴,會自動引入 bridge 依賴。可以引入 flink-table-planner-blink 版本,比flink-table-planner要更完善一些。
// 創建table環境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 從外部鏈接創建一張表
①tableEnv.connect().createTemporaryTable("myTable1");
// 從已有的數據流生成一張表
②Table dataTable = tableEnv.fromDataStream(dataStream);
// 基於 Table API 查詢算子得到一張表 2種形式
①Table resultTable1 = tableEnv.from("myTable1").select().filter()
②Table resultTable1 = dataTable.select("id, temp").filter("id = \"sensor_1\"");
// 基於SQL來查詢得到一張表
②tableEnv.createTemporaryView("myTable2", dataTable);
②Table resultTable2 = tableEnv.sqlQuery("select * from myTable2"); // 表名和創建的那個view的名字需要是一樣的
①Table resultTable2 = tableEnv.sqlQuery("select * from myTable1"); // 因為前面指定了表名,所以這里直接寫就可以了
// 輸出最后的結果
DataStream<Row> rowDataStream = tableEnv.toAppendStream(resultTable, Types.ROW(Types.INT, Types.LONG));
rowDataStream.print();