Flink| 狀態管理| 狀態編程


 

狀態后端(State Backends)

每傳入一條數據,有狀態的算子任務都會讀取和更新狀態;

由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每個並行任務都會在本地內存維護其狀態,以確保快速的狀態訪問。

狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態后端(State Backend)

狀態后端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲。

1. Flink中的狀態

流式計算分為無狀態和有狀態兩種情況

  • 無狀態的計算觀察每個獨立事件,並根據最后一個事件輸出結果。
  • 有狀態的計算則會基於多個事件輸出結果。

下圖展示了無狀態流處理和有狀態流處理的主要區別: 無狀態流處理分別接收每條數據記錄(圖中的黑條),然后根據最新輸入的數據生成輸出數據(白條)。有狀態流處理會維護狀態(根

據每條輸入記錄進行更新),並基於最新輸入的記錄和當前的狀態值生成輸出記錄(灰條)。

圖中輸入數據由黑條表示。無狀態流處理每次只轉換一條輸入記錄,並且僅根據最新的輸入記錄輸出結果(白條)。有狀態 流處理維護所有已處理記錄的狀態值,並根據每條新輸入的記

錄更新狀態,因此輸出記錄(灰條)反映的是綜合考慮多個事件之后的結果。

盡管無狀態的計算很重要,但是流處理對有狀態的計算更感興趣。舊的流處理系統並不支持有狀態的計算,而新一代的流處理系統則將狀態及其正確性視為重中之重。

           

流式處理(A. 可以是無狀態(基於某個獨立的事件計算出來后直接輸出了,來一個處理一個不涉及到其他東西,如map、flatmap、filter;超過一定溫度就報警 - 側輸出流;)、

                  B. 可以是有轉態的(求和、wordcount計算))

  • 狀態是針對一個任務而言的,由一個任務維護,並且用來計算某個結果的所有數據,都屬於這個任務的轉態;
  • 可以認為狀態就是一個本地變量,可以被任務的業務邏輯直接訪問;
  • Flink會進行狀態管理(狀態做序列化以二進制的形式全部存儲起來),包括狀態一致性、故障處理以及高效存儲和訪問,以便開發人員可以專注於應用程序的邏輯。

在Flink中,狀態始終與特定算子相關聯;為了運行時的Flink了解算子的狀態,算子需要預先注冊其狀態

有狀態的算子和應用程序

Flink內置的很多算子,數據源source,數據存儲sink都是有狀態的,流中的數據都是buffer records,會保存一定的元素或者元數據。例如: ProcessWindowFunction會緩存輸入流的數

據,ProcessFunction會保存設置的定時器信息等等。

在Flink中,狀態始終與特定算子相關聯。總的來說,有兩種類型的狀態:

  • 算子狀態(operator state),算子狀態的作用范圍限定為算子任務,一個任務一個狀態;
  • 鍵控(分區)狀態(keyed state),根據輸入數據流中定義的鍵(Key)來維護和訪問(基於KeyBy--KeyedStream上有任務出現的狀態,定義的不同的key來維護這個狀態;不同的key也是獨立訪問的,一個key只能訪問它自己的狀態,不同key之間也不能互相訪問);

A. 算子狀態

算子狀態的作用范圍限定為算子任務,由同一並行子任務所處理的所有數據都可以訪問到相同的狀態;

狀態對於同一個任務而言是共享的(每一個並行的子任務共享一個狀態);

算子狀態不能由相同或不同算子的另一個任務訪問(相同算子的不同任務之間也不能訪問);

算子狀態提供三種數據結構:

① 列表狀態(List state),將狀態表示為一組數據的列表;(會根據並行度的調整把之前的狀態重新分組重新分配

② 聯合列表狀態(Union list state),也將狀態表示為數據的列表,它常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復(把之前的每一個狀態廣播到對應的每個算子中)。

③ 廣播狀態(Broadcast state),如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態(把同一個狀態廣播給所有算子子任務);

 B. 鍵控狀態(Keyed State)-- 更常用

 鍵控狀態是根據輸入數據流中定義的鍵(key)來維度和訪問狀態的;

 Flink為每個key維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護和處理這個key對應的狀態;

 當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key;

鍵控狀態Keyed State 數據結構:

① 值狀態(ValueState<T>),將狀態表示為單個值;(直接.value獲取,Set操作是.update)

  • get操作: ValueState.value()
  • set操作: ValueState.update(T value)

② 列表狀態(ListState<T>),將狀態表示為一組數據的列表(存多個狀態);(.get,.update,.add)

  • ListState.add(T value)
  • ListState.addAll(List<T> values)
  • ListState.get()返回Iterable<T>
  • ListState.update(List<T> values)

③ 映射狀態(MapState<K, V>),將狀態表示為一組Key-Value對;(.get,.put ,類似HashMap)

  • MapState.get(UK key)
  • MapState.put(UK key, UV value)
  • MapState.contains(UK key)
  • MapState.remove(UK key)

④ 聚合狀態(ReducingState<T>  & AggregatingState<I, O>),將狀態表示為一個用於聚合操作的列表;(.add不像之前添加到列表,它是直接聚合到之前的結果中)

    Reduce輸入輸出類型是不能變的,Aggregate可得到數據類型完全不一樣的結果;

State.clear()是清空操作。

鍵控狀態的使用:

聲明一個鍵控狀態: lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) ) 讀取狀態: val prevTemp = lastTemp.value() 對狀態賦值: lastTemp.update(value.temperature)

案例,利用KeyedState,實現這樣一個需求:檢測傳感器的溫度值,如果連續的兩個溫度差值超過10度,就輸出報警。代碼見下

狀態一致性

檢查點(checkpoint)

 

狀態后端(State Backends)

-- 狀態管理(存儲、訪問、維護和檢查點)

每傳入一條數據,有狀態的算子任務都會讀取和更新狀態;

由於有效的狀態訪問對於處理數據的低效遲至關重要,因此每個並行任務都會在本地維度其狀態,以確保快速的狀態訪問;

狀態的存儲、訪問以及維度,由一個可插入的組件決定,這個組件就叫做狀態后端(State Backends)

狀態后端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠程存儲;

狀態后端的分類

① MemoryStateBackend: 一般用於開發和測試

  • 內存級的狀態后端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在TaskManager的JVM堆上,而將checkpoint存儲在JobManager的內存中;
  • 特點快速、低延遲,但不穩定;

② FsStateBackend(文件系統狀態后端):生產環境

  • 將checkpoint存到遠程的持久化文件系統(FileSystem),HDFS上,而對於本地狀態,跟MemoryStateBackend一樣,也會存到TaskManager的JVM堆上。
  • 同時擁有內存級的本地訪問速度,和更好的容錯保證;(如果是超大規模的需要保存還是無法解決,存到本地狀態就可能OOM)

③ RocksDBStateBackend:

  • 將所有狀態序列化后,存入本地的RocksDB(本地數據庫硬盤空間,序列化到磁盤)中存儲,全部序列化存儲到本地。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.10.1</version> </dependency>

設置狀態后端為FsStateBackend,並配置檢查點和重啟策略:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. 狀態后端配置 //env.setStateBackend(new MemoryStateBackend());  env.setStateBackend(new FsStateBackend("", true)) //env.setStateBackend(new RocksDBStateBackend("")) // 2. 檢查點配置 開啟checkpoint env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setPreferCheckpointForRecovery(true); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); env.getCheckpointConfig().setCheckpointInterval(10000L) // 3. 重啟策略配置 // 固定延遲重啟(隔一段時間嘗試重啟一次) env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啟次數 100000L // 嘗試重啟的時間間隔,也可org.apache.flink.api.common.time.Time )); env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.minutes(5), Time.seconds(10)))

 

2. 狀態編程

利用Keyed State,實現這樣一個需求:檢測傳感器的溫度值,如果連續的兩個溫度差值超過10度,就輸出報警。

方法一: class TempChangeAlert(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { } //底層API processFunction API
方法二: class TempChangeAlert2(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{ } //利用富函數
方法三: dataStream.keyBy(_.id)
              .flatMapWithState[(String, Double, Double), Double]{ } //FlatMap with keyed ValueState 的快捷方式,帶狀態的flatMap
import akka.pattern.BackoffSupervisor.RestartCount import com.xxx.fink.api.sourceapi.SensorReading import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.contrib.streaming.state.RocksDBStateBackend import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} /** *狀態編程 * 檢測兩次溫度變化如果超過某個范圍就報警,比如超過10°就報警; */ object StateTest { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //env.setStateBackend(new RocksDBStateBackend("")) val stream: DataStream[String] = env.socketTextStream("hadoop101", 7777) val dataStream: DataStream[SensorReading] = stream.map(data => { val dataArray: Array[String] = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000 }) //方法一: val processedStream1: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id) .process(new TempChangeAlert(10.0)) //方法二: 除了processFunction,其他也可以有狀態 val processedStream2: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id) .flatMap(new TempChangeAlert2(10.0)) //方法三: 帶狀態的flatMap val processedStream3: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id) .flatMapWithState[(String, Double, Double), Double]{ //如果沒有狀態的話,也就是沒有數據過來,那么就將當前數據濕度值存入狀態 case (input: SensorReading, None) => (List.empty, Some(input.temperature)) //如果有狀態,就應該與上次的溫度值比較差值,如果大於閾值就輸出報警 case(input: SensorReading, lastTemp: Some[Double]) => val diff = (input.temperature - lastTemp.get).abs if (diff > 10.0){ (List((input.id, lastTemp.get, input.temperature)), Some(input.temperature)) }else{ (List.empty, Some(input.temperature)) } } dataStream.print("Input data:") processedStream3.print("process data:") env.execute("Window test") } } class TempChangeAlert(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] { //定義一個狀態變量,保存上次的溫度值 lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = { //獲取上次的溫度值 val lastTemp = lastTempState.value() //用當前的溫度值和上次的做差,如果大於閾值,則輸出報警 val diff = (value.temperature - lastTemp).abs if (diff > threshold){ out.collect(value.id, lastTemp, value.temperature) } lastTempState.update(value.temperature) //狀態更新  } } class TempChangeAlert2(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{ //flatMap本身是無狀態的,富函數版本的函數類都可以去操作狀態也有生命周期 private var lastTempState: ValueState[Double] = _ //賦一個空值; //初始化的聲明state變量 override def open(parameters: Configuration): Unit = { //可以定義一個lazy;也可以在聲明周期中拿; lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) } override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = { //獲取上次的溫度值 val lastTemp = lastTempState.value() //用當前溫度值和上次的求差值,如果大於閾值,輸出報警信息 val diff = (value.temperature - lastTemp).abs if (diff > threshold){ out.collect(value.id, lastTemp, value.temperature) } lastTempState.update(value.temperature) } }

測試:

###方法二測試:
Input data:> SensorReading(sensor_1,1547718199,35.8)
process data:> (sensor_1,0.0,35.8)
Input data:> SensorReading(sensor_1,1547718199,32.0)
Input data:> SensorReading(sensor_1,1547718199,25.0)
Input data:> SensorReading(sensor_1,1547718199,35.1)
process data:> (sensor_1,25.0,35.1)
Input data:> SensorReading(sensor_1,1547718199,12.0)
process data:> (sensor_1,35.1,12.0)

###方法三:
Input data:> SensorReading(sensor_1,1547718199,35.8)
Input data:> SensorReading(sensor_1,1547718199,25.0)
process data:> (sensor_1,35.8,25.0)
Input data:> SensorReading(sensor_1,1547718199,28.8)
Input data:> SensorReading(sensor_1,1547718199,39.8)
process data:> (sensor_1,28.8,39.8)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM