Implementing Stateful Functions
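For stateful source functions, see the official documentation: emitting records and updating state must be done while holding the checkpoint lock (SourceContext.getCheckpointLock()).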
Declaring Keyed State at the RuntimeContext
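State can be accessed through rich functions (keyed state via the RuntimeContext), the ListCheckpointed interface, and the CheckpointedFunction interface.
When Flink processes a record, a function that reads state from its context only gets the state that belongs to the key of that record.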
There are five types of keyed state:
- ValueState[T]: holds a single value of type T. Read it with .value() and write it with .update(value: T).
- ListState[T]: holds a list of elements of type T. Append with .add(value: T) or .addAll(values: java.util.List[T]); .get() returns an Iterable[T] over all state elements; .update(values: java.util.List[T]) replaces the whole list. There is no method to remove individual elements.
- MapState[K, V]: holds a map of keys and values. It offers many methods of a regular Java Map, such as get(key: K), put(key: K, value: V), contains(key: K), remove(key: K), and iterators over the contained entries, keys, and values.
- ReducingState[T]: offers the same methods as ListState[T] (except for addAll() and update()), but instead of appending values to a list, ReducingState.add(value: T) immediately aggregates the value using a ReduceFunction. get() returns the single reduced value.
- AggregatingState[I, O]: behaves like ReducingState but uses the more general AggregateFunction to aggregate values, so the result type O can differ from the input type I.
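The per-key scoping described above, and the different behaviour of ValueState, ListState and ReducingState, can be illustrated with a toy in-memory model in plain Scala. This is a sketch for intuition only: `ToyKeyedStates` and `ToyKeyedStatesDemo` are hypothetical names, not Flink classes, the reduce function (max) is an arbitrary choice, and a real state backend would also serialize, checkpoint and restore these values.

```scala
import scala.collection.mutable

// Toy, in-memory model of keyed-state semantics (NOT Flink's API).
// A "current key" is set before each record, and every state access is
// implicitly scoped to that key, like Flink's keyed state handles.
class ToyKeyedStates[K] {
  private var currentKey: Option[K] = None
  private val values = mutable.Map.empty[K, Double]
  private val lists = mutable.Map.empty[K, mutable.ListBuffer[Double]]
  private val reduced = mutable.Map.empty[K, Double]

  // In Flink the runtime sets the key before calling the function for a record.
  def setCurrentKey(k: K): Unit = { currentKey = Some(k) }
  private def key: K = currentKey.getOrElse(sys.error("no current key"))

  // ValueState-like: one value per key, replaced on every update
  def valueGet: Option[Double] = values.get(key)
  def valueUpdate(v: Double): Unit = { values(key) = v }

  // ListState-like: add appends; get returns all elements of the current key
  def listAdd(v: Double): Unit = {
    lists.getOrElseUpdate(key, mutable.ListBuffer.empty[Double]) += v
  }
  def listGet: List[Double] = lists.get(key).map(_.toList).getOrElse(Nil)

  // ReducingState-like: add immediately folds the new value into the stored
  // one with a reduce function (here: max), so one value is kept per key
  def reducingAdd(v: Double): Unit = {
    reduced(key) = reduced.get(key).fold(v)(old => math.max(old, v))
  }
  def reducingGet: Option[Double] = reduced.get(key)
}

object ToyKeyedStatesDemo {
  // Feeds (sensorId, temperature) records, then returns the three views for `probe`.
  def run(probe: String): (Option[Double], List[Double], Option[Double]) = {
    val s = new ToyKeyedStates[String]
    val records = Seq(("sensor_1", 20.0), ("sensor_2", 30.0), ("sensor_1", 25.0), ("sensor_1", 22.0))
    for ((id, temp) <- records) {
      s.setCurrentKey(id)
      s.valueUpdate(temp)
      s.listAdd(temp)
      s.reducingAdd(temp)
    }
    s.setCurrentKey(probe)
    (s.valueGet, s.listGet, s.reducingGet)
  }
}
```

For `sensor_1` the three views differ: the value state only remembers the last reading, the list state keeps all readings, and the reducing state keeps only the maximum. The readings of `sensor_2` never show up in the state of `sensor_1`.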
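import org.apache.flink.streaming.api.scala._
// emit an alert if the ratio between two consecutive temperature readings of a sensor exceeds the threshold
val alerts: DataStream[(String, Double, Double)] = sensorData
.keyBy(_.id)
.flatMap(new TemperatureAlertFunction(1.1))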
// --------------------------------------------------------------
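import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
class TemperatureAlertFunction(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
// handle to the keyed state that holds the last temperature of the current key
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// create the state descriptor; it needs the state's name and type
val lastTempDescriptor =
new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
// obtain the state handle and register the state for the last temperature. On registration, Flink checks
// whether the state backend already holds state for this function with the given name and type (e.g., after
// recovery from a checkpoint); if so, the handle is associated with that existing state.
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor) // only returns the state for the key of the current record; for a ListState use getListState, etc.
}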
override def flatMap(
in: SensorReading,
out: Collector[(String, Double, Double)]): Unit = {
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
if (lastTemp > 0d && (in.temperature / lastTemp) > threshold) {
// temperature increased by more than the threshold
out.collect((in.id, in.temperature, lastTemp))
}
// update lastTemp state
this.lastTempState.update(in.temperature)
}
}
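// Simplified version: flatMapWithState (Scala API only) keeps one ValueState per key and passes it to the lambda as an Option.
// keyedSensorData is a KeyedStream[SensorReading, String], e.g., sensorData.keyBy(_.id)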
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMapWithState[(String, Double, Double), Double] {
case (in: SensorReading, None) =>
// no previous temperature defined.
// Just update the last temperature
(List.empty, Some(in.temperature))
case (in: SensorReading, lastTemp: Some[Double]) =>
// compare temperature difference with threshold
if (lastTemp.get > 0 && (in.temperature / lastTemp.get) > 1.4) {
// threshold exceeded. Emit an alert and update the last temperature
(List((in.id, in.temperature, lastTemp.get)), Some(in.temperature))
} else {
// threshold not exceeded. Just update the last temperature
(List.empty, Some(in.temperature))
}
}
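Because the flatMapWithState lambda is just a function from (input, previous state) to (outputs, new state), its logic can be tested without Flink. The sketch below pulls it out into a pure function and replays a few readings of one key; the `SensorReading` case class is a hypothetical stand-in for the one used in these notes, and `TempAlertLogic` is a hypothetical helper.

```scala
// Hypothetical stand-in for the SensorReading type used in these notes.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TempAlertLogic {
  type Alert = (String, Double, Double)

  // Same shape as the flatMapWithState lambda: (input, previous state) => (outputs, new state)
  def step(threshold: Double)(in: SensorReading, last: Option[Double]): (List[Alert], Option[Double]) =
    last match {
      case Some(lastTemp) if lastTemp > 0 && in.temperature / lastTemp > threshold =>
        // threshold exceeded: emit an alert and remember the new temperature
        (List((in.id, in.temperature, lastTemp)), Some(in.temperature))
      case _ =>
        // no previous temperature, or no alert: just remember the new temperature
        (List.empty, Some(in.temperature))
    }

  // Replays the readings of a single key, threading the state through every
  // step the way Flink does for one key of a KeyedStream.
  def replay(threshold: Double, readings: Seq[SensorReading]): List[Alert] = {
    val (alerts, _) = readings.foldLeft((List.empty[Alert], Option.empty[Double])) {
      case ((acc, state), r) =>
        val (out, next) = step(threshold)(r, state)
        (acc ++ out, next)
    }
    alerts
  }
}
```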
Implementing Operator List State with the ListCheckpointed Interface
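A function must implement the ListCheckpointed interface to manage operator list state. The interface has two methods:
- snapshotState(): called when Flink requests a checkpoint. It must return a list of serializable state objects.
- restoreState(): called when the function's state is initialized, e.g., when the job is recovered from a checkpoint.
Both methods are also used when the parallelism of the operator is changed: the list items returned by snapshotState() are redistributed across the new tasks and handed to restoreState().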
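import java.util
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
class HighTempCounter(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (Int, Long)]
with ListCheckpointed[java.lang.Long] {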
// index of the subtask
private lazy val subtaskIdx = getRuntimeContext
.getIndexOfThisSubtask
// local count variable
private var highTempCnt = 0L
override def flatMap(
in: SensorReading,
out: Collector[(Int, Long)]): Unit = {
if (in.temperature > threshold) {
// increment counter if threshold is exceeded
highTempCnt += 1
// emit update with subtask index and counter
out.collect((subtaskIdx, highTempCnt))
}
}
override def restoreState(
state: util.List[java.lang.Long]): Unit = {
highTempCnt = 0
// restore state by adding all longs of the list
for (cnt <- state.asScala) { // asScala is needed because ListCheckpointed is a Java interface
highTempCnt += cnt
}
}
override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// split the count into ten partial counts so that, when the parallelism is increased,
// the count is spread evenly over the new tasks instead of some tasks starting from 0
val div = highTempCnt / 10
val mod = (highTempCnt % 10).toInt
// return count as ten parts
(List.fill(mod)(java.lang.Long.valueOf(div + 1)) ++
List.fill(10 - mod)(java.lang.Long.valueOf(div))).asJava
}
}
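The reason for splitting the count into ten parts shows up when the parallelism changes. The plain-Scala sketch below reuses the splitting arithmetic from snapshotState() and a simplified model of even-split redistribution (pooled items dealt out in contiguous shares). Flink's actual assignment of list items to tasks may differ in detail, and `CountSplitting` is a hypothetical helper.

```scala
object CountSplitting {
  // Same arithmetic as snapshotState() above: split `count` into `parts`
  // partial counts that differ by at most one.
  def split(count: Long, parts: Int = 10): List[Long] = {
    val div = count / parts
    val mod = (count % parts).toInt
    List.fill(mod)(div + 1) ++ List.fill(parts - mod)(div)
  }

  // Simplified model of even-split redistribution on rescaling: the items of
  // all old tasks are pooled and each new task receives a contiguous share.
  def redistribute(snapshots: Seq[List[Long]], newParallelism: Int): Seq[List[Long]] = {
    val all = snapshots.flatten.toList
    val base = all.size / newParallelism
    val extra = all.size % newParallelism
    // the first `extra` tasks get one item more than the others
    val sizes = (0 until newParallelism).map(t => if (t < extra) base + 1 else base)
    val offsets = sizes.scanLeft(0)(_ + _)
    (0 until newParallelism).map(t => all.slice(offsets(t), offsets(t) + sizes(t)))
  }

  // Same as restoreState() above: a task sums the partial counts it received.
  def restore(items: List[Long]): Long = items.sum
}
```

With two old tasks holding counts 23 and 7, the 20 partial counts are spread as 17, 10 and 3 over three new tasks, and the total of 30 is preserved. Without splitting, each old task contributes a single item, so the third new task would receive nothing and start from 0.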
Using Connected Broadcast State
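During a checkpoint, every task writes its own copy of the broadcast state, even though all copies are identical; this avoids having all tasks read the same file during recovery. When the parallelism is changed, however, each task still receives only one copy, as described in the theory.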
步驟:
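- Call DataStream.broadcast() with one or more MapStateDescriptor objects to obtain a BroadcastStream. Each descriptor defines a separate broadcast state that is available to the function applied later.
- Connect a DataStream or KeyedStream with the BroadcastStream via connect(BroadcastStream).
- Apply a function to the connected stream. Keyed and non-keyed streams use different function interfaces (KeyedBroadcastProcessFunction vs. BroadcastProcessFunction).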
// The following example adjusts the alert threshold of each sensor dynamically, driven by the thresholds stream.
val thresholds: DataStream[ThresholdUpdate] = env.fromElements(
ThresholdUpdate("sensor_1", 5.0d),
ThresholdUpdate("sensor_2", 2.0d),
ThresholdUpdate("sensor_1", 1.2d))
val keyedSensorData: KeyedStream[SensorReading, String] =
sensorData.keyBy(_.id)
// the descriptor of the broadcast state
val broadcastStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds",
classOf[String],
classOf[Double])
// create a BroadcastStream
val broadcastThresholds: BroadcastStream[ThresholdUpdate] =
thresholds.broadcast(broadcastStateDescriptor)
// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.connect(broadcastThresholds)
.process(new UpdatableTempAlertFunction(4.0d))
// --------------------------------------------------------------
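import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
// For a non-keyed stream use BroadcastProcessFunction instead; it has no timer service, so it cannot register timers or implement onTimer().
class UpdatableTempAlertFunction(val defaultThreshold: Double)
extends KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)] {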
// the descriptor of the broadcast state
private lazy val thresholdStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds",
classOf[String],
classOf[Double])
// the keyed state handle
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// create keyed state descriptor
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp",
classOf[Double])
// obtain the keyed state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}
// processBroadcastElement() cannot access keyed state directly, because broadcast elements are not associated with a key.
// To operate on the keyed state of all keys, use keyedCtx.applyToKeyedState(StateDescriptor, KeyedStateFunction).
override def processBroadcastElement(
update: ThresholdUpdate,
keyedCtx: KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]): Unit = {
// get the writable broadcast state
val thresholds: BroadcastState[String, Double] = keyedCtx
.getBroadcastState(thresholdStateDescriptor)
if (update.threshold >= 1.0d) {
// configure a new threshold of the sensor
thresholds.put(update.id, update.threshold)
} else {
// remove sensor specific threshold
thresholds.remove(update.id)
}
}
override def processElement(
reading: SensorReading,
keyedReadOnlyCtx: KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)]#ReadOnlyContext,
out: Collector[(String, Double, Double)]): Unit = {
// get the read-only broadcast state
val thresholds: ReadOnlyBroadcastState[String, Double] = keyedReadOnlyCtx
.getBroadcastState(thresholdStateDescriptor)
// get threshold for sensor
val sensorThreshold: Double =
if (thresholds.contains(reading.id)) {
thresholds.get(reading.id)
} else {
defaultThreshold
}
// fetch the last temperature from keyed state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
if (lastTemp > 0 &&
(reading.temperature / lastTemp) > sensorThreshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, lastTemp))
}
// update lastTemp state
this.lastTempState.update(reading.temperature)
}
}
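The decision logic of UpdatableTempAlertFunction (store a threshold if it is at least 1.0, otherwise remove it, and fall back to the default on lookup) can be checked in isolation. The sketch models the broadcast state of a single task as a mutable map; `ThresholdTable` and the redefined `ThresholdUpdate` case class are hypothetical, and the sketch says nothing about how Flink distributes broadcast elements to tasks.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the ThresholdUpdate type used above.
case class ThresholdUpdate(id: String, threshold: Double)

// Plain-Scala model of one task's broadcast state in UpdatableTempAlertFunction.
class ThresholdTable(defaultThreshold: Double) {
  private val thresholds = mutable.Map.empty[String, Double]

  // processBroadcastElement(): thresholds >= 1.0 are stored, smaller ones remove the entry
  def applyUpdate(u: ThresholdUpdate): Unit =
    if (u.threshold >= 1.0d) thresholds(u.id) = u.threshold
    else thresholds.remove(u.id)

  // processElement(): read-only lookup with a fallback to the default threshold
  def thresholdFor(sensorId: String): Double =
    thresholds.getOrElse(sensorId, defaultThreshold)
}
```

Applied to the three updates of the example stream, `sensor_1` ends up with 1.2 (the later update overwrites 5.0), `sensor_2` with 2.0, and any other sensor uses the default of 4.0.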
Using the CheckpointedFunction Interface
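CheckpointedFunction is the lowest-level interface for stateful functions and the only one that provides access to union list state. It has two methods:
- initializeState(): called when the job is started or restarted, so it also contains the recovery logic. It receives a FunctionInitializationContext that gives access to an OperatorStateStore and a KeyedStateStore, which are used to register state.
- snapshotState(): called when a checkpoint is taken. It receives a FunctionSnapshotContext, which gives access to the checkpoint's ID and timestamp. Its purpose is to bring all state up to date before the checkpoint is taken. Combined with the CheckpointListener interface, it can be used to synchronize writes to external storage with checkpoints.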
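The difference between the two operator list-state flavours lies in how the list is handed back on restore: with getListState() the items are split among the tasks, while with getUnionListState() every task receives the complete list and has to pick what it needs. The sketch below models only the union case in plain Scala; `UnionListStateModel`, the (partition, offset) entries and the modulo assignment in `keepOwn` are hypothetical, chosen to resemble a typical use for source offsets.

```scala
object UnionListStateModel {
  // Union redistribution: on restore, every task receives the full, merged
  // list that was written by all tasks of the previous run.
  def unionRestore[T](oldSnapshots: Seq[List[T]], newParallelism: Int): Seq[List[T]] = {
    val merged = oldSnapshots.flatten.toList
    Seq.fill(newParallelism)(merged)
  }

  // Hypothetical follow-up step: each task keeps only the (partition, offset)
  // entries it is responsible for, using a simple modulo assignment.
  def keepOwn(all: List[(Int, Long)], taskIndex: Int, parallelism: Int): List[(Int, Long)] =
    all.filter { case (partition, _) => partition % parallelism == taskIndex }
}
```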
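import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
// A function with keyed and operator state that counts, per key and per operator instance, how many sensor readings exceed a threshold.
class HighTempCounter(val threshold: Double)
extends FlatMapFunction[SensorReading, (String, Long, Long)]
with CheckpointedFunction {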
// local variable for the operator high temperature cnt
var opHighTempCnt: Long = 0
var keyedCntState: ValueState[Long] = _
var opCntState: ListState[Long] = _
override def flatMap(
v: SensorReading,
out: Collector[(String, Long, Long)]): Unit = {
// check if temperature is high
if (v.temperature > threshold) {
// update local operator high temp counter
opHighTempCnt += 1
// update keyed high temp counter
val keyHighTempCnt = keyedCntState.value() + 1
keyedCntState.update(keyHighTempCnt)
// emit new counters
out.collect((v.id, keyHighTempCnt, opHighTempCnt))
}
}
override def initializeState(
initContext: FunctionInitializationContext): Unit = {
// initialize keyed state
val keyCntDescriptor = new ValueStateDescriptor[Long](
"keyedCnt",
createTypeInformation[Long])
keyedCntState = initContext.getKeyedStateStore
.getState(keyCntDescriptor)
// initialize operator state
val opCntDescriptor = new ListStateDescriptor[Long](
"opCnt",
createTypeInformation[Long])
opCntState = initContext.getOperatorStateStore
.getListState(opCntDescriptor)
// initialize local variable with state
opHighTempCnt = opCntState.get().asScala.sum
}
override def snapshotState(
snapshotContext: FunctionSnapshotContext): Unit = {
// update operator state with local state
opCntState.clear()
opCntState.add(opHighTempCnt)
}
}
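import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
// Example of a sink function that buffers elements and keeps the buffer in operator list state
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction {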
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
bufferedElements += value
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
// clear the state of the previous checkpoint before writing the current buffer
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
// for union list state, use getUnionListState(descriptor) instead
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if(context.isRestored) {
for(element <- checkpointedState.get().asScala) {
bufferedElements += element
}
}
}
}
Receiving Notifications about Completed Checkpoints
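The state of an application is only guaranteed to be consistent at the logical points in time at which checkpoints are taken. For some operators it is important to know when a checkpoint has completed. For example, sink functions that write data with exactly-once guarantees to external systems must only emit records that were received before a successful checkpoint, so that the received data is not recomputed and emitted again after a failure.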
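The notifyCheckpointComplete() method of the CheckpointListener interface is called after the JobManager has registered a checkpoint as completed, i.e., after all operators have successfully copied their state to remote storage. Flink does not guarantee that the method is called for every completed checkpoint, so a task may miss a notification.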
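The exactly-once sink pattern described above can be sketched in plain Scala: records are sealed per checkpoint in snapshotState() and only released in notifyCheckpointComplete(). Because a notification may be missed, the sketch commits every pending checkpoint up to the completed ID, not just the one it was notified about. `CommitOnCheckpointSink` is a hypothetical class that only mirrors the hook names; a real Flink sink would also keep the pending records in operator state so that they survive a failure.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable.ListBuffer

// Plain-Scala model (not a Flink class) of a sink that only "writes" records
// once the checkpoint covering them has completed.
class CommitOnCheckpointSink[T] {
  private val current = ListBuffer.empty[T]          // records since the last checkpoint
  private var pending = TreeMap.empty[Long, List[T]] // checkpoint id -> sealed records
  val committed = ListBuffer.empty[T]                // stands in for the external system

  def invoke(value: T): Unit = { current += value }

  // like snapshotState(): seal everything received before this checkpoint
  def snapshotState(checkpointId: Long): Unit = {
    pending += (checkpointId -> current.toList)
    current.clear()
  }

  // like notifyCheckpointComplete(): a notification can be missed, so commit
  // every pending checkpoint up to and including the completed one, in order
  def notifyCheckpointComplete(checkpointId: Long): Unit = {
    val (done, rest) = pending.partition { case (id, _) => id <= checkpointId }
    done.values.foreach(records => committed ++= records)
    pending = rest
  }
}
```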
Robustness and Performance of Stateful Applications
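Flink offers three state backends: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. A custom backend can be implemented via the StateBackend interface. MemoryStateBackend and FsStateBackend store state as regular objects on the heap of the TaskManager's JVM process. RocksDBStateBackend serializes all state into a local RocksDB instance, which makes it a good fit for applications with very large state.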
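import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val checkpointPath: String = ???
// placeholders: local directory for RocksDB on each worker, and custom RocksDB options
val dbPath: String = ???
val optionsFactory: OptionsFactory = ???
val incrementalCheckpoints: Boolean = true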
// configure path for checkpoints on the remote filesystem
val backend = new RocksDBStateBackend(
checkpointPath,
incrementalCheckpoints)
// configure path for local RocksDB instance on worker
backend.setDbStoragePath(dbPath)
// configure RocksDB options
backend.setOptions(optionsFactory)
// configure state backend
env.setStateBackend(backend)
Enabling Checkpointing
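When checkpointing is enabled, the JobManager periodically initiates checkpoints. See Chapter 9 of the referenced book for more tuning options.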
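import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L)
// further options are set on the CheckpointConfig, for example:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)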
Updating Stateful Operators
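The key to updating stateful operators is to successfully deserialize the data of the savepoint and to correctly map it to the state of the updated application.
- Correct mapping
By default, the identifier of an operator is computed as a unique hash from the properties of the operator and its predecessors. Hence, if the operator or one of its predecessors changes, the identifier inevitably changes too, and Flink can no longer map the state of a previous savepoint. To be able to add or remove operators, you must assign unique identifiers manually: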
val alerts: DataStream[(String, Double, Double)] = sensorData
.keyBy(_.id)
// apply stateful FlatMap and set unique ID
.flatMap(new TemperatureAlertFunction(1.1)).uid("alertFunc")
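By default, restoring from a savepoint requires that all saved state can be mapped to the new application. If the updated application no longer needs some of that state, this check can be relaxed (e.g., with the --allowNonRestoredState option of flink run).
- Deserialization
Serialization compatibility concerns every stateful operator: if its input or output types change, the compatibility of the update is affected. It is recommended to use data types whose serializers support versioned encodings as input types of built-in stateful DataStream operators, so that such changes do not cause problems. If serializers with versioned encodings were not used, Flink can still handle changes through two methods of the TypeSerializer interface; see further documentation for details.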
Tuning the Performance of Stateful Applications
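With the RocksDBStateBackend, the choice of state primitive affects performance:
- ValueState is completely deserialized on access and completely serialized on update.
- ListState deserializes all of its entries on access, but an update only serializes the appended values. Hence, if state is appended to frequently but read rarely, ListState[X] is a better choice than ValueState[List[X]].
- MapState only deserializes and serializes the accessed key-value pair on access and update. When iterating, entries are prefetched from RocksDB, and a key or value is only deserialized when it is actually accessed. Hence MapState[X, Y] is preferable to ValueState[HashMap[X, Y]].
Update state only once per function call: since checkpoints are synchronized with function invocations, multiple state updates within one invocation do not provide any benefit but can cause additional serialization overhead.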
Preventing Leaking State
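Clean up keyed state that is no longer needed. This concerns user-defined stateful functions as well as some built-in DataStream API operators, such as aggregations on a KeyedStream; in general, every function that cannot periodically clean up its state is affected. Such functions should only be used if the key domain is bounded or the state does not grow without limit. Count-based windows have the same problem; time-based windows do not, because they clean up their state when they fire.
A function with keyed state can only access the state of a key when it receives a record with that key. Since it cannot know whether a record is the last one for its key, it cannot simply delete the key's state. Instead, the state can be removed by a callback: if no record for the key has arrived for a certain time, the key is considered no longer needed. This is implemented by registering a timer that clears the key's state when it fires. Currently, timers can be registered by the Trigger interface for windows and by the ProcessFunction.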
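import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
// Emits an alert if the ratio between two consecutive readings exceeds the threshold. If a sensor id receives no new reading
// for one hour (event time) and therefore registers no newer timer, onTimer fires and clears the state of that key.
// Keyed state and timers require the function to be applied on a KeyedStream.
class StateCleaningTemperatureAlertFunction(val threshold: Double)
extends ProcessFunction[SensorReading, (String, Double, Double)] {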
// the keyed state handle for the last temperature
private var lastTempState: ValueState[Double] = _
// the keyed state handle for the last registered timer
private var lastTimerState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
// register state for last temperature
val lastTempDescriptor =
new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
// enable queryable state and set its external identifier
lastTempDescriptor.setQueryable("lastTemperature")
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
// register state for last timer
val timerDescriptor: ValueStateDescriptor[Long] =
new ValueStateDescriptor[Long]("timerState", classOf[Long])
lastTimerState = getRuntimeContext
.getState(timerDescriptor)
}
override def processElement(
in: SensorReading,
ctx: ProcessFunction[SensorReading, (String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]) = {
// get current watermark and add one hour
val checkTimestamp =
ctx.timerService().currentWatermark() + (3600 * 1000)
// register new timer.
// Only one timer per timestamp will be registered. Timers with identical timestamps are deduplicated. That is also the reason why we compute the clean-up time based on the watermark and not on the record timestamp.
ctx.timerService().registerEventTimeTimer(checkTimestamp)
// update timestamp of last timer
lastTimerState.update(checkTimestamp)
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
if (lastTemp > 0.0d && (in.temperature / lastTemp) > threshold) {
// temperature increased by more than the threshold
out.collect((in.id, in.temperature, lastTemp))
}
// update lastTemp state
this.lastTempState.update(in.temperature)
}
override def onTimer(
ts: Long,
ctx: ProcessFunction[SensorReading, (String, Double, Double)]#OnTimerContext,
out: Collector[(String, Double, Double)]): Unit = {
// get timestamp of last registered timer
val lastTimer = lastTimerState.value()
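// check if the last registered timer fired. Registering a new timer does not replace an older one; the ProcessFunction
// keeps a list of all registered timers, so earlier timers still fire and must be ignored here.
if (lastTimer != 0L && lastTimer == ts) { // an unset ValueState[Long] reads as 0 in Scala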
// clear all state for the key
lastTempState.clear()
lastTimerState.clear()
}
}
}
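To see why onTimer() compares the firing timestamp with lastTimerState, the sketch below replays the timer logic for a single key in plain Scala, with an explicit watermark instead of Flink's timer service. `TimerCleanupModel` is hypothetical; it only assumes what the code above relies on: timers with equal timestamps are deduplicated and fire once the watermark passes them.

```scala
import scala.collection.immutable.TreeSet

// Plain-Scala replay (not Flink code) of the timer logic above for ONE key:
// every element registers a timer at watermark + delay and records it as the
// last timer; a firing timer clears the state only if it is that last timer.
class TimerCleanupModel(cleanupDelay: Long = 3600L * 1000) {
  var registeredTimers = TreeSet.empty[Long] // equal timestamps collapse into one timer
  var lastTimer: Option[Long] = None
  var lastTemp: Option[Double] = None

  def processElement(temperature: Double, currentWatermark: Long): Unit = {
    val checkTimestamp = currentWatermark + cleanupDelay
    registeredTimers += checkTimestamp
    lastTimer = Some(checkTimestamp)
    lastTemp = Some(temperature)
  }

  // An advancing watermark fires all due timers in timestamp order.
  def advanceWatermark(watermark: Long): Unit = {
    val due = registeredTimers.filter(_ <= watermark).toList
    registeredTimers = registeredTimers.filterNot(_ <= watermark)
    due.foreach { ts =>
      // onTimer: only the most recently registered timer clears the state
      if (lastTimer.contains(ts)) {
        lastTemp = None
        lastTimer = None
      }
    }
  }
}
```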
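State Time-To-Live (TTL) (added in Flink 1.6; brief notes, the feature was still incomplete)
A TTL can be assigned to keyed state. Once a value is older than its TTL it is considered expired and, by default, it is removed when it is read (e.g., with .value()). For collection state (ListState, MapState), the TTL applies to each entry independently, i.e., every entry has its own expiration timestamp. Currently only processing time is supported. Enabling TTL increases memory consumption, because a timestamp of the last modification is stored with each value. The TTL configuration itself is not part of checkpoints or savepoints; it only defines how the currently running job treats the state.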
The map state with TTL currently supports null user values only if the user value serializer can handle null values. If the serializer does not support null values, it can be wrapped with NullableSerializer at the cost of an extra byte in the serialized form.
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
// when the TTL timestamp is refreshed; alternative: StateTtlConfig.UpdateType.OnReadAndWrite
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// whether an expired value is returned on read access; alternative:
// StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp (returned if still available)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
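The effect of the configuration above (UpdateType.OnCreateAndWrite, StateVisibility.NeverReturnExpired) can be illustrated with a plain-Scala model that uses an explicit clock instead of processing time. `TtlValue` is hypothetical and ignores background cleanup and serialization; it only shows that reads do not extend the lifetime and that an expired value is never returned.

```scala
// Plain-Scala model (not Flink's implementation) of a single TTL'd value.
class TtlValue[T](ttlMillis: Long) {
  private var stored: Option[(T, Long)] = None // value and time of the last write

  // OnCreateAndWrite: only writes refresh the timestamp
  def update(value: T, now: Long): Unit = { stored = Some((value, now)) }

  // NeverReturnExpired: an expired value is dropped on access and never returned;
  // reading does not refresh the timestamp
  def value(now: Long): Option[T] = stored match {
    case Some((v, ts)) if now - ts < ttlMillis => Some(v)
    case Some(_) =>
      stored = None
      None
    case None => None
  }
}
```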
Queryable State
Only key-point queries are supported, i.e., looking up the state value of a single key.
Architecture and Enabling Queryable State

To enable this feature, add the flink-queryable-state-runtime JAR to the classpath of the TaskManager process. This is done by copying it from the ./opt folder of your installation into the ./lib folder. The related ports and parameters are configured in ./conf/flink-conf.yaml.
Exposing Queryable State
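Option 1: enable it on a state descriptor with setQueryable(), as in the open() method of StateCleaningTemperatureAlertFunction above.
Option 2: call asQueryableState() on a KeyedStream, which adds a queryable state sink: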
val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
// project to sensor id and temperature
.map(r => (r.id, r.temperature))
// compute every 10 seconds the max temperature per sensor
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.max(1)
// store max temperature of the last 10 secs for each sensor in a queryable state.
tenSecsMaxTemps
// key by sensor id
.keyBy(_._1)
.asQueryableState("maxTemperature")
Other overloaded variants:
- asQueryableState(id: String, stateDescriptor: ValueStateDescriptor[T]) can be used to configure the ValueState in more detail, e.g., to configure a custom serializer.
- asQueryableState(id: String, stateDescriptor: ReducingStateDescriptor[T]) configures a ReducingState instead of a ValueState. The ReducingState is also updated for each incoming record. However, in contrast to the ValueState, the new record does not replace the existing value but is instead combined with the previous version using the state's ReduceFunction.
Querying State from External Applications
Any JVM-based application can query the queryable state of a running job with a QueryableStateClient. It only requires the dependency flink-queryable-state-client-java_2.11.
Once you have a client, call getKvState() with the JobID of the running application (available from the REST API, the web UI, or the log files), the state identifier, the key for which the state should be fetched, the TypeInformation for the key, and the StateDescriptor of the queried state. The result is a CompletableFuture[S], where S is the type of the state, e.g., ValueState[_] or MapState[_, _].
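import java.util.concurrent.CompletableFuture
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.scala._
object TemperatureDashboard {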
// assume local setup and TM runs on same machine as client
val proxyHost = "127.0.0.1"
val proxyPort = 9069
// jobId of running QueryableStateJob. can be looked up in logs of running job or the web UI
val jobId = "d2447b1a5e0d952c372064c886d2220a"
// how many sensors to query
val numSensors = 5
// how often to query the state
val refreshInterval = 10000
def main(args: Array[String]): Unit = {
// configure client with host and port of queryable state proxy
val client = new QueryableStateClient(proxyHost, proxyPort)
val futures = new Array[
CompletableFuture[ValueState[(String, Double)]]](numSensors)
val results = new Array[Double](numSensors)
// print header line of dashboard table
val header =
(for (i <- 0 until numSensors) yield "sensor_" + (i + 1))
.mkString("\t| ")
println(header)
// loop forever
while (true) {
// send out async queries
for (i <- 0 until numSensors) {
futures(i) = queryState("sensor_" + (i + 1), client)
}
// wait for results
for (i <- 0 until numSensors) {
results(i) = futures(i).get().value()._2
}
// print result
val line = results.map(t => f"$t%1.3f").mkString("\t| ")
println(line)
// wait to send out next queries
Thread.sleep(refreshInterval)
}
client.shutdownAndWait()
}
def queryState(
key: String,
client: QueryableStateClient)
: CompletableFuture[ValueState[(String, Double)]] = {
client
.getKvState[String, ValueState[(String, Double)], (String, Double)](
JobID.fromHexString(jobId),
"maxTemperature",
key,
Types.STRING,
new ValueStateDescriptor[(String, Double)](
"", // state name not relevant here
createTypeInformation[(String, Double)]))
}
}
References
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske
