Flink之對時間的處理


window+trigger+watermark處理全局亂序數據,指定窗口上的allowedLateness可以處理特定窗口操作的局部事件時間亂序數據

1、流處理系統中的微批

Flink內部也使用了某種形式的微批處理技術,在shuffle階段將含有多個事件的緩沖容器通過網絡發送,而不是發送單個事件
流處理系統中的批處理必須滿足以下兩點要求:
  • 批處理只作為提高系統性能的機制。批量越大,系統的吞吐量就越大。
  • 為了提高性能而使用的批處理必須完全獨立於定義窗口時所用的緩沖,或者為了保證容錯性而提交的代碼,也不能作為 API 的一部分。否則,系統將受到限制,並且變得脆弱且難以使用。

2、時間概念

  • 事件時間,即事件實際發生的時間(由水印觸發器實現),基於事件時間處理可實現時間回溯並正確地重新處理數據
  • 處理時間,即事件被處理的時間,是處理事件的機器所測量的時間
  • 攝取時間,即事件進入流處理框架的時間,缺乏事件時間的數據會被處理器附上攝取時間(由source函數完成)

3、窗口

所有內置窗口都由同一種機制實現,開窗機制與檢查點機制完全分離;可直接用基本的開窗機制定義更復雜的窗口(如某種時間窗口,可基於元素計數生成中間結果)
窗口時間區間是按自然時間分配的,比如3秒的時間間隔,[0,3) [3,6)

(1)時間窗口(每隔B時長對A時長內數據聚合)

  • 設置事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  • 設置處理時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  • 設置攝取時間 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
  • 滾動窗口A stream.timeWindow(Time.minutes(1)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
  • 滑動窗口B stream.timeWindow(Time.minutes(1), Time.seconds(30)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)), SlidingEventTimeWindows.of(Time.seconds(30)))

(2)計數窗口(每隔B個元素對A個元素進行聚合)

為避免永遠達不到計數窗口而浪費內存,可用時間窗口觸發超時
  • 滾動窗口A stream.countWindow(4)
  • 滑動窗口B stream.countWindow(4, 2)

(3)會話窗口(會話即活動階段,其前后都是非活躍階段,常用於無固定持續時間或無固定交互次數的場景)

由超時時間設定,即希望非活躍狀態持續多久才結束窗口。window區間:當b比上一條記錄a延遲超過超時時間t時,出現會話窗口[上一個window_end, b-t)
  • 事件時間會話窗口 stream.window(EventTimeSessionWindows.withGap(Time.minutes(5))
  • 處理時間會話窗口 stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5))
處理延遲數據
  • allowedLateness(Time.minutes(60))
縮短反饋時間(若用戶會話遲遲不結束,反饋時間過長)
  • trigger(ContinuousEventTrigger.of(Time.minutes(10)) #每10分鍾輸出一個結果並覆蓋之前的

(4)全局窗口(對全部數據進行統計,使用流方法實現批處理)

內置觸發器是NeverTrigger,永遠不會觸發,需要自定義觸發器才有意義
stream.window(GlobalWindows.create()).trigger(...)

4、觸發器

繼承Trigger類
Trigger抽象類的結構:
boolean canMerge()
void clear(W window, TriggerContext ctx)
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) 每個元素到來時執行
TriggerResult onEventTime(long time, W window, TriggerContext ctx) Timer到期后執行
void onMerge(W window, OnMergeContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)

5、水印

水印的語義是認為比水印早的消息都已消費
窗口 + 水印,用於解決亂序問題(並不是解決,而是假定所有正常的事件都只是一定程度內亂序,可以解決此程度內的亂序)

(1)基於事件時間處理時的水印處理邏輯

水印是判斷所有事件到達的標志,開始計算和輸出結果,晚於處理時間但早於此水印時間的事件也可被正確處理。
水印定義最長遲到數據(比當前watermark還早的數據會被丟棄,水印閾值越大,允許的遲到數據越久)
watermark的值不是全局的,但與key無關,有幾個並行,就有幾個watermark,window的觸發條件與最小的watermark有關
水印時間 = 收到的最大事件時間 - 水印閾值
一個操作算子收到多個並行流的輸入時,取最小的watermark作為當前算子的watermark

(2)異常情況

如果水印遲到得太久(可能是maxOrderness設置太大,也可能是后序事件過晚到達),收到結果的速度會變慢,解決方法是在水印到達之前輸出近似結果,其實就是后面設置Lateness的方案;如果水印到達得太早(可能是maxOrderness設置太小,也可能是后序事件過早到達),則可能丟失一些前序事件,收到錯誤結果,解決方法是采用Flink作業監控事件流,學習事件的遲到規律,以此構建水印模型

(3)分配Timestamp和Watermark

timestamp和watermark都是通過從1970年1月1日0時0分0秒到現在的毫秒數來指定的
先后順序:分配timstamp是按設置的時間間隔定時執行的,即使無數據進來也會執行,這就造成了getCurrentWatermark調用后看上去第一個watermark永遠是以0為基准計算顯示的 ,但實際並不是按那個算的。第2條的watermark如果是23的話,是不大於window_end 24的,也就不應該觸發,而如果是下一條的24則可以觸發。AssignerWithPeriodicWatermarks子類是每隔一段時間執行的,這個具體由ExecutionConfig.setAutoWatermarkInterval設置,如果沒有設置會幾乎沒有間隔地調用getCurrentWatermark方法。之所以會出現-10000時因為你沒有數據進入窗口,當然一直都是-10000,但是getCurrentWatermark方法不是在執行extractTimestamp后才執行的
直接在數據源生成(推薦,數據生成時即分配timestamp和watermark)
實現SourceFuntion接口的run方法,並調用如下方法:
  • 分配timestamp:SourceContext.collectWithTimestamp(...)
  • 分配watermark:SourceContext.emitWatermark(new WaterMark(...))
獲取流后使用生成器生成新流(使用此種方式,會覆蓋源提供的timestamp和watermark,注意一定要在時間窗口之前生成)
stram.assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks 實現類對象)
定義分配器
AssignerWithPeriodicWatermarks(周期性水印,分配時間戳並定期生成水印)
watermark產生的事件間隔(每n毫秒)是通過ExecutionConfig.setAutoWatermarkInterval(...)來定義的,當getCurrentWatermark()被調用時,若返回的watermark非空且大於上一個watermark,則發射一個新的watermark
  • 預定義實現類(使用時重寫extractTimestamp):
    • AscendingTimestampExtractor 適用於時間戳遞增的情況
    • BoundedOutOfOrdernessTimestampExtractor 適用於亂序但最大延遲已知的情況
  • 自定義實現類(使用時重寫getCurrentWatermark、extractTimestamp)
AssignerWithPunctuatedWatermarks(帶斷點水印)
事件驅動生成水印,每個單獨的event都可以產生一個watermark,會有額外計算,過多可能導致性能降低。任何一個event都觸發extractTimestamp(...)來為元素分配一個timestamp,然后立即調用該元素上的checkAndGetNextWatermark(...)方法,一旦checkAndGetNextWatermark(...)返回一個非空的watermark並且watermark比前一個watermark大的話,這個新的watermark將會被發送

(4)設定水印后觸發window的條件:

  • watermark >= window_end(開啟多並發后,每個算子接收到的watermark都會進行對齊,取最小的watermark作為最終的watermark並往下一個算子發送)
  • 在[window_begin, window_end)中有數據存在

(5)不足之處

無法應對遲到數據,如果一個窗口已經被觸發了,即使滿足上述條件也不會第二次觸發窗口。水印被發射到下一個算子前已默認比水印更早的數據已經全部處理了

6、allowedLateness

主要用於解決遲到問題,給遲到數據第二次或多次觸發window的機會,可對無法觸發window的遲到數據單獨處理
默認情況下,watermark超過end-of-window后,將忽略之后到達的符合window的數據
在Watermark < 窗口結束時間 + Lateness時,仍會繼續等待窗口內的元素參與窗口計算,計算時要注意狀態值的重復,直到Watermark >= 窗口結束時間 + Lateness 時清空緩存
要注意再次觸發窗口時,UDF中的狀態值的處理,要考慮state在計算時的去重問題

(1)

  • 對於trigger是默認的EventTimeTrigger的情況,allowedLateness會再次觸發窗口的計算,而之前觸發的數據,會buffer起來,直到watermark超過end-of-window + allowedLateness的時間,窗口的數據及元數據信息才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。
  • 對於sessionWindow情況,當late element在allowedLateness范圍之內到達時,可能會引起窗口的merge,這樣,之前窗口的數據會在新窗口中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。

(2)觸發條件

  • watermark < window_end + allowedLateness
  • 在[window_begin, window_end)中有late數據存在

7、定時器Timer

Flink Streaming API提供的用於感知並利用處理時間/事件時間變化的機制
Timer會由Flink按key+timestamp自動去重的,也就是說如果你的key有N個,並且注冊的timestamp相同的話,那么實際只會注冊N個Timer

(1)在KeyedProcessFunction實現類里定義定時器:

重寫processElement(),對每個輸入元素注冊定時器,但會自動去重
重寫onTimer(),定時器觸發時執行的邏輯
根據時間特征的不同,具體如下:
處理時間——調用Context.timerService().registerProcessingTimeTimer()注冊;onTimer()在系統時間戳達到Timer設定的時間戳時觸發。
事件時間——調用Context.timerService().registerEventTimeTimer()注冊;onTimer()在Flink內部水印達到或超過Timer設定的時間戳時觸發。

(2)EventTimeTrigger使用Timer實現觸發時間窗口

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; }
   else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; }
}

(3)接口

TimerService接口是用來處理時間和定時器的,根據time確定一個唯一的定時器
InternalTimerService接口是TimerService接口的內部版本,根據time和namespace來確定一個唯一的定時器。有實現類InternalTimerServiceImpl
currentProcessingTime 獲取當前處理時間
currentWatermark	獲取當前水印
deleteEventTimeTimer	刪除某time和namespace對應的定時器
deleteProcessingTimeTimer	刪除某time和namespace對應的定時器
void registerEventTimeTimer(N namespace, long time) 水印達到給定time時注冊特定的定時器
registerProcessingTimeTimer	處理時間達到給定time時注冊特定的定時器
InternalTimerServiceImpl有兩個關鍵字段processingTimeTimersQueue和eventTimeTimersQueue,分別存儲in-flight中的處理定時器、事件定時器,注冊Timer實際上就是為它們賦予對應的時間戳、key和命名空間,並將它們加入對應的優先隊列,這兩個優先隊列是按Timer時間戳為關鍵字排序的最小堆,。繼承HeapPriorityQueueSet實現優先級隊列功能,在Java自帶的PriorityQueue基礎上加入按key去重邏輯。
注冊:KeyGroup是Flink內部KeyedState的原子單位,亦即一些key的組合。一個Flink App的KeyGroup數量與最大並行度相同,將key分配到KeyGroup的操作則是經典的取hashCode+取模。而KeyGroupRange則是一些連續KeyGroup的范圍,每個Flink sub-task都只包含一個KeyGroupRange。以插入一個Timer的流程為例:
  • 從Timer中取出key,計算該key屬於哪一個KeyGroup;
  • 計算出該KeyGroup在整個KeyGroupRange中的偏移量,按該偏移量定位到HashMap<T, T>[]數組的下標;
  • 根據putIfAbsent()方法的語義,只有當對應HashMap不存在該Timer的key時,才將Timer插入最小堆中,做到了KeyGroup級別的key去重。
觸發:如果是處理定時器,按順序從隊列中獲取到比時間戳time小的所有Timer,並挨個執行Triggerable.onProcessingTime()方法,也就是在上文KeyedProcessOperator的同名方法,用戶自定義的onTimer()邏輯也就被執行了。如果是事件定時器,當水印到來時會觸發所有早於水印時間戳的Timer
TimerHeapInternalTimer最終實現了InternalTimerServiceImpl里隊列域變量的元素,根據time、namespace、key確定一個唯一的定時器,在java堆存定時器,每個timer都是優先級隊列里的一個元素,都用timerHeapIndex維護其在優先級隊列里的下標,方便快速刪除
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM