Flink1.11.1-SQL空闲状态保留时间实现原理


1. Flink SQL空闲状态保留时间和参数配置

Flink SQL 空闲状态保留时间是针对 SQL 中聚合 Key 而言的,空闲的时间也就是 Key 没有更新的时间。如果在 Flink SQL 任务中设置了空闲状态的保留时间,那么当状态空闲超过一定的时间后,状态就会被清理。

设置 Flink SQL 空闲状态保留时间有两个参数,状态空闲最小保留时间和状态空闲最大保留时间,很多人会问,为什么会设置两个时间参数呢,设置一个参数不就好了吗,先来看看这两个参数的定义:

  • The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.(最小空闲状态时间定义了一个 Key 的状态至少空闲的时间)
  • The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.(最大空闲状态时间定义了一个 Key 的状态至多空闲的时间)

你可能会有一个问题,直接使用一个时间参数,但状态到达这个时间就删除不就行了,为什么还需要定义两个时间参数呢,下面来结合源码进行分析。

2. Flink SQL 空闲状态保留时间实现原理分析

简单的讲,Flink SQL 空闲状态保留的时间底层是基于 KeyedProcessFunction 函数来进行实现的,然后为每个 Key,结合空闲状态时间的最小值和最大值注册 Timer ,然后到时间就进行状态清理。具体逻辑从 KeyedProcessFunctionWithCleanupState 这个类开始看起:

/**
 * A function that processes elements of a stream, and could cleanup state.
 * @param <K> Type of the key.
 * @param <IN>  Type of the input elements.
 * @param <OUT> Type of the output elements.
 */
public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
    extends KeyedProcessFunction<K, IN, OUT> implements CleanupState {

    private static final long serialVersionUID = 2084560869233898457L;

    private final long minRetentionTime;
    private final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;

    // holds the latest registered cleanup timer
    private ValueState<Long> cleanupTimeState;

    public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }
}

首先,这个类有一个是否能够清理空闲状态的标志,当空闲状态最小保留时间大于 1 时,这个标志就为 True。

同时,针对每一个 Key ,都有一个 ValueState,记录着这个 Key 的最新的 Timer 触发的时间。当然,这个值会随着这个 Key 的记录,后续可能会进行时间更新。

下面来看一下具体的 Timer 注册逻辑,空闲状态的清理的 Timer 是调用其方法 registerProcessingCleanupTimer 来进行注册,而方面中又调用了 CleanupState 中的 registerProcessingCleanupTimer 方法:

/**
 * Base interface for clean up state, both for {@link ProcessFunction} and {@link CoProcessFunction}.
 */
public interface CleanupState {

    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState,
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService) throws Exception {

        // last registered timer
        Long curCleanupTime = cleanupTimeState.value();

        // check if a cleanup timer is registered and
        // that the current cleanup timer won't delete state we need to keep
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
            // we need to register a new (later) timer
            long cleanupTime = currentTime + maxRetentionTime;
            // register timer and remember clean-up time
            timerService.registerProcessingTimeTimer(cleanupTime);
            // delete expired timer
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            cleanupTimeState.update(cleanupTime);
        }
    }
}

每次当某个 Key 有消息记录处理时,先从状态中取出该 Key 最新的 Timer 的触发时间,如果为空,表示这调消息是这个 Key 的第一条记录,那么会使用当前的时间 + 最大空闲状态保留的时间作为 Timer 未来的触发时间。

如果当前时间 + 状态最小的空闲状态保留的时间 > 上一次注册 Timer 的触发清理的时间,那么也重新注册 Timer,Timer的时间也为当前的时间 + 最大空闲状态保留的时间,同时,删除上一次注册的清理的 Timer。未来每来一条这个 Key 的消息记录时,便会执行上面的逻辑。如果没有满足上面的逻辑,就不做任何处理。

最终,当 Timer 触发时,会调用 State 的 clear()方法,进行状态清理。

protected def cleanupState(states: State*): Unit = {
    // clear all state
    states.foreach(_.clear())
    if (stateCleaningEnabled) {
      this.cleanupTimeState.clear()
    }
  }

有个点需要注意,如果某个 Key 的状态被清理掉,如果后续再来这个 Key 的消息记录时,会被当做该 Key 的第一条记录来进行处理,聚合值也是重新开始计算。所以,请确保设置合理的空闲状态保留时间。

3. 总结

Flink SQL 虽然没有 DataStream API 那样为每个算子单独来设置状态的保留时间,不过在 Flink SQL 我们可以设置空闲状态的保留时间,具体的时间业务方根据实际情况而定。

来源:https://mp.weixin.qq.com/s?src=11&timestamp=1602772486&ver=2646&signature=NoJiCCHXfp7ewiiGyfdPxcCn*u-SltN2U*GlzZ3NqAoqVWRUuTQu0UDxB4ugK4MjU660LXsdUQNp0Rb-sqx2a*ukTux5bftTtz-o0ejdJ2Co8r452*9ECXuiIqomgq8k&new=1

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM