常用 State
Flink 有兩種常見的 State類型,分別是:
Keyed State (鍵控狀態)
Operator State(算子狀態)
1) Keyed State(鍵控狀態)
Keyed State:顧名思義就是基於 KeyedStream 上的狀態,這個狀態是跟特定的Key綁定的。KeyedStrean 流上的每一個Key,都對應一個 State。Flink針對Keyed State提供了以下可以保存 State 的數據結構:
※ ValueState<T>:保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的 Key,
因此算子接收到的每個Key都可能對應一個值)。這個值可以通過 update(T)進行更新,通過 T value() 進行檢索。
※ ListState<T>:保存一個元素的列表。可以往這個列表中追加數據,並在當前的列表上進行檢索。可以通過 add(T) 或者 addAll(List<T>) 進行添加元素,通過 Iterable<T> get() 獲取整個列表。還可以通過 update(List<T>) 覆蓋當前的列表。
※ ReducingState<T>:保存一個單值,表示添加到狀態的所有聚合。接口與ListState 類似,使用 add(T)增加元素,會使用提供的 ReduceFunction 進行聚合
※ AggregatingState<IN.OUT>:保留一個單值,表示添加到狀態的所以值得聚合。和ReducingState 相反得是,聚合類型可能與添加到狀態得元素得類型不同。接口與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合。
※ FoldingState<T,ACC>:保留一個單值,表示添加到狀態的所有值的聚合。與 ReducingState 相反,聚合類型可能與添加到狀態的元素類型不同。接口與ListState類型,但使用 add(T) 添加的元素會用指定的FoldFunction 折疊成聚合值。
※ MapState<UK,UV>:維護了一個添加映射列表。你可以添加鍵值對到狀態中,也可以獲得反映當前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分別檢索映射、鍵和值的可迭代視圖。
2)Operator State(算子狀態)
Operator State 與 Key 無關,而是與 Operator 綁定,整個 Operator 只對應一個 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它會在每個 Connector 實例中,保存該實例消費 Topic 的所有(partition,offset)映射。
station.log 內容:
station_0,18600003612,18900004575,barring,1577080453123,0 station_9,18600003686,18900002113,success,1577080453123,32 station_3,18600003794,18900009608,success,1577080453123,4 station_1,18600000005,18900007729,fail,1577080453123,0 station_8,18600007461,18900006987,barring,1577080453123,0 station_5,18600009356,18900006066,busy,1577080455129,0 station_4,18600001914,18900003949,busy,1577080455129,0
StationLog類:
/** * * @param sid 基站ID * @param callOut 主叫號碼 * @param callIn 被叫號碼 * @param callType 通話類型eg:呼叫失敗(fail),占線(busy),拒接(barring),接通(success): * @param callTime 呼叫時間戳,精確到毫秒 * @param duration 通話時長 單位:秒 */ case class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)
實現代碼:
package com.apple.flink.keyedstate import com.apple.flink.source.StationLog import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector /** * 統計每個 手機呼叫間隔時間,並輸出 */ object StateCallInterval { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) //導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題 import org.apache.flink.streaming.api.scala._ //讀取文件數據 val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath) .map(line => { var arr = line.split(",") new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //方法一 data.keyBy(_.callIn) //按照呼叫手機號分組 .flatMap(new CallIntervalFunction()) .print() //方法二:調用 flatMapWithState算子 data.keyBy(_.callIn) //按照呼叫手機號分組 .flatMapWithState[(String, Long), StationLog] { case (in: StationLog, None) => (List.empty, Some(in)) //如果狀態中沒有,則存入 case (in: StationLog, pre: Some[StationLog]) => { //如果狀態中有值則計算時間間隔 var interval = in.callTime - pre.get.callTime (List((in.callIn, interval)), Some(in)) } }.print() //方法三 mapWithState 算子也可以 data.keyBy(_.callIn) //按照呼叫手機號分組 .mapWithState[(String, Long), StationLog] { case (in: StationLog, None) => ((in.callIn, 0), Some(in)) case (in: StationLog, pre: Some[StationLog]) => { var interval = in.callTime - pre.get.callTime ((in.callIn, interval), Some(in)) } }.print() streamEnv.execute() } class CallIntervalFunction extends RichFlatMapFunction[StationLog, (String, Long)] { //定義一個保存前一條呼叫的數據的狀態對象 private var preData: ValueState[StationLog] = _ override def open(parameters: Configuration): Unit = { val stateDescriptor = new ValueStateDescriptor[StationLog]("pre", classOf[StationLog]) preData = getRuntimeContext.getState(stateDescriptor) } override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = { var pre: StationLog = preData.value() if (pre == null) { //如果狀態中沒有,則存入 preData.update(in) } else { //如果狀態有值則計算時間間隔var var interval = in.callTime - pre.callTime collector.collect((in.callIn, interval)) } } } }