【源碼】flink 窗口數據觸發清理流程


前言: Flink 窗口會將當前窗口的數據存儲在狀態中,等待窗口結束的時候觸發計算,那窗口狀態什么時候清理?

(前提: 窗口的主要邏輯是在 WindowOperator 中完成的)

翻一下 WindowOperator 的代碼,可以看到下面這個方法,“Drops all state for the given window and calls” 這個注釋,還是可以比較明確的說明這個方法的作用的。

那 WindowOperator.clearAllState 是什么時候調用的

/**
 * Drops all state for the given window and calls
 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
 *
 * <p>The caller must ensure that the
 * correct key is set in the state backend and the triggerContext object.
 */
private void clearAllState(
    W window,
    AppendingState<IN, ACC> windowState,
    MergingWindowSet<W> mergingWindows) throws Exception {
  // 清理 窗口狀態
  windowState.clear();
  // 清理 觸發器
  triggerContext.clear();
  processContext.window = window;
  // 清理 窗口上下文,調用 userFunction 的 clear 方法
  processContext.clear();
  if (mergingWindows != null) {
    mergingWindows.retireWindow(window);
    mergingWindows.persist();
  }

clearAllState 是在 WindowOperator.onEventTime/onProcessingTime 方法中調用的,計算是否達到窗口的 isCleanupTime

if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
    clearAllState(triggerContext.window, windowState, mergingWindows);
}

protected final boolean isCleanupTime(W window, long time) {
  return time == cleanupTime(window);
}

private long cleanupTime(W window) {
  if (windowAssigner.isEventTime()) {
    // 加上 允許的延遲(窗口的延遲,不是 watermark 減去的那個值)
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
  } else {
    return window.maxTimestamp();
  }
}

窗口 timer.getTimestamp 時間 等於窗口的最大時間(如果允許延遲: 最大時間 + allowedLateness 時間),就調用 WindowOperator.clearAllState

如果是滑動窗口,數據屬於多個窗口的,timer 或者說 onEventTime/onProcessingTime 方法是屬於所有窗口的,每次觸發的窗口是,窗口隊列中最早結束的一個

(滑動窗口的數據流進來的時候,會自動把數據放到多個窗口的狀態中去)

以下以事件時間為例說明

WindowOperator.onEventTime 上游調用的地方是 InternalTimerServiceImpl.advanceWatermark (在代碼 WindowOperator.onEventTime/onProcessingTime/processElement 打個斷點,

查看調用棧,可以看到窗口算子從網絡 buffer 中讀取 數據/watermark 的地方: StreamTaskNetworkInput.processElement )

public void advanceWatermark(long time) throws Exception {
  currentWatermark = time;

  InternalTimer<K, N> timer;

  while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    eventTimeTimersQueue.poll();
    keyContext.setCurrentKey(timer.getKey());
    triggerTarget.onEventTime(timer);
  }
}

可以看到隊列,peek 會取出第一條,timer 會對應到第一條對應的窗口

(翻滾窗口 queue 的多條數據,對應多個keyGroup 的多個 key 的窗口,每次只 peek 出第一條,timer 對象是帶了keyContext,第一步就是選擇當前的 keyContext : keyContext.setCurrentKey(timer.getKey()) )

eventTimeTimersQueue 是在 WindowOperator.processElement 中調用 EventTimeTrigger.onElement 方法,注冊定時器添加的

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // if the watermark is already past the window fire immediately
    return TriggerResult.FIRE;
  } else {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
  }
}

@Override
public void registerEventTimeTimer(N namespace, long time) {
  eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

隊列的數據是在窗口處理輸入數據的過程中,調用 EventTimeTrigger.onElement 方法,使用窗口結束時間注冊了個定時(注冊的時候設置 隊列元素的 keyContext 和 namespace)

注: 使用的是 flink 實現的 KeyGroupedInternalPriorityQueue,根據 傳入時間(window.masTimestamp)可能會插入到隊列的不同位置(按 timestamp 排序, HeapPriorityQueue.siftUp/ siftDown)

@Override
// 添加到隊列的末尾
protected void addInternal(@Nonnull T element) {
  final int newSize = increaseSizeByOne();
  moveElementToIdx(element, newSize);
  // 自排序
  siftUp(newSize);
}

// 當前元素位置向對頭前移
private void siftUp(int idx) {
  final T[] heap = this.queue;
  // 拿出當前插入元素
  final T currentElement = heap[idx];
  // idx 的一半
  int parentIdx = idx >>> 1;
  // 折半插入: 比較當前插入元素與 隊列數組中當前元素 index 一半的元素的優先級
  while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
    // 
    moveElementToIdx(heap[parentIdx], idx);
    idx = parentIdx;
    // 再一半
    parentIdx >>>= 1;
  }
  // 最后放入對應位置
  moveElementToIdx(currentElement, idx);
}

總結: 窗口數據輸入時注冊窗口最大時間的定時器用以觸發窗口計算(同時會注冊一個 registerCleanupTimer 用以清除窗口數據,

沒有開啟 allowedLateness 的窗口,兩個定時器時間是相同的,只保留一個),watermark 前進的時候,會從定時器隊列中取出對

頭中的 timer,查看 timer.getTimestamp(即窗口的最大時間) 小於 當前 watermark 對應的 時間時,觸發當前的 timer(從隊列中彈出,

調用 triggerTarget.onEventTime(timer)),觸發當前窗口的計算(窗口結束),同時查看當前窗口是否可以清除窗口狀態(事件時間

的窗口如果允許延遲的,窗口結束是不能清理窗口狀態的,)

清理狀態就比較簡單了,就是調用 AbstractHeapState.remove 方法,傳入對應的 namestace,從 stateMap 中 remove 對應的 key

private void remove(K key, int keyGroupIndex, N namespace) {
  checkKeyNamespacePreconditions(key, namespace);

  StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
  stateMap.remove(key, namespace);
}

注: 窗口觸發計算的時候,觸發器返回 PURGE 也會刪除窗口數據

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM