Flink中的狀態
由一個任務維護,並且用來計算某個結果的所有數據,都屬於這個任務的狀態;可以認為狀態就是一個本地變量,可以被任務的業務邏輯訪問;Flink會進行狀態管理,包括狀態一致性,故障處理以及高效存儲和訪問,以使開發人員可以專注於應用程序的邏輯。
在Flink中,狀態始終與特定算子相關聯,為了使運行的Flink了解算子的狀態,算子需要預先注冊其狀態。
總的來說,有兩種類型的狀態:
算子狀態(Operator State): 算子狀態的作用范圍限定為算子任務。
鍵控狀態(Keyed State): 根據輸入數據流中定義的鍵(key)來維護和訪問。
算子狀態(Operator State)
算子狀態的作用范圍限定為算子任務,由同一個並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言使共享的,算子狀態不能由相同或不同算子的另一個任務訪問。
算子狀態數據結構
列表狀態(List State): 將狀態表示為一組數據的列表。
聯合列表狀態(Union list state): 也將狀態表示為數據的列表,它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。
廣播狀態(Broadcast state): 如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態。
鍵控狀態(Keyed State)
鍵控狀態時根據輸入數據流中定義的鍵(key)來維護和訪問的,Flink為每隔key維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護和處理這個key對應的狀態,當文物處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。
鍵控狀態數據結構
值狀態(Value state): 將狀態表示為單個的值。
列表狀態(list state): 將狀態表示為一組數據的列表。
映射狀態(Map state): 將狀態表示為一組key-value對。
聚合狀態(Reducing state & aggregating State): 將狀態表示為一個用於聚合操作的列表。
鍵控狀態的使用(值狀態(Value state)舉例:)
聲明一個鍵控狀態
//定義一個狀態,用來保存上一個數據的溫度值
//用懶加載的方式,一開始定義的時候我們還不執行,等到調用的時候去執行
//所有的狀態都這么定義 當成一個變量直接用
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
讀取狀態
//每來一條數據的時候,從狀態中取出上一次的溫度值
val preTemp = lastTemp.value()
對狀態賦值
//更新溫度值
lastTemp.update(value.temperature)
狀態后端(State Backends)
每傳入一條數據,有狀態的算子任務都會讀取和更新狀態。由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每隔並行任務都會在本地維護其狀態,以確保快速的狀態訪問。狀態的存儲,訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態呀
后端(state backend),狀態后端主要負責兩件事:一是本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲。
//開啟checkpoint 傳入參數代表每隔多久進行checkpoint
env.enableCheckpointing(60000) //狀態后端 // env.setStateBackend(new MemoryStateBackend()) // env.setStateBackend(new RocksDBStateBackend(""))
狀態編程小應用(三種方式實現)
/**
* 需求:
* 檢測兩次溫度超過一定的范圍的話 就報警
* 傳感器的溫度 跳變太大
*/
方式一:繼承 KeyedProcessFunction 抽象類
class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //定義一個狀態變量,保存上次的溫度值
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //獲取上一次的溫度值
val lastTemp = lastTempState.value() //用當前的溫度值和上次的溫度值計算一個差值,如果大於預設值,就報警‘
val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法一,第一條數據進來,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次處理之后將狀態進行更新
lastTempState.update(value.temperature) } }
方式二:繼承 RichFlatMapFunction 抽象類
class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ //定義
private var lastTempState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { //初始化的時候聲明變量
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //獲取上一次的溫度值
val lastTemp = lastTempState.value() //用當前的溫度值和上次的溫度值計算一個差值,如果大於預設值,就報警‘
val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法二,第一條數據進來,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次處理之后將狀態進行更新
lastTempState.update(value.temperature) } }
方式三:keyBy后調用 flatMapWithState 方法
//實現方式3
val processedStream4 = dataStream.keyBy(_.id) .flatMapWithState[(String,Double,Double),Double]{ //沒有狀態的話,也就是沒有數據來過,那么就將當前數據溫度值存入狀態
case (input:SensorReading,None) => { println(s"方法三,第一條數據進來,$input") (List.empty,Some(input.temperature)) } //如果有狀態的話,就與上一次的溫度值比較差值,如果大於預設值,就報警
case (input:SensorReading,lastTemp:Some[Double]) => { val diff = (input.temperature - lastTemp.get).abs if(diff>10.0){ (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature)) }else{ (List.empty,Some(input.temperature)) } } }
完整代碼:
package com.wyh.statebackendApi import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector /** * 需求: * 檢測兩次溫度超過一定的范圍的話 就報警 * 傳感器的溫度 跳變太大 */
//溫度傳感器讀數樣例類
case class SensorReading(id: String, timestamp: Long, temperature: Double) object Demo1 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //開啟checkpoint 傳入參數代表每隔多久進行checkpoint
env.enableCheckpointing(60000) //狀態后端 // env.setStateBackend(new MemoryStateBackend()) // env.setStateBackend(new RocksDBStateBackend(""))
val stream = env.socketTextStream("localhost", 7777) //Transform操作
val dataStream: DataStream[SensorReading] = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //===到來的數據是升序的,准時發車,用assignAscendingTimestamps //指定哪個字段是時間戳 需要的是毫秒 * 1000 // .assignAscendingTimestamps(_.timestamp * 1000) //===處理亂序數據 // .assignTimestampsAndWatermarks(new MyAssignerPeriodic()) //==底層也是周期性生成的一個方法 處理亂序數據 延遲1秒種生成水位 同時分配水位和時間戳 括號里傳的是等待延遲的時間
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = { t.timestamp * 1000 } }) // val processedStream = dataStream.keyBy(_.id) // .process(new TempIncreAlert()) //實現方式1
val processedStream2 = dataStream.keyBy(_.id) .process(new TempChangeAlert(10.0)) //實現方式2
val processedStream3 = dataStream.keyBy(_.id) .flatMap(new TempChangeAlertFlatMap(10.0)) //實現方式3
val processedStream4 = dataStream.keyBy(_.id) .flatMapWithState[(String,Double,Double),Double]{ //沒有狀態的話,也就是沒有數據來過,那么就將當前數據溫度值存入狀態
case (input:SensorReading,None) => { println(s"方法三,第一條數據進來,$input") (List.empty,Some(input.temperature)) } //如果有狀態的話,就與上一次的溫度值比較差值,如果大於預設值,就報警
case (input:SensorReading,lastTemp:Some[Double]) => { val diff = (input.temperature - lastTemp.get).abs if(diff>10.0){ (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature)) }else{ (List.empty,Some(input.temperature)) } } } processedStream2.printToErr("processedStream2 data") processedStream3.printToErr("processedStream3 data") processedStream4.printToErr("processedStream4 data") dataStream.print("input data") env.execute("StateBackend Test") } } class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String] { //定義一個狀態,用來保存上一個數據的溫度值 //用懶加載的方式,一開始定義的時候我們還不執行,等到調用的時候去執行 //所有的狀態都這么定義 當成一個變量直接用
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) //定義一個狀態用來保存定時器的時間戳
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long])) //判斷溫度連續上升 //跟上一次數據進行比較 如果比較一直大 10秒種內進行報警 //注冊一個定時器 把上一次的數據保存成當前的狀態
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = { //每來一條數據的時候,從狀態中取出上一次的溫度值
val preTemp = lastTemp.value() var curTimerTs = currentTimer.value() //更新溫度值
lastTemp.update(value.temperature) //加個if判斷最開始的溫度是否為0來判斷是否是第一條數據 溫度上升且沒有設置過定時器,則注冊定時器
if (preTemp == 0.0) { println("這是第一條數據進來") } else if ((value.temperature > preTemp) && (curTimerTs == 0L)) { val timerTs = ctx.timerService().currentProcessingTime() + 10000L
//傳入當前時間加1 是時間戳
ctx.timerService().registerProcessingTimeTimer(timerTs) currentTimer.update(timerTs) } else if (value.temperature <= preTemp) { //如果溫度下降 或者是第一條數據 刪除定時器
ctx.timerService().deleteProcessingTimeTimer(curTimerTs) //刪除定時器之后將狀態清空
currentTimer.clear() } } //在回調函數中執行定時器到的邏輯 //當前的時間 ctx上下文 out輸出信息
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { //直接輸出報警信息
out.collect(ctx.getCurrentKey + "溫度連續上升") //考慮真實情況,將狀態都清空
currentTimer.clear() } } class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //定義一個狀態變量,保存上次的溫度值
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //獲取上一次的溫度值
val lastTemp = lastTempState.value() //用當前的溫度值和上次的溫度值計算一個差值,如果大於預設值,就報警‘
val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法一,第一條數據進來,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次處理之后將狀態進行更新
lastTempState.update(value.temperature) } } class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{ //定義
private var lastTempState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { //初始化的時候聲明變量
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //獲取上一次的溫度值
val lastTemp = lastTempState.value() //用當前的溫度值和上次的溫度值計算一個差值,如果大於預設值,就報警‘
val diff = (value.temperature - lastTemp).abs if(lastTemp==0.0){ println(s"方法二,第一條數據進來,$value") }else if(diff > thresHold) { out.collect((value.id, lastTemp, value.temperature)) } //做完一次處理之后將狀態進行更新
lastTempState.update(value.temperature) } }
運行測試:
輸入數據:
結果: