正好最近公司有個需求,要用到flink的狀態計算,需求是這樣的,收集數據庫新增的數據。
聽起來很簡單對吧?起初我也這么認為,現在發現,這尼瑪就是變相的動態讀取啊。
因為數據是一直在增加的,你需要記錄這次收集的結果,用於下一次的運算,所以要用到狀態計算。
廢話不多說,直接上干貨。
關於什么是有狀態的flink計算,官方給出的回答是這樣的:在flink程序內部存儲計算產生的中間結果,並提供給Function或算子計算結果使用。
了解了定義,我們接下來進入主題。
1.狀態類型
在Flink中根據數據集是否根據Key進行分區,將狀態分為Keyde state和Operator State兩種類型。
(1)Keyed State
表示和key相關的一種state,只能用於KeyedStream類型數據集對應的Functions和Operators之上。Keyed State是Operator State的特例,區別在於Keyed State事先按照key對數據集進行了分區,每個Key State僅對應一個Operator和Key的組合。Keyed State可以通過Key Groups進行管理,主要用於當算子並行度發生變化時,自動重新分布Keyed State數據。在系統運行過程種,一個Keyed算子實例可能運行一個或者多個Key Groups 的 keys。
(2)Operator State
2.Managed Keyed State
(1)Stateful Function定義
接下來通過完整的實例來說明如何在RichFlatmapFunction中使用ValueState,完成對介入數據最小值的獲取。
StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment; //創建元素數據集 DataStream<int,long> inputStream = env.fromElements((2,21L),(4,1L),(5,4L)); inputStream.keyBy(“1”).flatMap{ //定義和創建RichFlatMapFunction,第一個參數位輸入數據類型,第二個參數位輸出數據類型 new RichFlatMapFunction<Map(int,long),Map(int,Map(long,long))>(){ private ValueState leastValueState = null; @Override open(Configuration parameters){ ValueStateDescriptor leastValueStateDescriptor =new ValueStateDescriptor ("leastValueState ",class.of(long)); leastValueState = getRuntimeContext.getState(leastValueStateDescriptor ); } @Override flatMap(Collector collector,Tuple2(int,long) t){ long leastValue =leastValueState .value(); if(t.f1>leastValue){ collector.collect(t,leastValue); }else{ leastValueState.update(t.f1); collector.collect(t,leastValue); } } }
}
3.Managed Operator State
Operator State是一種non-keyed state,與並行的操作算子實際相關聯,例如在Kafka Connector中,每個Kafka消費端算子實例都對應到Kafka的一個分區中,維護Topic分區和Offsets偏移量作為算子的Operator State。在Flink中可以實現CheckpointedFunction或者ListCheckpointed兩個接口來定義操作Managed Operator State的函數。
(1)通過CheckpointedFunction接口操作Operator State
CheckpointedFunction接口定義:
public interface CheckpointedFunction{ //觸發checkpoint調用 void snapshotState(FunctionSnapshotContext context)throws Exception; //每次自定義函數初始化時,調用 void initializeState(FunctionInitializationContext context)throws Exception; }
在每個算子中Managed Operator State都是以List形式存儲,算子和算子之間的狀態數據相互獨立,List存儲比較適合狀態數據的重新分布,Flink目前支持對Managed OperatorState兩種重分布的策略,分別是Even-split Redistribution和Union Redistribution。
可以通過實現FlatMapFunction和CheckpointedFunction完成對輸入數據中每個key的數據元素數量和算子的元素數量的統計。
在initializeState()方法中分別簡歷keyedState和operator State兩種狀態,存儲基於Key相關的狀態值以及基於算子的狀態值。
private class CheckpointCount(int numElements)extends FlatMapFunction<Map(int,long),Map(int,Map(long,long))>with CheckpointedFunction{ //定義算子實例本地變量,存儲Operator數據數量 private long operatorCount = null; //定義keyedState,存儲和key相關的狀態值 private ValueState keyedState =null; //定義operatorState,存儲算子的狀態值 private ListState operatorState = null; @Override flatMap(Tuple(int,long)t,Collector collector){ long keyedCount okeyedState.value() +1; //更新keyedState數量 keyedState.update(keyedCount); //更新本地算子operatorCount值 operatorCount =operatorCount+1; //輸出結果,包括id,id對應的數量統計keyedCount,算子輸入數據的數量統計operatorCount collector.collect(t.f0,keyedCount,operatorCount); } //初始化狀態數據 @Override initializeState(FunctionInitializationContext context){ //定義並獲取keyedState ValueStateDescriptor KeyedDescriptor =new ValueStateDescriptor ("keyedState",createTypeInformation); keyedState = context.getKeyedStateStore.getState(KeyedDescriptor ); //定義並獲取operatorState ValueStateDescriptor OperatorDescriptor =new ValueStateDescriptor ("OperatorState",createTypeInformation); operatorState = context.getOperatorStateStore.getListState(); //定義在Restored過程中,從operatorState中回復數據的邏輯 if(context.isRestored){ operatorCount = operatorState.get() } //當發生snapshot時,將operatorCount添加到operatorState中 @Override snapshotState(FunctionSnapshotContext context){ operatorState.clear(); operatorState.add(operatorCount); } } }
可以從上述代碼看到,在snapshotState()方法中清理掉上一次checkpoint中存儲的operatorState的數據,然后再添加並更新本次算子中需要checkpoint的operatorCount狀態變量。當重啟時會調用initializeState方法,重新恢復keyedState和OperatorState,其中operatorCount數據可以從最新的operatorState中恢復。
(2)通過ListCheckpointed接口定義Operator State
ListCheckpointed接口和CheckpointedFunction接口相比再靈活性上相對較弱一點,只能支持List類型的狀態,並且在數據恢復時僅支持even-redistribution策略。
需要實現以下兩個方法來操作Operator State:
List<T> snapshotState(long checkpointId,long timestamp) throws Exception; void restoreState(List<T> state) throws Exception;
其中snapshotState方法定義數據元素List存儲到checkpoints的邏輯,restoreState方法則定義從checkpoints中恢復狀態的邏輯。
class numberRecordsCount extends FlatMapFunction(Map(String,long),Map(String,long))with ListCheckpointed{ private long numberRecords =0L; @Override flatMap(Tuple2(String,long)t,Collector collector){ //接入一條記錄則進行統計,並輸出 numberRecords +=1; collector.collect(t.f0,numberRecords); } @Override snapshotState(long checkpointId){ Collections.singletonList(numberRecords); } @Override restoreState(List<long> list){ numberRecords =0L; for(count <list){ //從狀態中恢復numberRecords數據 numberRecords +=count } } }