前言: Flink 窗口會將當前窗口的數據存儲在狀態中,等待窗口結束的時候觸發計算,那窗口狀態什么時候清理?
(前提: 窗口的主要邏輯是在 WindowOperator 中完成的)
翻一下 WindowOperator 的代碼,可以看到下面這個方法,“Drops all state for the given window and calls” 這個注釋,還是可以比較明確的說明這個方法的作用的。
那 WindowOperator.clearAllState 是什么時候調用的
/** * Drops all state for the given window and calls * {@link Trigger#clear(Window, Trigger.TriggerContext)}. * * <p>The caller must ensure that the * correct key is set in the state backend and the triggerContext object. */ private void clearAllState( W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows) throws Exception { // 清理 窗口狀態 windowState.clear(); // 清理 觸發器 triggerContext.clear(); processContext.window = window; // 清理 窗口上下文,調用 userFunction 的 clear 方法 processContext.clear(); if (mergingWindows != null) { mergingWindows.retireWindow(window); mergingWindows.persist(); }
clearAllState 是在 WindowOperator.onEventTime/onProcessingTime 方法中調用的,計算是否達到窗口的 isCleanupTime
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { clearAllState(triggerContext.window, windowState, mergingWindows); } protected final boolean isCleanupTime(W window, long time) { return time == cleanupTime(window); } private long cleanupTime(W window) { if (windowAssigner.isEventTime()) { // 加上 允許的延遲(窗口的延遲,不是 watermark 減去的那個值) long cleanupTime = window.maxTimestamp() + allowedLateness; return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } else { return window.maxTimestamp(); } }
窗口 timer.getTimestamp 時間 等於窗口的最大時間(如果允許延遲: 最大時間 + allowedLateness 時間),就調用 WindowOperator.clearAllState
如果是滑動窗口,數據屬於多個窗口的,timer 或者說 onEventTime/onProcessingTime 方法是屬於所有窗口的,每次觸發的窗口是,窗口隊列中最早結束的一個
(滑動窗口的數據流進來的時候,會自動把數據放到多個窗口的狀態中去)
以下以事件時間為例說明
WindowOperator.onEventTime 上游調用的地方是 InternalTimerServiceImpl.advanceWatermark (在代碼 WindowOperator.onEventTime/onProcessingTime/processElement 打個斷點,
查看調用棧,可以看到窗口算子從網絡 buffer 中讀取 數據/watermark 的地方: StreamTaskNetworkInput.processElement )
public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } }
可以看到隊列,peek 會取出第一條,timer 會對應到第一條對應的窗口
(翻滾窗口 queue 的多條數據,對應多個keyGroup 的多個 key 的窗口,每次只 peek 出第一條,timer 對象是帶了keyContext,第一步就是選擇當前的 keyContext : keyContext.setCurrentKey(timer.getKey()) )
eventTimeTimersQueue 是在 WindowOperator.processElement 中調用 EventTimeTrigger.onElement 方法,注冊定時器添加的
@Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public void registerEventTimeTimer(N namespace, long time) { eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); }
隊列的數據是在窗口處理輸入數據的過程中,調用 EventTimeTrigger.onElement 方法,使用窗口結束時間注冊了個定時(注冊的時候設置 隊列元素的 keyContext 和 namespace)
注: 使用的是 flink 實現的 KeyGroupedInternalPriorityQueue,根據 傳入時間(window.masTimestamp)可能會插入到隊列的不同位置(按 timestamp 排序, HeapPriorityQueue.siftUp/ siftDown)
@Override // 添加到隊列的末尾 protected void addInternal(@Nonnull T element) { final int newSize = increaseSizeByOne(); moveElementToIdx(element, newSize); // 自排序 siftUp(newSize); } // 當前元素位置向對頭前移 private void siftUp(int idx) { final T[] heap = this.queue; // 拿出當前插入元素 final T currentElement = heap[idx]; // idx 的一半 int parentIdx = idx >>> 1; // 折半插入: 比較當前插入元素與 隊列數組中當前元素 index 一半的元素的優先級 while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) { // moveElementToIdx(heap[parentIdx], idx); idx = parentIdx; // 再一半 parentIdx >>>= 1; } // 最后放入對應位置 moveElementToIdx(currentElement, idx); }
總結: 窗口數據輸入時注冊窗口最大時間的定時器用以觸發窗口計算(同時會注冊一個 registerCleanupTimer 用以清除窗口數據,
沒有開啟 allowedLateness 的窗口,兩個定時器時間是相同的,只保留一個),watermark 前進的時候,會從定時器隊列中取出對
頭中的 timer,查看 timer.getTimestamp(即窗口的最大時間) 小於 當前 watermark 對應的 時間時,觸發當前的 timer(從隊列中彈出,
調用 triggerTarget.onEventTime(timer)),觸發當前窗口的計算(窗口結束),同時查看當前窗口是否可以清除窗口狀態(事件時間
的窗口如果允許延遲的,窗口結束是不能清理窗口狀態的,)
清理狀態就比較簡單了,就是調用 AbstractHeapState.remove 方法,傳入對應的 namestace,從 stateMap 中 remove 對應的 key
private void remove(K key, int keyGroupIndex, N namespace) { checkKeyNamespacePreconditions(key, namespace); StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex); stateMap.remove(key, namespace); }
注: 窗口觸發計算的時候,觸發器返回 PURGE 也會刪除窗口數據
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文