Flink State 状态


常用 State

Flink 有两种常见的 State类型,分别是:

Keyed State (键控状态)

Operator State(算子状态)

1) Keyed State(键控状态)

Keyed State:顾名思义就是基于 KeyedStream 上的状态,这个状态是跟特定的Key绑定的。KeyedStrean 流上的每一个Key,都对应一个 State。Flink针对Keyed State提供了以下可以保存 State 的数据结构:

※ ValueState<T>:保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 Key,

因此算子接收到的每个Key都可能对应一个值)。这个值可以通过 update(T)进行更新,通过 T value() 进行检索。

※ ListState<T>:保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获取整个列表。还可以通过 update(List<T>) 覆盖当前的列表。

※ ReducingState<T>:保存一个单值,表示添加到状态的所有聚合。接口与ListState 类似,使用 add(T)增加元素,会使用提供的 ReduceFunction 进行聚合

※ AggregatingState<IN.OUT>:保留一个单值,表示添加到状态的所以值得聚合。和ReducingState 相反得是,聚合类型可能与添加到状态得元素得类型不同。接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

※ FoldingState<T,ACC>:保留一个单值,表示添加到状态的所有值的聚合。与 ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState类型,但使用 add(T) 添加的元素会用指定的FoldFunction 折叠成聚合值。

※ MapState<UK,UV>:维护了一个添加映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分别检索映射、键和值的可迭代视图。

2)Operator State(算子状态)

Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition,offset)映射。

 


 

station.log 内容:

station_0,18600003612,18900004575,barring,1577080453123,0 station_9,18600003686,18900002113,success,1577080453123,32 station_3,18600003794,18900009608,success,1577080453123,4 station_1,18600000005,18900007729,fail,1577080453123,0 station_8,18600007461,18900006987,barring,1577080453123,0 station_5,18600009356,18900006066,busy,1577080455129,0 station_4,18600001914,18900003949,busy,1577080455129,0

 

StationLog类:

/**
 *
 * @param sid      基站ID
 * @param callOut  主叫号码
 * @param callIn   被叫号码
 * @param callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success):
 * @param callTime 呼叫时间戳,精确到毫秒
 * @param duration 通话时长 单位:秒
 */
case class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)

 

实现代码:

package com.apple.flink.keyedstate

import com.apple.flink.source.StationLog
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * 统计每个 手机呼叫间隔时间,并输出
 */
object StateCallInterval {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    streamEnv.setParallelism(1)

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._

    //读取文件数据
    val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
      .map(line => {
        var arr = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    //方法一
    data.keyBy(_.callIn) //按照呼叫手机号分组
      .flatMap(new CallIntervalFunction())
      .print()

    //方法二:调用 flatMapWithState算子
    data.keyBy(_.callIn) //按照呼叫手机号分组
      .flatMapWithState[(String, Long), StationLog] {
        case (in: StationLog, None) => (List.empty, Some(in)) //如果状态中没有,则存入
        case (in: StationLog, pre: Some[StationLog]) => { //如果状态中有值则计算时间间隔
          var interval = in.callTime - pre.get.callTime
          (List((in.callIn, interval)), Some(in))
        }
      }.print()

    //方法三 mapWithState 算子也可以
    data.keyBy(_.callIn) //按照呼叫手机号分组
      .mapWithState[(String, Long), StationLog] {
        case (in: StationLog, None) => ((in.callIn, 0), Some(in))
        case (in: StationLog, pre: Some[StationLog]) => {
          var interval = in.callTime - pre.get.callTime
          ((in.callIn, interval), Some(in))
        }
      }.print()

    streamEnv.execute()

  }

  class CallIntervalFunction extends RichFlatMapFunction[StationLog, (String, Long)] {

    //定义一个保存前一条呼叫的数据的状态对象
    private var preData: ValueState[StationLog] = _

    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor[StationLog]("pre", classOf[StationLog])
      preData = getRuntimeContext.getState(stateDescriptor)
    }

    override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = {
      var pre: StationLog = preData.value()
      if (pre == null) { //如果状态中没有,则存入
        preData.update(in)
      } else { //如果状态有值则计算时间间隔var
        var interval = in.callTime - pre.callTime
        collector.collect((in.callIn, interval))
      }
    }
  }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM