常用 State
Flink 有两种常见的 State类型,分别是:
Keyed State (键控状态)
Operator State(算子状态)
1) Keyed State(键控状态)
Keyed State:顾名思义就是基于 KeyedStream 上的状态,这个状态是跟特定的Key绑定的。KeyedStrean 流上的每一个Key,都对应一个 State。Flink针对Keyed State提供了以下可以保存 State 的数据结构:
※ ValueState<T>:保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 Key,
因此算子接收到的每个Key都可能对应一个值)。这个值可以通过 update(T)进行更新,通过 T value() 进行检索。
※ ListState<T>:保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获取整个列表。还可以通过 update(List<T>) 覆盖当前的列表。
※ ReducingState<T>:保存一个单值,表示添加到状态的所有聚合。接口与ListState 类似,使用 add(T)增加元素,会使用提供的 ReduceFunction 进行聚合
※ AggregatingState<IN.OUT>:保留一个单值,表示添加到状态的所以值得聚合。和ReducingState 相反得是,聚合类型可能与添加到状态得元素得类型不同。接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
※ FoldingState<T,ACC>:保留一个单值,表示添加到状态的所有值的聚合。与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类型,但使用 add(T) 添加的元素会用指定的FoldFunction 折叠成聚合值。
※ MapState<UK,UV>:维护了一个添加映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分别检索映射、键和值的可迭代视图。
2)Operator State(算子状态)
Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition,offset)映射。
station.log 内容:
station_0,18600003612,18900004575,barring,1577080453123,0 station_9,18600003686,18900002113,success,1577080453123,32 station_3,18600003794,18900009608,success,1577080453123,4 station_1,18600000005,18900007729,fail,1577080453123,0 station_8,18600007461,18900006987,barring,1577080453123,0 station_5,18600009356,18900006066,busy,1577080455129,0 station_4,18600001914,18900003949,busy,1577080455129,0
StationLog类:
/** * * @param sid 基站ID * @param callOut 主叫号码 * @param callIn 被叫号码 * @param callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success): * @param callTime 呼叫时间戳,精确到毫秒 * @param duration 通话时长 单位:秒 */ case class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)
实现代码:
package com.apple.flink.keyedstate import com.apple.flink.source.StationLog import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector /** * 统计每个 手机呼叫间隔时间,并输出 */ object StateCallInterval { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ //读取文件数据 val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath) .map(line => { var arr = line.split(",") new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //方法一 data.keyBy(_.callIn) //按照呼叫手机号分组 .flatMap(new CallIntervalFunction()) .print() //方法二:调用 flatMapWithState算子 data.keyBy(_.callIn) //按照呼叫手机号分组 .flatMapWithState[(String, Long), StationLog] { case (in: StationLog, None) => (List.empty, Some(in)) //如果状态中没有,则存入 case (in: StationLog, pre: Some[StationLog]) => { //如果状态中有值则计算时间间隔 var interval = in.callTime - pre.get.callTime (List((in.callIn, interval)), Some(in)) } }.print() //方法三 mapWithState 算子也可以 data.keyBy(_.callIn) //按照呼叫手机号分组 .mapWithState[(String, Long), StationLog] { case (in: StationLog, None) => ((in.callIn, 0), Some(in)) case (in: StationLog, pre: Some[StationLog]) => { var interval = in.callTime - pre.get.callTime ((in.callIn, interval), Some(in)) } }.print() streamEnv.execute() } class CallIntervalFunction extends RichFlatMapFunction[StationLog, (String, Long)] { //定义一个保存前一条呼叫的数据的状态对象 private var preData: ValueState[StationLog] = _ override def open(parameters: Configuration): Unit = { val stateDescriptor = new ValueStateDescriptor[StationLog]("pre", classOf[StationLog]) preData = getRuntimeContext.getState(stateDescriptor) } override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = { var pre: StationLog = preData.value() if (pre == null) { //如果状态中没有,则存入 preData.update(in) } else { //如果状态有值则计算时间间隔var var interval = in.callTime - pre.callTime collector.collect((in.callIn, interval)) } } } }