1. Flink SQL空閑狀態保留時間和參數配置
Flink SQL 空閑狀態保留時間是針對 SQL 中聚合 Key 而言的,空閑的時間也就是 Key 沒有更新的時間。如果在 Flink SQL 任務中設置了空閑狀態的保留時間,那么當狀態空閑超過一定的時間后,狀態就會被清理。
設置 Flink SQL 空閑狀態保留時間有兩個參數,狀態空閑最小保留時間和狀態空閑最大保留時間,很多人會問,為什么會設置兩個時間參數呢,設置一個參數不就好了嗎,先來看看這兩個參數的定義:
- The minimum idle state retention time defines how long the state of an inactive key is at least kept before it is removed.(最小空閑狀態時間定義了一個 Key 的狀態至少空閑的時間)
- The maximum idle state retention time defines how long the state of an inactive key is at most kept before it is removed.(最大空閑狀態時間定義了一個 Key 的狀態至多空閑的時間)
你可能會有一個問題,直接使用一個時間參數,但狀態到達這個時間就刪除不就行了,為什么還需要定義兩個時間參數呢,下面來結合源碼進行分析。
2. Flink SQL 空閑狀態保留時間實現原理分析
簡單的講,Flink SQL 空閑狀態保留的時間底層是基於 KeyedProcessFunction 函數來進行實現的,然后為每個 Key,結合空閑狀態時間的最小值和最大值注冊 Timer ,然后到時間就進行狀態清理。具體邏輯從 KeyedProcessFunctionWithCleanupState 這個類開始看起:
/** * A function that processes elements of a stream, and could cleanup state. * @param <K> Type of the key. * @param <IN> Type of the input elements. * @param <OUT> Type of the output elements. */ public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> implements CleanupState { private static final long serialVersionUID = 2084560869233898457L; private final long minRetentionTime; private final long maxRetentionTime; protected final boolean stateCleaningEnabled; // holds the latest registered cleanup timer private ValueState<Long> cleanupTimeState; public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) { this.minRetentionTime = minRetentionTime; this.maxRetentionTime = maxRetentionTime; this.stateCleaningEnabled = minRetentionTime > 1; } }
首先,這個類有一個是否能夠清理空閑狀態的標志,當空閑狀態最小保留時間大於 1 時,這個標志就為 True。
同時,針對每一個 Key ,都有一個 ValueState,記錄着這個 Key 的最新的 Timer 觸發的時間。當然,這個值會隨着這個 Key 的記錄,后續可能會進行時間更新。
下面來看一下具體的 Timer 注冊邏輯,空閑狀態的清理的 Timer 是調用其方法 registerProcessingCleanupTimer 來進行注冊,而方面中又調用了 CleanupState 中的 registerProcessingCleanupTimer 方法:
/** * Base interface for clean up state, both for {@link ProcessFunction} and {@link CoProcessFunction}. */ public interface CleanupState { default void registerProcessingCleanupTimer( ValueState<Long> cleanupTimeState, long currentTime, long minRetentionTime, long maxRetentionTime, TimerService timerService) throws Exception { // last registered timer Long curCleanupTime = cleanupTimeState.value(); // check if a cleanup timer is registered and // that the current cleanup timer won't delete state we need to keep if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { // we need to register a new (later) timer long cleanupTime = currentTime + maxRetentionTime; // register timer and remember clean-up time timerService.registerProcessingTimeTimer(cleanupTime); // delete expired timer if (curCleanupTime != null) { timerService.deleteProcessingTimeTimer(curCleanupTime); } cleanupTimeState.update(cleanupTime); } } }
每次當某個 Key 有消息記錄處理時,先從狀態中取出該 Key 最新的 Timer 的觸發時間,如果為空,表示這調消息是這個 Key 的第一條記錄,那么會使用當前的時間 + 最大空閑狀態保留的時間作為 Timer 未來的觸發時間。
如果當前時間 + 狀態最小的空閑狀態保留的時間 > 上一次注冊 Timer 的觸發清理的時間,那么也重新注冊 Timer,Timer的時間也為當前的時間 + 最大空閑狀態保留的時間,同時,刪除上一次注冊的清理的 Timer。未來每來一條這個 Key 的消息記錄時,便會執行上面的邏輯。如果沒有滿足上面的邏輯,就不做任何處理。
最終,當 Timer 觸發時,會調用 State 的 clear()方法,進行狀態清理。
protected def cleanupState(states: State*): Unit = { // clear all state states.foreach(_.clear()) if (stateCleaningEnabled) { this.cleanupTimeState.clear() } }
有個點需要注意,如果某個 Key 的狀態被清理掉,如果后續再來這個 Key 的消息記錄時,會被當做該 Key 的第一條記錄來進行處理,聚合值也是重新開始計算。所以,請確保設置合理的空閑狀態保留時間。
3. 總結
Flink SQL 雖然沒有 DataStream API 那樣為每個算子單獨來設置狀態的保留時間,不過在 Flink SQL 我們可以設置空閑狀態的保留時間,具體的時間業務方根據實際情況而定。