1. Flink SQL空闲状态保留时间和参数配置
Flink SQL 空闲状态保留时间是针对 SQL 中聚合 Key 而言的,空闲的时间也就是 Key 没有更新的时间。如果在 Flink SQL 任务中设置了空闲状态的保留时间,那么当状态空闲超过一定的时间后,状态就会被清理。
设置 Flink SQL 空闲状态保留时间有两个参数,状态空闲最小保留时间和状态空闲最大保留时间,很多人会问,为什么会设置两个时间参数呢,设置一个参数不就好了吗,先来看看这两个参数的定义:
- The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.(最小空闲状态时间定义了一个 Key 的状态至少空闲的时间)
- The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.(最大空闲状态时间定义了一个 Key 的状态至多空闲的时间)
你可能会有一个问题,直接使用一个时间参数,但状态到达这个时间就删除不就行了,为什么还需要定义两个时间参数呢,下面来结合源码进行分析。
2. Flink SQL 空闲状态保留时间实现原理分析
简单的讲,Flink SQL 空闲状态保留的时间底层是基于 KeyedProcessFunction 函数来进行实现的,然后为每个 Key,结合空闲状态时间的最小值和最大值注册 Timer ,然后到时间就进行状态清理。具体逻辑从 KeyedProcessFunctionWithCleanupState 这个类开始看起:
/** * A function that processes elements of a stream, and could cleanup state. * @param <K> Type of the key. * @param <IN> Type of the input elements. * @param <OUT> Type of the output elements. */ public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> implements CleanupState { private static final long serialVersionUID = 2084560869233898457L; private final long minRetentionTime; private final long maxRetentionTime; protected final boolean stateCleaningEnabled; // holds the latest registered cleanup timer private ValueState<Long> cleanupTimeState; public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) { this.minRetentionTime = minRetentionTime; this.maxRetentionTime = maxRetentionTime; this.stateCleaningEnabled = minRetentionTime > 1; } }
首先,这个类有一个是否能够清理空闲状态的标志,当空闲状态最小保留时间大于 1 时,这个标志就为 True。
同时,针对每一个 Key ,都有一个 ValueState,记录着这个 Key 的最新的 Timer 触发的时间。当然,这个值会随着这个 Key 的记录,后续可能会进行时间更新。
下面来看一下具体的 Timer 注册逻辑,空闲状态的清理的 Timer 是调用其方法 registerProcessingCleanupTimer 来进行注册,而方面中又调用了 CleanupState 中的 registerProcessingCleanupTimer 方法:
/** * Base interface for clean up state, both for {@link ProcessFunction} and {@link CoProcessFunction}. */ public interface CleanupState { default void registerProcessingCleanupTimer( ValueState<Long> cleanupTimeState, long currentTime, long minRetentionTime, long maxRetentionTime, TimerService timerService) throws Exception { // last registered timer Long curCleanupTime = cleanupTimeState.value(); // check if a cleanup timer is registered and // that the current cleanup timer won't delete state we need to keep if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { // we need to register a new (later) timer long cleanupTime = currentTime + maxRetentionTime; // register timer and remember clean-up time timerService.registerProcessingTimeTimer(cleanupTime); // delete expired timer if (curCleanupTime != null) { timerService.deleteProcessingTimeTimer(curCleanupTime); } cleanupTimeState.update(cleanupTime); } } }
每次当某个 Key 有消息记录处理时,先从状态中取出该 Key 最新的 Timer 的触发时间,如果为空,表示这调消息是这个 Key 的第一条记录,那么会使用当前的时间 + 最大空闲状态保留的时间作为 Timer 未来的触发时间。
如果当前时间 + 状态最小的空闲状态保留的时间 > 上一次注册 Timer 的触发清理的时间,那么也重新注册 Timer,Timer的时间也为当前的时间 + 最大空闲状态保留的时间,同时,删除上一次注册的清理的 Timer。未来每来一条这个 Key 的消息记录时,便会执行上面的逻辑。如果没有满足上面的逻辑,就不做任何处理。
最终,当 Timer 触发时,会调用 State 的 clear()方法,进行状态清理。
protected def cleanupState(states: State*): Unit = { // clear all state states.foreach(_.clear()) if (stateCleaningEnabled) { this.cleanupTimeState.clear() } }
有个点需要注意,如果某个 Key 的状态被清理掉,如果后续再来这个 Key 的消息记录时,会被当做该 Key 的第一条记录来进行处理,聚合值也是重新开始计算。所以,请确保设置合理的空闲状态保留时间。
3. 总结
Flink SQL 虽然没有 DataStream API 那样为每个算子单独来设置状态的保留时间,不过在 Flink SQL 我们可以设置空闲状态的保留时间,具体的时间业务方根据实际情况而定。