Flink1.11.1-SQL空閑狀態保留時間實現原理


1. Flink SQL空閑狀態保留時間和參數配置

Flink SQL 空閑狀態保留時間是針對 SQL 中聚合 Key 而言的,空閑的時間也就是 Key 沒有更新的時間。如果在 Flink SQL 任務中設置了空閑狀態的保留時間,那么當狀態空閑超過一定的時間后,狀態就會被清理。

設置 Flink SQL 空閑狀態保留時間有兩個參數,狀態空閑最小保留時間和狀態空閑最大保留時間,很多人會問,為什么會設置兩個時間參數呢,設置一個參數不就好了嗎,先來看看這兩個參數的定義:

  • The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.(最小空閑狀態時間定義了一個 Key 的狀態至少空閑的時間)
  • The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.(最大空閑狀態時間定義了一個 Key 的狀態至多空閑的時間)

你可能會有一個問題,直接使用一個時間參數,但狀態到達這個時間就刪除不就行了,為什么還需要定義兩個時間參數呢,下面來結合源碼進行分析。

2. Flink SQL 空閑狀態保留時間實現原理分析

簡單的講,Flink SQL 空閑狀態保留的時間底層是基於 KeyedProcessFunction 函數來進行實現的,然后為每個 Key,結合空閑狀態時間的最小值和最大值注冊 Timer ,然后到時間就進行狀態清理。具體邏輯從 KeyedProcessFunctionWithCleanupState 這個類開始看起:

/**
 * A function that processes elements of a stream, and could cleanup state.
 * @param <K> Type of the key.
 * @param <IN>  Type of the input elements.
 * @param <OUT> Type of the output elements.
 */
public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
    extends KeyedProcessFunction<K, IN, OUT> implements CleanupState {

    private static final long serialVersionUID = 2084560869233898457L;

    private final long minRetentionTime;
    private final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;

    // holds the latest registered cleanup timer
    private ValueState<Long> cleanupTimeState;

    public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }
}

首先,這個類有一個是否能夠清理空閑狀態的標志,當空閑狀態最小保留時間大於 1 時,這個標志就為 True。

同時,針對每一個 Key ,都有一個 ValueState,記錄着這個 Key 的最新的 Timer 觸發的時間。當然,這個值會隨着這個 Key 的記錄,后續可能會進行時間更新。

下面來看一下具體的 Timer 注冊邏輯,空閑狀態的清理的 Timer 是調用其方法 registerProcessingCleanupTimer 來進行注冊,而方面中又調用了 CleanupState 中的 registerProcessingCleanupTimer 方法:

/**
 * Base interface for clean up state, both for {@link ProcessFunction} and {@link CoProcessFunction}.
 */
public interface CleanupState {

    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState,
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService) throws Exception {

        // last registered timer
        Long curCleanupTime = cleanupTimeState.value();

        // check if a cleanup timer is registered and
        // that the current cleanup timer won't delete state we need to keep
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
            // we need to register a new (later) timer
            long cleanupTime = currentTime + maxRetentionTime;
            // register timer and remember clean-up time
            timerService.registerProcessingTimeTimer(cleanupTime);
            // delete expired timer
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            cleanupTimeState.update(cleanupTime);
        }
    }
}

每次當某個 Key 有消息記錄處理時,先從狀態中取出該 Key 最新的 Timer 的觸發時間,如果為空,表示這調消息是這個 Key 的第一條記錄,那么會使用當前的時間 + 最大空閑狀態保留的時間作為 Timer 未來的觸發時間。

如果當前時間 + 狀態最小的空閑狀態保留的時間 > 上一次注冊 Timer 的觸發清理的時間,那么也重新注冊 Timer,Timer的時間也為當前的時間 + 最大空閑狀態保留的時間,同時,刪除上一次注冊的清理的 Timer。未來每來一條這個 Key 的消息記錄時,便會執行上面的邏輯。如果沒有滿足上面的邏輯,就不做任何處理。

最終,當 Timer 觸發時,會調用 State 的 clear()方法,進行狀態清理。

protected def cleanupState(states: State*): Unit = {
    // clear all state
    states.foreach(_.clear())
    if (stateCleaningEnabled) {
      this.cleanupTimeState.clear()
    }
  }

有個點需要注意,如果某個 Key 的狀態被清理掉,如果后續再來這個 Key 的消息記錄時,會被當做該 Key 的第一條記錄來進行處理,聚合值也是重新開始計算。所以,請確保設置合理的空閑狀態保留時間。

3. 總結

Flink SQL 雖然沒有 DataStream API 那樣為每個算子單獨來設置狀態的保留時間,不過在 Flink SQL 我們可以設置空閑狀態的保留時間,具體的時間業務方根據實際情況而定。

來源:https://mp.weixin.qq.com/s?src=11&timestamp=1602772486&ver=2646&signature=NoJiCCHXfp7ewiiGyfdPxcCn*u-SltN2U*GlzZ3NqAoqVWRUuTQu0UDxB4ugK4MjU660LXsdUQNp0Rb-sqx2a*ukTux5bftTtz-o0ejdJ2Co8r452*9ECXuiIqomgq8k&new=1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM