Flink狀態專題:keyed state和Operator state


        眾所周知,flink是有狀態的計算。所以學習flink不可不知狀態。

        正好最近公司有個需求,要用到flink的狀態計算,需求是這樣的,收集數據庫新增的數據。

        聽起來很簡單對吧?起初我也這么認為,現在發現,這尼瑪就是變相的動態讀取啊。

  因為數據是一直在增加的,你需要記錄這次收集的結果,用於下一次的運算,所以要用到狀態計算。

  廢話不多說,直接上干貨。

  關於什么是有狀態的flink計算,官方給出的回答是這樣的:在flink程序內部存儲計算產生的中間結果,並提供給Function或算子計算結果使用。

了解了定義,我們接下來進入主題。
  

  1.狀態類型

    在Flink中根據數據集是否根據Key進行分區,將狀態分為Keyde state和Operator State兩種類型。

    (1)Keyed State

      表示和key相關的一種state,只能用於KeyedStream類型數據集對應的Functions和Operators之上。Keyed State是Operator State的特例,區別在於Keyed State事先按照key對數據集進行了分區,每個Key State僅對應一個Operator和Key的組合。Keyed State可以通過Key Groups進行管理,主要用於當算子並行度發生變化時,自動重新分布Keyed State數據。在系統運行過程種,一個Keyed算子實例可能運行一個或者多個Key Groups 的 keys。

    (2)Operator State

      與Keyed State不同的是,Operator State只和並行的算子實例綁定,和數據元素種的key無關,每個算子實例中持有所有數據元素中的一部分狀態數據。Operator State
 
    支持當算子實例並行度發生變化時自動重新分配狀態數據。
 
    在Flink中,Keyed State和Operator State均具有兩種形式,托管狀態和原生狀態。(兩種狀態有什么不同就不啰嗦了,看的頭疼)
 

  2.Managed Keyed State

    Flink中有以下Managed Keyed State類型可以使用。ValueState[T],ListState[T],MapState[K,V]。

    (1)Stateful Function定義

    接下來通過完整的實例來說明如何在RichFlatmapFunction中使用ValueState,完成對介入數據最小值的獲取。

    

StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment;
//創建元素數據集
DataStream<int,long> inputStream = env.fromElements((2,21L),(4,1L),(5,4L));
inputStream.keyBy(“1”).flatMap{
//定義和創建RichFlatMapFunction,第一個參數位輸入數據類型,第二個參數位輸出數據類型
new RichFlatMapFunction<Map(int,long),Map(int,Map(long,long))>(){
  private ValueState leastValueState = null;  
  @Override 
  open(Configuration parameters){
     ValueStateDescriptor leastValueStateDescriptor =new ValueStateDescriptor ("leastValueState ",class.of(long));
   leastValueState = getRuntimeContext.getState(leastValueStateDescriptor );
    }

  @Override 
  flatMap(Collector collector,Tuple2(int,long) t){
  long leastValue =leastValueState .value();
  if(t.f1>leastValue){
    collector.collect(t,leastValue);
  }else{
    leastValueState.update(t.f1);
    collector.collect(t,leastValue);
  }
 }
 }
}

  3.Managed Operator State

  Operator State是一種non-keyed state,與並行的操作算子實際相關聯,例如在Kafka Connector中,每個Kafka消費端算子實例都對應到Kafka的一個分區中,維護Topic分區和Offsets偏移量作為算子的Operator State。在Flink中可以實現CheckpointedFunction或者ListCheckpointed兩個接口來定義操作Managed Operator State的函數。

  (1)通過CheckpointedFunction接口操作Operator State

        CheckpointedFunction接口定義:

public interface CheckpointedFunction{
//觸發checkpoint調用
  void snapshotState(FunctionSnapshotContext context)throws Exception;  
//每次自定義函數初始化時,調用
  void initializeState(FunctionInitializationContext context)throws Exception;
}

  在每個算子中Managed Operator State都是以List形式存儲,算子和算子之間的狀態數據相互獨立,List存儲比較適合狀態數據的重新分布,Flink目前支持對Managed OperatorState兩種重分布的策略,分別是Even-split Redistribution和Union Redistribution。

  可以通過實現FlatMapFunction和CheckpointedFunction完成對輸入數據中每個key的數據元素數量和算子的元素數量的統計。

  在initializeState()方法中分別簡歷keyedState和operator State兩種狀態,存儲基於Key相關的狀態值以及基於算子的狀態值。

private class CheckpointCount(int numElements)extends FlatMapFunction<Map(int,long),Map(int,Map(long,long))>with CheckpointedFunction{
//定義算子實例本地變量,存儲Operator數據數量
private long operatorCount = null;
//定義keyedState,存儲和key相關的狀態值
private ValueState keyedState =null;
//定義operatorState,存儲算子的狀態值
private ListState operatorState = null;
@Override
flatMap(Tuple(int,long)t,Collector collector){
long keyedCount okeyedState.value() +1;
//更新keyedState數量
keyedState.update(keyedCount);
//更新本地算子operatorCount值
operatorCount =operatorCount+1;
//輸出結果,包括id,id對應的數量統計keyedCount,算子輸入數據的數量統計operatorCount
collector.collect(t.f0,keyedCount,operatorCount);

}
//初始化狀態數據
@Override
initializeState(FunctionInitializationContext context){
//定義並獲取keyedState
ValueStateDescriptor KeyedDescriptor =new ValueStateDescriptor ("keyedState",createTypeInformation);
keyedState = context.getKeyedStateStore.getState(KeyedDescriptor );
//定義並獲取operatorState
ValueStateDescriptor OperatorDescriptor =new ValueStateDescriptor ("OperatorState",createTypeInformation);
operatorState = context.getOperatorStateStore.getListState();
//定義在Restored過程中,從operatorState中回復數據的邏輯
if(context.isRestored){
  operatorCount = operatorState.get()  
}
//當發生snapshot時,將operatorCount添加到operatorState中
@Override
snapshotState(FunctionSnapshotContext context){
operatorState.clear();
operatorState.add(operatorCount);
}
}
}

可以從上述代碼看到,在snapshotState()方法中清理掉上一次checkpoint中存儲的operatorState的數據,然后再添加並更新本次算子中需要checkpoint的operatorCount狀態變量。當重啟時會調用initializeState方法,重新恢復keyedState和OperatorState,其中operatorCount數據可以從最新的operatorState中恢復。

(2)通過ListCheckpointed接口定義Operator State

    ListCheckpointed接口和CheckpointedFunction接口相比再靈活性上相對較弱一點,只能支持List類型的狀態,並且在數據恢復時僅支持even-redistribution策略。

  需要實現以下兩個方法來操作Operator State:

  

List<T> snapshotState(long checkpointId,long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

  其中snapshotState方法定義數據元素List存儲到checkpoints的邏輯,restoreState方法則定義從checkpoints中恢復狀態的邏輯。

class numberRecordsCount extends FlatMapFunction(Map(String,long),Map(String,long))with ListCheckpointed{
  private long numberRecords =0L;
@Override
flatMap(Tuple2(String,long)t,Collector collector){
//接入一條記錄則進行統計,並輸出
numberRecords +=1;
collector.collect(t.f0,numberRecords);
}  
@Override
snapshotState(long checkpointId){
  Collections.singletonList(numberRecords);
}
@Override
restoreState(List<long> list){
 numberRecords =0L;
for(count <list){
 //從狀態中恢復numberRecords數據
numberRecords +=count
}
}
}

 

  空洞的代碼是沒有感染力的,所以前面我鋪墊了這么多,希望接下來的總結能對大家有所幫助。
  
  以上所有總結參考張利兵《flink原理、實戰與性能優化》第五章。附:原文是用scala寫的,因為樓主所在的公司用的java,所以將所有代碼用java改寫了一遍。如果有看着不方便的
 
朋友,可以去看原文嘿嘿。
 
  作為剛接觸flink的小白,本文只講了狀態的基本知識。后續可能會有如何存儲狀態以及flink狀態機制優化。
 
  歡迎大家不吝賜教。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM