窗口
在流處理應用中,數據是連續不斷的,因此我們不可能等到所有數據都到了才開始處理。當然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鍾內有多少用戶點擊了我們的網頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鍾內的數據,並對這個窗口內的數據進行計算。
Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個橋梁。
- 一個Window代表有限對象的集合。一個窗口有一個最大的時間戳,該時間戳意味着在其代表的某時間點——所有應該進入這個窗口的元素都已經到達
- Window就是用來對一個無限的流設置一個有限的集合,在有界的數據集上進行操作的一種機制。window又可以分為基於時間(Time-based)的window以及基於數量(Count-based)的window。
- Flink DataStream API提供了Time和Count的window,同時增加了基於Session的window。同時,由於某些特殊的需要,DataStream API也提供了定制化的window操作,供用戶自定義window。
窗口的組成
窗口分配器
-
assignWindows將某個帶有時間戳timestamp的元素element分配給一個或多個窗口,並返回窗口集合
-
getDefaultTrigger 返回跟WindowAssigner關聯的默認觸發器
-
getWindowSerializer返回WindowAssigner分配的窗口的序列化器
-
窗口分配器定義如何將數據元分配給窗口。這是通過WindowAssigner 在window(...)(對於被Keys化流)或windowAll()(對於非被Keys化流)調用中指定您的選擇來完成的。
-
WindowAssigner負責將每個傳入數據元分配給一個或多個窗口。Flink帶有預定義的窗口分配器,用於最常見的用例
即翻滾窗口, 滑動窗口,會話窗口和全局窗口。 -
您還可以通過擴展WindowAssigner類來實現自定義窗口分配器。
-
所有內置窗口分配器(全局窗口除外)都根據時間為窗口分配數據元,這可以是處理時間或事件時間。
State
- 狀態,用來存儲窗口內的元素,如果有 AggregateFunction,則存儲的是增量聚合的中間結果。
窗口函數
選擇合適的計算函數,減少開發代碼量提高系統性能
增量聚合函數(窗口只維護狀態)
- ReduceFunction
- AggregateFunction
- FoldFunction
全量聚合函數(窗口維護窗口內的數據)
- ProcessWindowFunction
- 全量計算
- 支持功能更加靈活
- 支持狀態操作
觸發器
-
EventTimeTrigger基於事件時間的觸發器,對應onEventTime
-
ProcessingTimeTrigger
基於當前系統時間的觸發器,對應onProcessingTime
ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環境中ProcessingTime具有不確定性,相同數據流多次運行有可能產生不同的計算結果。 -
ContinuousEventTimeTrigger
-
ContinuousProcessingTimeTrigger
-
CountTrigger
- Trigger確定何時窗口函數准備好處理窗口(由窗口分配器形成)。每個都有默認值。
如果默認觸發器不符合您的需要,您可以使用指定自定義觸發器。WindowAssignerTriggertrigger(...) - 觸發器界面有五種方法可以Trigger對不同的事件做出反應:
- onElement()為添加到窗口的每個數據元調用該方法。
- onEventTime()在注冊的事件時間計時器觸發時調用該方法。
- onProcessingTime()在注冊的處理時間計時器觸發時調用該方法。
- 該onMerge()方法與狀態觸發器相關,並且當它們的相應窗口合並時合並兩個觸發器的狀態,例如當使用會話窗口時。
- 最后,該clear()方法在移除相應窗口時執行所需的任何動作。
- 默認觸發器
- 默認觸發器GlobalWindow是NeverTrigger從不觸發的。因此,在使用時必須定義自定義觸發器GlobalWindow。
- 通過使用trigger()您指定觸發器會覆蓋a的默認觸發器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據時間進度獲取窗口,
而是僅按計數。現在,如果你想根據時間和數量做出反應,你必須編寫自己的自定義觸發器。 - event-time窗口分配器都有一個EventTimeTrigger作為默認觸發器。該觸發器在watermark通過窗口末尾時出發。
- Trigger確定何時窗口函數准備好處理窗口(由窗口分配器形成)。每個都有默認值。
觸發器分類
CountTrigger
一旦窗口中的數據元數量超過給定限制,就會觸發。所以其觸發機制實現在onElement中
ProcessingTimeTrigger
基於處理時間的觸發。
EventTimeTrigger
根據 watermarks 度量的事件時間進度進行觸發。
PurgingTrigger
-
另一個觸發器作為參數作為參數並將其轉換為清除觸發器。
-
其作用是在 Trigger 觸發窗口計算之后將窗口的 State 中的數據清除。
-
前兩條數據先后於20:01和20:02進入窗口,此時 State 中的值更新為3,同時到了Trigger的觸發時間,輸出結果為3。
-
由於 PurgingTrigger 的作用,State 中的數據會被清除。
DeltaTrigger
DeltaTrigger 的應用
- 有這樣一個車輛區間測試的需求,車輛每分鍾上報當前位置與車速,每行進10公里,計算區間內最高車速。
觸發器原型
- onElement
- onProcessingTime
- onEventTime
- onMerge
- clear
說明
- TriggerResult可以是以下之一
- CONTINUE 什么都不做
- FIRE_AND_PURGE 觸發計算,然后清除窗口中的元素
- FIRE 觸發計算 默認情況下,內置的觸發器只返回 FIRE,不會清除窗口狀態。
- PURGE 清除窗口中的元素
- 所有的事件時間窗口分配器都有一個 EventTimeTrigger 作為默認觸發器。一旦 watermark 到達窗口末尾,這個觸發器就會被觸發。
- 全局窗口(GlobalWindow)的默認觸發器是永不會被觸發的 NeverTrigger。因此,在使用全局窗口時,必須自定義一個觸發器。
- 通過使用 trigger() 方法指定觸發器,將會覆蓋窗口分配器的默認觸發器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
那么不會再根據時間進度觸發窗口,而只能通過計數。目前為止,如果你希望基於時間以及計數進行觸發,則必須編寫自己的自定義觸發器。
窗口的分類
- 根據窗口是否調用keyBy算子key化,分為被Keys化Windows和非被Keys化Windows;
- 根據窗口的驅動方式,分為時間驅動(Time Window)、數據驅動(Count Window);
- 根據窗口的元素分配方式,分為滾動窗口(tumbling windows)、滑動窗口(sliding windows)、會話窗口(session windows)以及全局窗口(global windows)
被Keys化Windows
可以理解為按照原始數據流中的某個key進行分類,擁有同一個key值的數據流將為進入同一個window,多個窗口並行的邏輯流
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
非被Keys化Windows
-
不做分類,每進入一條數據即增加一個窗口,多個窗口並行,每個窗口處理1條數據
-
WindowAll 將元素按照某種特性聚集在一起,該函數不支持並行操作,默認的並行度就是1,所以如果使用這個算子的話需要注意一下性能問題
stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag"
區別
- 對於被Key化的數據流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細信息)。
- 擁有被Key化的數據流將允許您的窗口計算由多個任務並行執行,因為每個邏輯被Key化的數據流可以獨立於其余任務進行處理。
引用相同Keys的所有數據元將被發送到同一個並行任務。
Time-Based window(基於時間的窗口)
每一條記錄來了以后會根據時間屬性值采用不同的window assinger 方法分配給一個或者多個窗口,分為滾動窗口(Tumbling windows)和滑動窗口(Sliding windows)。
-
EventTime 數據本身攜帶的時間,默認的時間屬性;
-
ProcessingTime 處理時間;
-
IngestionTime 數據進入flink程序的時間;
Tumbling windows(滾動窗口)
滾動窗口下窗口之間不重疊,且窗口長度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows創建一個基於Event Time或Processing Time的滾動時間窗口。
下面示例以滾動時間窗口(TumblingEventTimeWindows
)為例,默認模式是TimeCharacteristic.ProcessingTime
處理時間
/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
所以如果使用Event Time
即數據的實際產生時間,需要通過senv.setStreamTimeCharacteristic
指定
// 指定使用數據的實際時間
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 這里減去8小時,表示用UTC世界時間
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Sliding windows(滑動窗口)
滑動窗口以一個步長(Slide)不斷向前滑動,窗口的長度固定。使用時,我們要設置Slide和Size。Slide的大小決定了Flink以多大的頻率來創建新的窗口,Slide較小,窗口的個數會很多。Slide小於窗口的Size時,相鄰窗口會重疊,一個事件會被分配到多個窗口;Slide大於Size,有些事件可能被丟掉。
同理,如果是滑動時間窗口,也是類似的:
// 窗口的大小是10s,每5s滑動一次,也就是5s計算一次
.timeWindow(Time.seconds(10), Time.seconds(5))
這里使用的是timeWindow
,通常使用window
,那么兩者的區別是什么呢?
timeWindow
其實判斷時間的處理模式是ProcessingTime
還是SlidingEventTimeWindows
,幫我們判斷好了,調用方法直接傳入(Time size, Time slide)
這兩個參數就好了,如果是使用.window
方法,則需要自己來判斷,就是前者寫法更簡單一些。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
Count-Based window (基於計數的窗口)
Count Window 是根據元素個數對數據流進行分組的,也分滾動(tumb)和滑動(slide)。
Tumbling Count Window
當我們想要每100個用戶購買行為事件統計購買總數,那么每當窗口中填滿100個元素了,就會對窗口進行計算,這種窗口我們稱之為翻滾計數窗口(Tumbling Count Window),上圖所示窗口大小為3個。通過使用 DataStream API,我們可以這樣實現:
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the buyCnt sum
.sum(1)
Sliding Count Window
當然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
會話(session)窗口
- SessionWindow中的Gap是一個非常重要的概念,它指的是session之間的間隔。
- 如果session之間的間隔大於指定的間隔,數據將會被划分到不同的session中。比如,設定5秒的間隔,0-5屬於一個session,5-10屬於另一個session
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
Global Windows(全局窗口)
總結
SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows
-
基於時間的滑動窗口
- SlidingEventTimeWindows
- SlidingProcessingTimeWindows
-
基於時間的翻滾窗口
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
-
基於計數的滑動窗口
- countWindow(100, 10)
-
基於計數的翻滾窗口
- countWindow(100)
-
會話窗口
會話窗口:一條記錄一個窗口- ProcessingTimeSessionWindows
- EventTimeSessionWindows
-
全局窗口(GlobalWindows)
- GlobalWindow是一個全局窗口,被實現為單例模式。其maxTimestamp被設置為Long.MAX_VALUE。
- 該類內部有一個靜態類定義了GlobalWindow的序列化器:Serializer。
延遲
默認情況下,當水印超過窗口末尾時,會刪除延遲數據元。
但是,Flink允許為窗口 算子指定最大允許延遲。允許延遲指定數據元在被刪除之前可以延遲多少時間,並且其默認值為0.
在水印通過窗口結束之后但在通過窗口結束加上允許的延遲之前到達的數據元,仍然添加到窗口中。
根據使用的觸發器,延遲但未丟棄的數據元可能會導致窗口再次觸發。就是這種情況EventTimeTrigger。
當指定允許的延遲大於0時,在水印通過窗口結束后保持窗口及其內容。在這些情況下,當遲到但未掉落的數據元到達時,它可能觸發窗口的另一次觸發。
這些射擊被稱為late firings,因為它們是由遲到事件觸發的,與之相反的main firing 是窗口的第一次射擊。在會話窗口的情況下,后期點火可以進一步導致窗口的合並,因為它們可以“橋接”兩個預先存在的未合並窗口之間的間隙。
后期觸發發出的數據元應該被視為先前計算的更新結果,即,您的數據流將包含同一計算的多個結果。根據您的應用程序,您需要考慮這些重復的結果或對其進行重復數據刪除。
窗口的使用
- Flink為每個窗口創建一個每個數據元的副本。鑒於此,翻滾窗口保存每個數據元的一個副本(一個數據元恰好屬於一個窗口,除非它被延遲)
動窗口會每個數據元創建幾個復本,如“ 窗口分配器”部分中所述。因此,尺寸為1天且滑動1秒的滑動窗口可能不是一個好主意。 - ReduceFunction,AggregateFunction並且FoldFunction可以顯着降低存儲要求,因為它們急切地聚合數據元並且每個窗口只存儲一個值。
相反,僅使用 ProcessWindowFunction需要累積所有數據元。
Evictor
- 它剔除元素的時機是:在觸發器觸發之后,在窗口被處理(apply windowFunction)之前
- Flink 的窗口模型允許在窗口分配器和觸發器之外指定一個可選的驅逐器(Evictor)。可以使用 evictor(...) 方法來完成。
驅逐器能夠在觸發器觸發之后,以及在應用窗口函數之前或之后從窗口中移除元素 - 默認情況下,所有內置的驅逐器在窗口函數之前使用
- 指定驅逐器可以避免預聚合(pre-aggregation),因為窗口內所有元素必須在應用計算之前傳遞給驅逐器。
- Flink不保證窗口內元素的順序。這意味着雖然驅逐者可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。
內置的Evitor
- TimeEvitor
- 以毫秒為單位的時間間隔作為參數,對於給定的窗口,找到元素中的最大的時間戳max_ts,並刪除時間戳小於max_ts - interval的所有元素。
- 本質上是將罪行的元素選出來
- CountEvitor
- 保持窗口內元素數量符合用戶指定數量,如果多於用戶指定的數量,從窗口緩沖區的開頭丟棄剩余的元素。
- DeltaEvitor
- 使用 DeltaFunction和 一個閾值,計算窗口緩沖區中的最后一個元素與其余每個元素之間的 delta 值,並刪除 delta 值大於或等於閾值的元素。
- 通過定義的DeltaFunction 和 Threshold ,計算窗口中元素和最新元素的 Delta 值,將Delta 值超過 Threshold的元素刪除
watermark
- watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性。
- watermark Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質上也是一種時間戳,
由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統Event,
與普通數據流Event一樣流轉到對應的下游算子,接收到Watermark Event的算子以此不斷調整自己管理的EventTime clock。
算子接收到一個Watermark時候,框架知道不會再有任何小於該Watermark的時間戳的數據元素到來了,所以Watermark可以看做是告訴Apache Flink框架數據流已經處理到什么位置(時間維度)的方式。 - 通常基於Event Time的數據,自身都包含一個timestamp.watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。
- waterMark 的觸發時間機制(waterMark >= window_end_time)
- 當第一次觸發之后,以后所有到達的該窗口的數據(遲到數據)都會觸發該窗口
- 定義允許延遲,所以 waterMark<window_end_time+allowedLateness 的這段時間內,有數據落入窗口也會觸發計算,當
waterMark>=window_end_time+allowedLateness 是窗口被關閉,數據被丟棄 - 對於out-of-order的數據,Flink可以通過watermark機制結合window的操作,來處理一定范圍內的亂序數據,(新進來的數據)晚於前面進來的數據,但是該數據所在窗口沒有被觸發,
這個時候數據還是有效的——EventTime<WaterMark 的 - 對於out-of-order的數據,延遲太多
- 注意,如果不定義允許最大遲到時間,並且在有很多數據遲到的情況下,會嚴重影響正確結果,只要Event Time < watermark時間就會觸發窗口,也就是說遲到的每一條數據都會觸發
該窗口
產生方式
- Punctuated
- 數據流中每一個遞增的EventTime都會產生一個Watermark(其實是根據某個計算條件來做判斷)。
- 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
- 每個事件都會攜帶事件,可以根據該時間產生一個watermark 或者可以根據事件攜帶的其他標志——業務的結束標志
- Periodic - 周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。
在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
背景
- 流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的
- 但是也不排除由於網絡、背壓等原因,導致亂序的產生(out-of-order或者說late element)。
- 對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了
- 它表示當達到watermark到達之后,在watermark之前的數據已經全部達到(即使后面還有延遲的數據
解決的問題
- Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等於Event中的EventTime,
Event中的EventTime自產生那一刻起就不可以改變了,不受Apache Flink框架控制,
而Watermark的產生是在Apache Flink的Source節點或實現的Watermark生成器計算產生(如上Apache Flink內置的 Periodic Watermark實現),
Apache Flink內部對單流或多流的場景有統一的Watermark處理。 - 默認情況下小於watermark 時間戳的event 會被丟棄嗎
多流waterMark
- 在實際的流計算中往往一個job中會處理多個Source的數據,對Source的數據進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節點,
並攜帶各自的Watermark,Apache Flink內部要保證Watermark要保持單調遞增,多個Source的Watermark匯聚到一起時候可能不是單調自增的 - Apache Flink內部實現每一個邊上只能有一個遞增的Watermark, 當出現多流攜帶Eventtime匯聚到一起(GroupBy or Union)時候,
Apache Flink會選擇所有流入的Eventtime中最小的一個向下游流出。從而保證watermark的單調遞增和保證數據的完整性
理解
- 默認情況下watermark 已經觸發過得窗口,即使有新數據(遲到)落進去不會被計算 ,遲到的意思
watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即數據屬於這個窗口)
- 允許遲到
watermark>=window_n_end_time && watermark<window_n_end_time+lateness && window_n_start_time<=vent_time<window_n_end_time
在 watermark 大於窗口結束時間不超過特定延遲范圍時,落在此窗口內的數據是有效的,可以觸發窗口。
窗口聚合
- 增量聚合
- 窗口內來一條數據就計算一次
- 全量聚合
- 一次計算整個窗口里的所有元素(可以進行排序,一次一批可以針對外部鏈接)
- 使用
- 窗口之后調用 apply ,創建的元素里面方法的參數是一個迭代器
常用的一些方法
- window
- timeWindow和 countWind
- process 和 apply
AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
簡而言之,前一個接口將會周期性發送Watermark,而第二個接口根據一些到達數據的屬性,例如一旦在流中碰到一個特殊的element便發送Watermark。
自定義窗口
- Window Assigner:負責將元素分配到不同的window。
- Trigger即觸發器,定義何時或什么情況下Fire一個window。
- 對於CountWindow,我們可以直接使用已經定義好的Trigger:CountTrigger trigger(CountTrigger.of(2))
- Evictor(可選) 驅逐者,即保留上一window留下的某些元素。
- 最簡單的情況,如果業務不是特別復雜,僅僅是基於Time和Count,我們其實可以用系統定義好的WindowAssigner以及Trigger和Evictor來實現不同的組合:
window 出現數據傾斜
- window 產生數據傾斜指的是數據在不同的窗口內堆積的數據量相差過多。本質上產生這種情況的原因是數據源頭發送的數據量速度不同導致的。出現這種情況一般通過兩種方式來解決:
- 在數據進入窗口前做預聚合;
- 重新設計窗口聚合的 key;
關注公眾號:Java大數據與數據倉庫,回復 "資料",領取大數據資料,學習大數據技術。