Flink基礎(十二):DS簡介(12) 有狀態算子和應用


狀態操作符和用戶自定義函數都是我們在寫流處理程序時,常用的工具。事實上,大部分稍微復雜一點的邏輯都需要保存數據或者保存計算結果。很多Flink內置的操作符例如:source操作符,sink操作符等等都是有狀態的,也就是說會緩存流數據或者計算結果。例如,窗口操作符將會為ProcessWindowFunction收集輸入的數據,或者收集ReduceFunction計算的結果。而ProcessFunction也會保存定時器事件,一些sink方法為了做到exactly-once,會將事務保存下來。除了內置的操作符以及提供的source和sink操作符,Flink的DataStream API還在UDF函數中暴露了可以注冊、保存和訪問狀態的接口。

本章重點討論有狀態的用戶自定義函數的實現,以及討論有狀態應用的性能和健壯性。特別的,我們將解釋在用戶自定義函數中,如何定義不同類型的狀態,以及如何與狀態進行交互。我們還討論了性能方面的問題以及如何控制狀態大小的問題。

1 實現有狀態的用戶自定義函數

我們知道函數有兩種狀態,鍵控狀態(keyed state)和操作符狀態(operator state)。

1.1 在RuntimeContext中定義鍵控狀態

用戶自定義函數可以使用keyed state來存儲和訪問key對應的狀態。對於每一個key,Flink將會維護一個狀態實例。一個操作符的狀態實例將會被分發到操作符的所有並行任務中去。這表明函數的每一個並行任務只為所有key的某一部分key保存key對應的狀態實例。所以keyed state和分布式key-value map數據結構非常類似。

keyed state僅可用於KeyedStream。Flink支持以下數據類型的狀態變量:

  • ValueState[T]保存單個的值,值的類型為T。
    • get操作: ValueState.value()
    • set操作: ValueState.update(value: T)
  • ListState[T]保存一個列表,列表里的元素的數據類型為T。基本操作如下:
    • ListState.add(value: T)
    • ListState.addAll(values: java.util.List[T])
    • ListState.get()返回Iterable[T]
    • ListState.update(values: java.util.List[T])
  • MapState[K, V]保存Key-Value對。
    • MapState.get(key: K)
    • MapState.put(key: K, value: V)
    • MapState.contains(key: K)
    • MapState.remove(key: K)
  • ReducingState[T]
  • AggregatingState[I, O]

  ReducingState[T]AggregatingState[IN, OUT]ListState[T]同屬於MergingState[T]。與ListState[T]不同的是,ReducingState[T]只有一個元素,而不是一個列表。它的原理是新元素通過add(value: T)加入后,與已有的狀態元素使用ReduceFunction合並為一個元素,並更新到狀態里。AggregatingState[IN, OUT]ReducingState[T]類似,也只有一個元素,只不過AggregatingState[IN, OUT]的輸入和輸出類型可以不一樣。ReducingState[T]AggregatingState[IN, OUT]與窗口上進行ReduceFunctionAggregateFunction很像,都是將新元素與已有元素做聚合。

State.clear()是清空操作。

scala version

val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
  extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])

    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  override def flatMap(
    reading: SensorReading,
    out: Collector[(String, Double, Double)]
  ): Unit = {
    val lastTemp = lastTempState.value()
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      out.collect((reading.id, reading.temperature, tempDiff))
    }
    this.lastTempState.update(reading.temperature)
  }
}

上面例子中的FlatMapFunction只能訪問當前處理的元素所包含的key所對應的狀態變量。

不同key對應的keyed state是相互隔離的。

  • 通過RuntimeContext注冊StateDescriptor。StateDescriptor以狀態state的名字和存儲的數據類型為參數。數據類型必須指定,因為Flink需要選擇合適的序列化器。
  • 在open()方法中創建state變量。注意復習之前的RichFunction相關知識。

當一個函數注冊了StateDescriptor描述符,Flink會檢查狀態后端是否已經存在這個狀態。這種情況通常出現在應用掛掉要從檢查點或者保存點恢復的時候。在這兩種情況下,Flink會將注冊的狀態連接到已經存在的狀態。如果不存在狀態,則初始化一個空的狀態。

使用FlatMap with keyed ValueState的快捷方式flatMapWithState也可以實現以上需求。

scala version

val alerts: DataStream[(String, Double, Double)] = keyedSensorData
  .flatMapWithState[(String, Double, Double), Double] {
    case (in: SensorReading, None) =>
      // no previous temperature defined.
      // Just update the last temperature
      (List.empty, Some(in.temperature))
    case (SensorReading r, lastTemp: Some[Double]) =>
      // compare temperature difference with threshold
      val tempDiff = (r.temperature - lastTemp.get).abs
      if (tempDiff > 1.7) {
        // threshold exceeded.
        // Emit an alert and update the last temperature
        (List((r.id, r.temperature, tempDiff)), Some(r.temperature))
      } else {
        // threshold not exceeded. Just update the last temperature
        (List.empty, Some(r.temperature))
      }
  }

1.2 使用ListCheckpointed接口來實現操作符的列表狀態

操作符狀態會在操作符的每一個並行實例中去維護。一個操作符並行實例上的所有事件都可以訪問同一個狀態。Flink支持三種操作符狀態:list state, list union state, broadcast state。

一個函數可以實現ListCheckpointed接口來處理操作符的list state。ListCheckpointed接口無法處理ValueState和ListState,因為這些狀態是注冊在狀態后端的。操作符狀態類似於成員變量,和狀態后端的交互通過ListCheckpointed接口的回調函數實現。接口提供了兩個方法:

// 返回函數狀態的快照,返回值為列表
snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]
// 從列表恢復函數狀態
restoreState(java.util.List[T] state): Unit

當Flink觸發stateful functon的一次checkpoint時,snapshotState()方法會被調用。方法接收兩個參數,checkpointId為唯一的單調遞增的檢查點Id,timestamp為當master機器開始做檢查點操作時的牆上時鍾(機器時間)。方法必須返回序列化好的狀態對象的列表。

當宕機程序從檢查點或者保存點恢復時會調用restoreState()方法。restoreState使用snapshotState保存的列表來恢復。

下面的例子展示了如何實現ListCheckpointed接口。業務場景為:一個對每一個並行實例的超過閾值的溫度的計數程序。

class HighTempCounter(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (Int, Long)]
    with ListCheckpointed[java.lang.Long] {

  // index of the subtask
  private lazy val subtaskIdx = getRuntimeContext
    .getIndexOfThisSubtask
  // local count variable
  private var highTempCnt = 0L

  override def flatMap(
      in: SensorReading,
      out: Collector[(Int, Long)]): Unit = {
    if (in.temperature > threshold) {
      // increment counter if threshold is exceeded
      highTempCnt += 1
      // emit update with subtask index and counter
      out.collect((subtaskIdx, highTempCnt))
    }
  }

  override def restoreState(
      state: util.List[java.lang.Long]): Unit = {
    highTempCnt = 0
    // restore state by adding all longs of the list
    for (cnt <- state.asScala) {
      highTempCnt += cnt
    }
  }

  override def snapshotState(
      chkpntId: Long,
      ts: Long): java.util.List[java.lang.Long] = {
    // snapshot state as list with a single count
    java.util.Collections.singletonList(highTempCnt)
  }
}

上面的例子中,每一個並行實例都計數了本實例有多少溫度值超過了設定的閾值。例子中使用了操作符狀態,並且每一個並行實例都擁有自己的狀態變量,這個狀態變量將會被檢查點操作保存下來,並且可以通過使用ListCheckpointed接口來恢復狀態變量。

看了上面的例子,我們可能會有疑問,那就是為什么操作符狀態是狀態對象的列表。這是因為列表數據結構支持包含操作符狀態的函數的並行度改變的操作。為了增加或者減少包含了操作符狀態的函數的並行度,操作符狀態需要被重新分區到更多或者更少的並行任務實例中去。而這樣的操作需要合並或者分割狀態對象。而對於每一個有狀態的函數,分割和合並狀態對象都是很常見的操作,所以這顯然不是任何類型的狀態都能自動完成的。

通過提供一個狀態對象的列表,擁有操作符狀態的函數可以使用snapshotState()方法和restoreState()方法來實現以上所說的邏輯。snapshotState()方法將操作符狀態分割成多個部分,restoreState()方法從所有的部分中將狀態對象收集起來。當函數的操作符狀態恢復時,狀態變量將被分區到函數的所有不同的並行實例中去,並作為參數傳遞給restoreState()方法。如果並行任務的數量大於狀態對象的數量,那么一些並行任務在開始的時候是沒有狀態的,所以restoreState()函數的參數為空列表。

再來看一下上面的程序,我們可以看到操作符的每一個並行實例都暴露了一個狀態對象的列表。如果我們增加操作符的並行度,那么一些並行任務將會從0開始計數。為了獲得更好的狀態分區的行為,當HighTempCounter函數擴容時,我們可以按照下面的程序來實現snapshotState()方法,這樣就可以把計數值分配到不同的並行計數中去了。

override def snapshotState(
    chkpntId: Long,
    ts: Long): java.util.List[java.lang.Long] = {
  // split count into ten partial counts
  val div = highTempCnt / 10
  val mod = (highTempCnt % 10).toInt
  // return count as ten parts
  (List.fill(mod)(new java.lang.Long(div + 1)) ++
    List.fill(10 - mod)(new java.lang.Long(div))).asJava
}

1.3 使用連接的廣播狀態

一個常見的需求就是流應用需要將同樣的事件分發到操作符的所有的並行實例中,而這樣的分發操作還得是可恢復的。

我們舉個例子:一條流是一個規則(比如5秒鍾內連續兩個超過閾值的溫度),另一條流是待匹配的流。也就是說,規則流和事件流。所以每一個操作符的並行實例都需要把規則流保存在操作符狀態中。也就是說,規則流需要被廣播到所有的並行實例中去。

在Flink中,這樣的狀態叫做廣播狀態(broadcast state)。廣播狀態和DataStream或者KeyedStream都可以做連接操作。

下面的例子實現了一個溫度報警應用,應用有可以動態設定的閾值,動態設定通過廣播流來實現。

val sensorData: DataStream[SensorReading] = ...
val thresholds: DataStream[ThresholdUpdate] = ...
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
  .keyBy(_.id)

// the descriptor of the broadcast state
val broadcastStateDescriptor =
  new MapStateDescriptor[String, Double](
    "thresholds", classOf[String], classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
  .broadcast(broadcastStateDescriptor)

// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
  .connect(broadcastThresholds)
  .process(new UpdatableTemperatureAlertFunction())

帶有廣播狀態的函數在應用到兩條流上時分三個步驟:

  • 調用DataStream.broadcast()來創建BroadcastStream,定義一個或者多個MapStateDescriptor對象。
  • 將BroadcastStream和DataStream/KeyedStream做connect操作。
  • 在connected streams上調用KeyedBroadcastProcessFunction/BroadcastProcessFunction。

下面的例子實現了動態設定溫度閾值的功能。

class UpdatableTemperatureAlertFunction()
    extends KeyedBroadcastProcessFunction[String,
      SensorReading, ThresholdUpdate, (String, Double, Double)] {

  // the descriptor of the broadcast state
  private lazy val thresholdStateDescriptor =
    new MapStateDescriptor[String, Double](
      "thresholds", classOf[String], classOf[Double])

  // the keyed state handle
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // create keyed state descriptor
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])
    // obtain the keyed state handle
    lastTempState = getRuntimeContext
      .getState[Double](lastTempDescriptor)
  }

  override def processBroadcastElement(
      update: ThresholdUpdate,
      ctx: KeyedBroadcastProcessFunction[String,
        SensorReading, ThresholdUpdate,
        (String, Double, Double)]#Context,
      out: Collector[(String, Double, Double)]): Unit = {
    // get broadcasted state handle
    val thresholds = ctx
      .getBroadcastState(thresholdStateDescriptor)

    if (update.threshold != 0.0d) {
      // configure a new threshold for the sensor
      thresholds.put(update.id, update.threshold)
    } else {
      // remove threshold for the sensor
      thresholds.remove(update.id)
    }
  }

  override def processElement(
      reading: SensorReading,
      readOnlyCtx: KeyedBroadcastProcessFunction
        [String, SensorReading, ThresholdUpdate,
        (String, Double, Double)]#ReadOnlyContext,
      out: Collector[(String, Double, Double)]): Unit = {
    // get read-only broadcast state
    val thresholds = readOnlyCtx
      .getBroadcastState(thresholdStateDescriptor)
    // check if we have a threshold
    if (thresholds.contains(reading.id)) {
      // get threshold for sensor
      val sensorThreshold: Double = thresholds.get(reading.id)

      // fetch the last temperature from state
      val lastTemp = lastTempState.value()
      // check if we need to emit an alert
      val tempDiff = (reading.temperature - lastTemp).abs
      if (tempDiff > sensorThreshold) {
        // temperature increased by more than the threshold
        out.collect((reading.id, reading.temperature, tempDiff))
      }
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }
}

2 配置檢查點

10秒鍾保存一次檢查點。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;

// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L);

2.1 將hdfs配置為狀態后端

首先在IDEA的pom文件中添加依賴:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.3</version>
<!--            <scope>provided</scope>-->
        </dependency>

hdfs-site.xml添加:

    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>

別忘了重啟hdfs文件系統!

然后添加本地文件夾和hdfs文件的映射:

hdfs getconf -confKey fs.default.name
hdfs dfs -put /home/parallels/flink/checkpoint hdfs://localhost:9000/flink

然后在代碼中添加:

env.enableCheckpointing(5000) env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink"))

檢查一下檢查點正確保存了沒有:

hdfs dfs -ls hdfs://localhost:9000/flink

3 保證有狀態應用的可維護性

3.1 指定唯一的操作符標識符

每一個操作符都可以指定唯一的標識符。標識符將會作為操作符的元數據和狀態數據一起保存到savepoint中去。當應用從保存點恢復時,標識符可以用來在savepoint中查找標識符對應的操作符的狀態數據。標識符必須是唯一的,否則應用不知道從哪一個標識符恢復。

強烈建議為應用的每一個操作符定義唯一標識符。例子:

DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
  .flatMap(new TemperatureAlertFunction(1.1))  
  .uid("TempAlert");

3.2 指定操作符的最大並行度

操作符的最大並行度定義了操作符的keyed state可以被分到多少個key groups中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;

// set the maximum parallelism for this application
env.setMaxParallelism(512);

DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
  .flatMap(new TemperatureAlertFunction(1.1))
  // set the maximum parallelism for this operator and
  // override the application-wide value
  .setMaxParallelism(1024);

4 有狀態應用的性能和健壯性

4.1 選擇一個狀態后端

  • MemoryStateBackend將狀態當作Java的對象(沒有序列化操作)存儲在TaskManager JVM進程的堆上。
  • FsStateBackend將狀態存儲在本地的文件系統或者遠程的文件系統如HDFS。
  • RocksDBStateBackend將狀態存儲在RocksDB \footnote{Facebook開源的KV數據庫} 中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;

String checkpointPath = ???
// configure path for checkpoints on the remote filesystem
// env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))

val backend = new RocksDBStateBackend(checkpointPath)
// configure the state backend
env.setStateBackend(backend);

4.2 防止狀態泄露

流應用通常需要運行幾個月或者幾年。如果state數據不斷增長的話,會爆炸。所以控制state數據的大小十分重要。而Flink並不會清理state和gc。所以所有的stateful operator都需要控制他們各自的狀態數據大小,保證不爆炸。

例如我們之前講過增量聚合函數ReduceFunction/AggregateFunction,就可以提前聚合而不給state太多壓力。

我們來看一個例子,我們實現了一個KeyedProcessFunction,用來計算連續兩次的溫度的差值,如果差值超過閾值,報警。

我們之前實現過這個需求,但沒有清理掉狀態數據。比如一小時內不再產生溫度數據的傳感器對應的狀態數據就可以清理掉了。

 

class SelfCleaningTemperatureAlertFunction(val threshold: Double)
    extends KeyedProcessFunction[String,
      SensorReading, (String, Double, Double)] {

  // the keyed state handle for the last temperature
  private var lastTempState: ValueState[Double] = _
  // the keyed state handle for the last registered timer
  private var lastTimerState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // register state for last temperature
    val lastTempDesc = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])
    lastTempState = getRuntimeContext
      .getState[Double](lastTempDescriptor)
    // register state for last timer
    val lastTimerDesc = new ValueStateDescriptor[Long](
      "lastTimer", classOf[Long])
    lastTimerState = getRuntimeContext
      .getState(timestampDescriptor)
  }

  override def processElement(
      reading: SensorReading,
      ctx: KeyedProcessFunction
        [String, SensorReading, (String, Double, Double)]#Context,
      out: Collector[(String, Double, Double)]): Unit = {

    // compute timestamp of new clean up timer
    // as record timestamp + one hour
    val newTimer = ctx.timestamp() + (3600 * 1000)
    // get timestamp of current timer
    val curTimer = lastTimerState.value()
    // delete previous timer and register new timer
    ctx.timerService().deleteEventTimeTimer(curTimer)
    ctx.timerService().registerEventTimeTimer(newTimer)
    // update timer timestamp state
    lastTimerState.update(newTimer)

    // fetch the last temperature from state
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature increased by more than the threshold
      out.collect((reading.id, reading.temperature, tempDiff))
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String,
        SensorReading, (String, Double, Double)]#OnTimerContext,
      out: Collector[(String, Double, Double)]): Unit = {

    // clear all state for the key
    lastTempState.clear()
    lastTimerState.clear()
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM