Flink 有狀態的算子和應用程序


流式計算分為無狀態有狀態兩種情況。無狀態的計算觀察每個獨立事件,並根據最后一個事件輸出結果。例如,流處理應用程序從傳感器接收水位數據,並在水位超過指定高度時發出警告。有狀態的計算則會基於多個事件輸出結果。以下是一些例子。

  • 所有類型的窗口。例如,計算過去一小時的平均水位,就是有狀態的計算。
  • 所有用於復雜事件處理的狀態機。例如,若在一分鍾內收到兩個相差20cm以上的水位差讀數,則發出警告,這是有狀態的計算。
  • 流與流之間的所有關聯操作,以及流與靜態表或動態表之間的關聯操作,都是有狀態的計算。

無狀態流處理和有狀態流處理的主要區別:

  • 無狀態處理分別接收每條數據記錄(黑條),然后根據最新輸入的數據生成輸出數據(白條)
  • 有狀態流處理會維護狀態(根據每條輸入記錄進行更新),並給予最新輸入的記錄和當前的狀態值生成輸出記錄(灰條)

盡管無狀態的計算很重要,但是流處理對有狀態的計算更感興趣。事實上,正確地實現有狀態的計算比實現無狀態的計算難得多。舊的流處理系統並不支持有狀態的計算,而新一代的流處理系統則將狀態及其正確性視為重中之重。

1、 算子狀態(operator state)

Flink內置的很多算子,數據源source,數據存儲sink都是有狀態的,流中的數據都是buffer records,會保存一定的元素或者元數據。例如: ProcessWindowFunction會緩存輸入流的數據,ProcessFunction會保存設置的定時器信息等等。

在Flink中,狀態始終與特定算子相關聯。總的來說,有兩種類型的狀態:

  • 算子狀態(operator state)
  • 鍵控狀態(keyed state)

1.1.1 算子狀態 (operator state)

算子狀態的作用范圍限定為算子任務。這意味着由同一並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。算子狀態不能由相同或不同算子的另一個任務訪問。

Flink為算子狀態提供三種基本數據結構:

(1)列表狀態(List state)

將狀態表示為一組數據的列表。

(2)聯合列表狀態(Union list state)

也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。

(3)廣播狀態(Broadcast state)

如果一個算子有多項任務,而它的每項任務狀態又都相同,那么這種特殊情況最適合應用廣播狀態。

1.1.2 鍵控狀態(keyed state)

2、 鍵控狀態 (keyed state)

鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的。Flink為每個鍵值維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個算子任務中,這個任務會維護和處理這個key對應的狀態。當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。因此,具有相同key的所有數據都會訪問相同的狀態。Keyed State很類似於一個分布式的key-value map數據結構,只能用於KeyedStream(keyBy算子處理之后)。

Flink的Keyed State支持以下數據類型:

  • ValueState[T] 保存單個值,值得類型為T
    • get操作: ValueState.value()
    • set操作: ValueState.update(value: T)
  • ListState[T]保存一個列表,列表里的元素的數據類型為T
    • ListState.add(value: T)
    • ListState.addAll(values: java.util.List[T])
    • ListState.get()返回Iterable[T]
    • ListState.update(values: java.util.List[T])
  • MapState[K, V]保存Key-Value對
    • MapState.get(key: K)
    • MapState.put(key: K, value: V)
    • MapState.contains(key: K)
    • MapState.remove(key: K)
  • ReducingState[T]
  • AggregatingState[I, O]

State.clear()是清空

通過RuntimeContext注冊StateDescriptor。

StateDescriptor以狀態state的名字和存儲的數據類型為參數

在open方法中創建state變量(如果直接初始化,會拋出異常,Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized.

需求:如果連續兩次水位差超過40cm,發生預警信息。

 1 object AlarmTest {
 2     def main(args: Array[String]): Unit = {
 3        
 4         // TODO 需求:如果連續兩次水位差超過40cm,發生預警信息。
 5         
 6         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment;
 7         env.setParallelism(1)
 8         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 9         
10         //val dataDS: DataStream[String] = env.readTextFile("input/sensor-data2.log")
11         val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999)
12         val waterDS = dataDS.map(
13             data=>{
14                 val datas = data.split(",")
15                 WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
16             }
17         )
18     
19         // 設定數據的事件時間已經定義Watermark
20         val markDS: DataStream[WaterSensor] = waterDS.assignAscendingTimestamps(_.ts * 1000)
21     
22         // TODO 對分區后的數據進行處理
23         markDS.keyBy(_.id)
24                 .process( new KeyedProcessFunction[String, WaterSensor, String] {
25     
26                     private var lastWaterVal : ValueState[Int] = _
27     
28                     override def open(parameters: Configuration): Unit = {
29                         lastWaterVal = getRuntimeContext.getState[Int](
30                             new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int])
31                         )
32                     }
33     
34                     // TODO 當水位差超過40cm,馬上預警
35                     override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext, out: Collector[String]): Unit = {
36                         out.collect("水位差超過40cm")
37                     }
38     
39                     override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
40                         // TODO 當前水位應該減去上一次記錄的水位是否超過40cm
41                         if ( (value.vc - lastWaterVal.value()) > 40 ) {
42                             ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000)
43                         }
44                         lastWaterVal.update(value.vc)
45                         
46                     }
47                 } ).print("alarm>>>>")
48         markDS.print("mark>>>>>>>")
49         env.execute()      
50     }   
51 }

 

也可以用lazy懶加載方式,

1  private lazy var lastWaterVal : ValueState[Int] =getRuntimeContext.getState[Int](
2                             new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int])

 

3、 狀態后端(state Backend)

每傳入一條數據,有狀態的算子任務都會讀取和更新狀態。由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問。

狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態后端(state backend)

狀態后端主要負責兩件事:

  • 本地的狀態管理
  • 將檢查點(checkpoint)狀態寫入遠程存儲

狀態后端分類:

(1) MemoryStateBackend

內存級的狀態后端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在TaskManager的JVM堆上

將checkpoint存儲在JobManager的內存中

(2)FsStateBackend

本地狀態,跟MemoryStateBackend一樣,也會存在TaskManager的JVM堆上

將checkpoint存到遠程的持久化文件系統(FileSystem)上

(3)RocksDBStateBackend

將所有狀態序列化后,存入本地的RocksDB中存儲

1 <dependency>
2     <groupId>org.apache.flink</groupId>
3     <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
4     <version>1.7.2</version>
5 </dependency>

 

設置狀態后端為RocksDBStateBackend:

1 val checkpointPath : String = "XXX路徑"
2 val stateBackend:StateBackend = new RocksDBStateBackend(checkpointPath)
3 env.setStateBackend(stateBackend)
4 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM