Flink学习(十八) 状态管理与状态编程


Flink中的状态

 

 

由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态;可以认为状态就是一个本地变量,可以被任务的业务逻辑访问;Flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以使开发人员可以专注于应用程序的逻辑。

在Flink中,状态始终与特定算子相关联,为了使运行的Flink了解算子的状态,算子需要预先注册其状态。

总的来说,有两种类型的状态:

算子状态(Operator State): 算子状态的作用范围限定为算子任务。

键控状态(Keyed State): 根据输入数据流中定义的键(key)来维护和访问。

 

算子状态(Operator State)

 

 

算子状态的作用范围限定为算子任务,由同一个并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言使共享的,算子状态不能由相同或不同算子的另一个任务访问。

 

算子状态数据结构

列表状态(List State): 将状态表示为一组数据的列表。

联合列表状态(Union list state): 也将状态表示为数据的列表,它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state): 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

 

键控状态(Keyed State)

 

 

键控状态时根据输入数据流中定义的键(key)来维护和访问的,Flink为每隔key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态,当文物处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。

 

键控状态数据结构

值状态(Value state): 将状态表示为单个的值。

列表状态(list state): 将状态表示为一组数据的列表。

映射状态(Map state): 将状态表示为一组key-value对。

聚合状态(Reducing state & aggregating State): 将状态表示为一个用于聚合操作的列表。

 

键控状态的使用(值状态(Value state)举例:)

声明一个键控状态

//定义一个状态,用来保存上一个数据的温度值
//用懒加载的方式,一开始定义的时候我们还不执行,等到调用的时候去执行
//所有的状态都这么定义 当成一个变量直接用
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))

读取状态

//每来一条数据的时候,从状态中取出上一次的温度值

val preTemp = lastTemp.value()

对状态赋值

//更新温度值

 lastTemp.update(value.temperature)

 

状态后端(State Backends)

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每隔并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储,访问以及维护,由一个可插入的组件决定,这个组件就叫做状态呀

后端(state backend),状态后端主要负责两件事:一是本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。

    //开启checkpoint 传入参数代表每隔多久进行checkpoint
    env.enableCheckpointing(60000) //状态后端 // env.setStateBackend(new MemoryStateBackend()) // env.setStateBackend(new RocksDBStateBackend(""))

 

 

 

 

状态编程小应用(三种方式实现)

/**
* 需求:
* 检测两次温度超过一定的范围的话 就报警
* 传感器的温度 跳变太大
*/

方式一:继承 KeyedProcessFunction 抽象类
class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //定义一个状态变量,保存上次的温度值
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //获取上一次的温度值
    val lastTemp = lastTempState.value() //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
    val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法一,第一条数据进来,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次处理之后将状态进行更新
 lastTempState.update(value.temperature) } }

 

方式二:继承 RichFlatMapFunction 抽象类
class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ //定义
  private var lastTempState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { //初始化的时候声明变量
    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //获取上一次的温度值
    val lastTemp = lastTempState.value() //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
    val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法二,第一条数据进来,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次处理之后将状态进行更新
 lastTempState.update(value.temperature) } }

 

方式三:keyBy后调用 flatMapWithState 方法

 //实现方式3
    val processedStream4 = dataStream.keyBy(_.id) .flatMapWithState[(String,Double,Double),Double]{ //没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
      case (input:SensorReading,None) => { println(s"方法三,第一条数据进来,$input") (List.empty,Some(input.temperature)) } //如果有状态的话,就与上一次的温度值比较差值,如果大于预设值,就报警
      case (input:SensorReading,lastTemp:Some[Double]) => { val diff = (input.temperature - lastTemp.get).abs if(diff>10.0){ (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature)) }else{ (List.empty,Some(input.temperature)) } } }

 

完整代码:

package com.wyh.statebackendApi import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector /** * 需求: * 检测两次温度超过一定的范围的话 就报警 * 传感器的温度 跳变太大 */

//温度传感器读数样例类
case class SensorReading(id: String, timestamp: Long, temperature: Double) object Demo1 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //开启checkpoint 传入参数代表每隔多久进行checkpoint
    env.enableCheckpointing(60000) //状态后端 // env.setStateBackend(new MemoryStateBackend()) // env.setStateBackend(new RocksDBStateBackend(""))
 val stream = env.socketTextStream("localhost", 7777) //Transform操作
    val dataStream: DataStream[SensorReading] = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //===到来的数据是升序的,准时发车,用assignAscendingTimestamps //指定哪个字段是时间戳 需要的是毫秒 * 1000 // .assignAscendingTimestamps(_.timestamp * 1000) //===处理乱序数据 // .assignTimestampsAndWatermarks(new MyAssignerPeriodic()) //==底层也是周期性生成的一个方法 处理乱序数据 延迟1秒种生成水位 同时分配水位和时间戳 括号里传的是等待延迟的时间
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = { t.timestamp * 1000 } }) // val processedStream = dataStream.keyBy(_.id) // .process(new TempIncreAlert()) //实现方式1
    val processedStream2 = dataStream.keyBy(_.id) .process(new TempChangeAlert(10.0)) //实现方式2
    val processedStream3 = dataStream.keyBy(_.id) .flatMap(new TempChangeAlertFlatMap(10.0)) //实现方式3
    val processedStream4 = dataStream.keyBy(_.id) .flatMapWithState[(String,Double,Double),Double]{ //没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
      case (input:SensorReading,None) => { println(s"方法三,第一条数据进来,$input") (List.empty,Some(input.temperature)) } //如果有状态的话,就与上一次的温度值比较差值,如果大于预设值,就报警
      case (input:SensorReading,lastTemp:Some[Double]) => { val diff = (input.temperature - lastTemp.get).abs if(diff>10.0){ (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature)) }else{ (List.empty,Some(input.temperature)) } } } processedStream2.printToErr("processedStream2 data") processedStream3.printToErr("processedStream3 data") processedStream4.printToErr("processedStream4 data") dataStream.print("input data") env.execute("StateBackend Test") } } class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String] { //定义一个状态,用来保存上一个数据的温度值 //用懒加载的方式,一开始定义的时候我们还不执行,等到调用的时候去执行 //所有的状态都这么定义 当成一个变量直接用
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) //定义一个状态用来保存定时器的时间戳
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long])) //判断温度连续上升 //跟上一次数据进行比较 如果比较一直大 10秒种内进行报警 //注册一个定时器 把上一次的数据保存成当前的状态
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = { //每来一条数据的时候,从状态中取出上一次的温度值
    val preTemp = lastTemp.value() var curTimerTs = currentTimer.value() //更新温度值
 lastTemp.update(value.temperature) //加个if判断最开始的温度是否为0来判断是否是第一条数据 温度上升且没有设置过定时器,则注册定时器
    if (preTemp == 0.0) { println("这是第一条数据进来") } else if ((value.temperature > preTemp) && (curTimerTs == 0L)) { val timerTs = ctx.timerService().currentProcessingTime() + 10000L

      //传入当前时间加1 是时间戳
 ctx.timerService().registerProcessingTimeTimer(timerTs) currentTimer.update(timerTs) } else if (value.temperature <= preTemp) { //如果温度下降 或者是第一条数据 删除定时器
 ctx.timerService().deleteProcessingTimeTimer(curTimerTs) //删除定时器之后将状态清空
 currentTimer.clear() } } //在回调函数中执行定时器到的逻辑 //当前的时间 ctx上下文 out输出信息
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { //直接输出报警信息
    out.collect(ctx.getCurrentKey + "温度连续上升") //考虑真实情况,将状态都清空
 currentTimer.clear() } } class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //定义一个状态变量,保存上次的温度值
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //获取上一次的温度值
    val lastTemp = lastTempState.value() //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
    val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法一,第一条数据进来,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次处理之后将状态进行更新
 lastTempState.update(value.temperature) } } class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ //定义
  private var lastTempState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { //初始化的时候声明变量
    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //获取上一次的温度值
    val lastTemp = lastTempState.value() //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
    val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法二,第一条数据进来,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次处理之后将状态进行更新
 lastTempState.update(value.temperature) } }

 

运行测试:

输入数据:

 

 

结果:


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM