Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。本文主要聚焦於在Flink中如何進行窗口操作,以及程序員如何從window提供的功能中獲得最大的收益。
窗口化的Flink程序的一般結構如下,第一個代碼段中是分組的流,而第二段是非分組的流。正如我們所見,唯一的區別是分組的stream調用keyBy(…)
和window(…)
,而非分組的stream中window()
換成了windowAll(…)
,這些也將貫穿都這一頁的其他部分中。
Keyed Windows
stream.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional, else zero .reduce/fold/apply() <- required: "function"
Non-Keyed Windows
stream.windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional, else zero .reduce/fold/apply() <- required: "function"
在上面的例子中,方括號[]內的命令是可選的,這表明Flink允許你根據最符合你的要求來定義自己的window邏輯。
Window 的生命周期
簡單地說,當一個屬於window的元素到達之后這個window就創建了,而當當前時間(事件或者處理時間)為window的創建時間跟用戶指定的延遲時間相加時,窗口將被徹底清除。Flink 確保了只清除基於時間的window,其他類型的window不清除,例如:全局window(詳情:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners) 。例如:對於一個每5分鍾創建無覆蓋的(即 翻滾窗口)窗口,允許一個1分鍾的時延的窗口策略,Flink將會在12:00到12:05這段時間內第一個元素到達時創建窗口,當水印通過12:06時,移除這個窗口。
此外,每個 Window 都有一個Trigger(觸發器,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers) 和一個附屬於 Window 的函數(例如: WindowFunction
, ReduceFunction
及 FoldFunction
),詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-functions 。函數里包含了應用於窗口(Window)內容的計算,而Trigger(觸發器)則指定了函數在什么條件下可被應用(函數何時被觸發),一個觸發策略可以是 "當窗口中的元素個數超過4個時" 或者 "當水印達到窗口的邊界時"。觸發器還可以決定在窗口創建和刪除之間的任意時刻清除窗口的內容,本例中的清除僅指清除窗口的內容而不是窗口的元數據,也就是說新的數據還是可以被添加到當前的window中。
除了上面的提到之外,你還可以指定一個驅逐者(Evictor,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#evictors ), Evictor
將在觸發器觸發之后或者在函數被應用之前或者之后,清楚窗口中的元素。
接下來我們將更深入的去了解上述的部件,我們從上述片段的主要部分開始(如:Keyed
vs Non-Keyed Windows
, Window Assigner
, 及 Window Function
),然后是可選部分。
分組和非分組Windows (Keyed vs Non-Keyed Windows)
首先,第一件事是指定你的數據流是分組的還是未分組的,這個必須在定義 window 之前指定好。使用 keyBy(...)
會將你的無限數據流拆分成邏輯分組的數據流,如果 keyBy(...)
函數不被調用的話,你的數據流將不是分組的。
在分組數據流中,任何正在傳入的事件的屬性都可以被當做key(更多詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#specifying-keys ),分組數據流將你的window計算通過多任務並發執行,以為每一個邏輯分組流在執行中與其他的邏輯分組流是獨立地進行的。
在非分組數據流中,你的原始數據流並不會拆分成多個邏輯流並且所有的window邏輯將在一個任務中執行,並發度為1。
窗口分配器(Window Assingers)
指定完你的數據流是分組的還是非分組的之后,接下來你需要定義一個窗口分配器(window assigner
),窗口分配器定義了元素如何分配到窗口中,這是通過在分組數據流中調用window(...)
或者非分組數據流中調用windowAll(...)
時你選擇的窗口分配器(WindowAssigner
)來指定的。WindowAssigner
是負責將每一個到來的元素分配給一個或者多個窗口(window),Flink 提供了一些常用的預定義窗口分配器,即:滾動窗口、滑動窗口、會話窗口和全局窗口。你也可以通過繼承WindowAssigner
類來自定義自己的窗口。所有的內置窗口分配器(除了全局窗口 global window
)都是通過時間來分配元素到窗口中的,這個時間要么是處理的時間,要么是事件發生的時間。請看一下我們的 event time
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )部分來了解更多處理時間和事件時間的區別及時間戳(timestamp
)和水印(watermark
)是如何產生的。
接下來我們將展示Flink的預定義窗口分配器是如何工作的,以及它們在DataStream
程序中是如何使用的。接下來我們將展示Flink的預定義窗口分配器是如何工作的,以及它們在DataStream
程序中是如何使用的。下圖中展示了每個分配器是如何工作的,紫色圓圈代表着數據流中的一個元素,這些元素是通過一些key進行分區(在本例中是 user1,user2,user3), X軸顯示的是時間進度。
滾動窗口
滾動窗口分配器將每個元素分配的一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個5分鍾大小的滾動窗口,當前窗口將被評估並將按下圖說明每5分鍾創建一個新的窗口。
![滾動窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/tumbling-windows.svg ]
下面的代碼片段展示了如何使用滾動窗口。
Java 代碼
DataStream<T> input = ...;
滾動事件時間窗口( tumbling event-time windows )
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>);
滾動處理時間窗口(tumbling processing-time windows)
input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>);
每日偏移8小時的滾動事件時間窗口(daily tumbling event-time windows offset by -8 hours. )
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
Scala 代碼:
val input:DataStream[T] =
滾動事件時間窗口(tumbling event-time windows)
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>)
滾動處理時間窗口(tumbling processing-time windows)
input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>)
每日偏移8小時的滾動事件時間窗口(daily tumbling event-time windows offset by -8 hours. )
input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等其中的一個來指定。
在上面最后的例子中,滾動窗口分配器還接受了一個可選的偏移參數,可以用來改變窗口的排列。例如,沒有偏移的話按小時的滾動窗口將按時間紀元來對齊,也就是說你將一個如: 1:00:00.000~1:59:59.999,2:00:00.000~2:59:59.999等,如果你想改變一下,你可以指定一個偏移,如果你指定了一個15分鍾的偏移,你將得到1:15:00.000~2:14:59.999,2:15:00.000~3:14:59.999等。時間偏移一個很大的用處是用來調准非0時區的窗口,例如:在中國你需要指定一個8小時的時間偏移。
滑動窗口(Sliding Windows)
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小於滾動參數的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鍾的窗口和5分鍾的滑動,那么每個窗口中5分鍾的窗口里包含着上個10分鍾產生的數據,如下圖所示:
![][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/sliding-windows.svg]
下面的代碼片段中展示了如何使用滑動窗口:
Java 代碼:
DataStream<T> input = ...;
滑動事件時間窗口
input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>);
滑動處理時間窗口
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>);
//偏移8小時的滑動處理時間窗口(sliding processing-time windows offset by -8 hours)
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
Scala 代碼:
val input: DataStream[T] = ...
// 滑動事件時間窗口(sliding event-time windows)
input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>)
//滑動處理時間窗口(sliding processing-time windows)
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>)
// 偏移8小時的滑動處理時間窗口(sliding processing-time windows offset by -8 hours)
input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等來指定。
正如上述例子所示,滑動窗口分配器也有一個可選的偏移參數來改變窗口的對齊。例如,沒有偏移參數,按小時的窗口,有30分鍾的滑動,將根據時間紀元來對齊,也就是說你將得到如下的窗口1:00:00.001:59:59.999,1:30:00.0002:29:59.999等。而如果你想改變窗口的對齊,你可以給定一個偏移,如果給定一個15分鍾的偏移,你將得到如下的窗口:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。時間偏移一個很大的用處是用來調准非0時區的窗口,例如:在中國你需要指定一個8小時的時間偏移。
會話窗口(Session Windows)
session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況。相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度。當這個非活躍周期產生,那么當前的session將關閉並且后續的元素將被分配到新的session窗口中去。
![會話窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/session-windows.svg]
下面的代碼片段中展示了如何使用session窗口
Java代碼:
DataStream<T> input = ...;
// 事件時間會話窗口(event-time session windows)
input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>);
// 處理時間會話窗口(processing-time session windows)
input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>);
Scala代碼:
val input: DataStream[T] = ...
// 事件時間會話窗口(event-time session windows)
input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>)
// 處理時間會話窗口(processing-time session windows)
input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
等來指定。
注意: 因為session看窗口沒有一個固定的開始和結束,他們的評估與滑動窗口和滾動窗口不同。在內部,session操作為每一個到達的元素創建一個新的窗口,並合並間隔時間小於指定非活動間隔的窗口。為了進行合並,session窗口的操作需要指定一個合並觸發器(Trigger)和一個合並窗口函數(Window Function),如:ReduceFunction或者WindowFunction(FoldFunction不能合並)。
全局窗口(Global Windows)
全局窗口分配器將所有具有相同key的元素分配到同一個全局窗口中,這個窗口模式僅適用於用戶還需自定義觸發器的情況。否則,由於全局窗口沒有一個自然的結尾,無法執行元素的聚合,將不會有計算被執行。
![全局窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/non-windowed.svg]
下面的代碼片段展示了如何使用全局窗口:
Java 代碼:
DataStream<T> input = ...; input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>);
Scala代碼:
val input: DataStream[T] = ...
input
.keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>)
窗口函數(Window Functions)
定義完窗口分配器后,我們還需要為每一個窗口指定我們需要執行的計算,這是窗口的責任,當系統決定一個窗口已經准備好執行之后,這個窗口函數將被用來處理窗口中的每一個元素(可能是分組的)。請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers 來了解當一個窗口准備好之后,Flink是如何決定的。
window函數可以是ReduceFunction
, FoldFunction
或者 WindowFunction
中的一個。前面兩個更高效一些(),因為在每個窗口中增量地對每一個到達的元素執行聚合操作。一個 WindowFunction
可以獲取一個窗口中的所有元素的一個迭代以及哪個元素屬於哪個窗口的額外元信息。
有WindowFunction
的窗口化操作會比其他的操作效率要差一些,因為Flink內部在調用函數之前會將窗口中的所有元素都緩存起來。這個可以通過WindowFunction
和ReduceFunction
或者FoldFunction
結合使用來獲取窗口中所有元素的增量聚合和WindowFunction
接收的額外的窗口元數據,接下來我們將看一看每一種變體的示例。
ReduceFunction
ReduceFunction
指定了如何通過兩個輸入的參數進行合並輸出一個同類型的參數的過程,Flink使用ReduceFunction
來對窗口中的元素進行增量聚合。
一個ReduceFunction
可以通過如下的方式來定義和使用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> { public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } });
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>) .window(<window assigner>) .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上面的例子是將窗口所有元素中元組的第二個屬性進行累加操作。
FoldFunction
FoldFunction
指定了一個輸入元素如何與一個輸出類型的元素合並的過程,這個FoldFunction
會被每一個加入到窗口中的元素和當前的輸出值增量地調用,第一個元素是與一個預定義的類型為輸出類型的初始值合並。
一個FoldFunction可以通過如下的方式定義和調用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> { public String fold(String acc, Tuple2<String, Long> value) { return acc + value.f1; } });
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
上面例子追加所有輸入的長整型到一個空的字符串中。
注意 fold()
不能應用於回話窗口或者其他可合並的窗口中。
窗口函數 —— 一般用法(WindowFunction - The Generic Case)
一個WindowFunction
將獲得一個包含了window
中的所有元素迭代(Iterable
),並且提供所有窗口函數的最大靈活性。這些帶來了性能的成本和資源的消耗,因為window
中的元素無法進行增量迭代,而是緩存起來直到window
被認為是可以處理時為止。
WindowFunction
的使用說明如下:
Java 代碼:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { /** // Evaluates the window and outputs none or several elements. // @param key The key for which this window is evaluated. // @param window The window that is being evaluated. // @param input The elements in the window being evaluated. // @param out A collector for emitting elements. // @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception; }
Scala 代碼:
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { /** // Evaluates the window and outputs none or several elements. // // @param key The key for which this window is evaluated. // @param window The window that is being evaluated. // @param input The elements in the window being evaluated. // @param out A collector for emitting elements. // @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) }
一個WindowFunction
可以按如下方式來定義和使用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction()); /* ... */ public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> { void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) { long count = 0; for (Tuple<String, Long> in: input) { count++; } out.collect("Window: " + window + "count: " + count); } }
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
/* ... */ class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = { var count = 0L for (in <- input) { count = count + 1 } out.collect(s"Window $window count: $count") } }
上面的例子展示了統計一個window
中元素個數的WindowFunction
,此外,還將window
的信息添加到輸出中。
注意:使用WindowFunction
來做簡單的聚合操作如計數操作,性能是相當差的。下一章節我們將展示如何將ReduceFunction
跟WindowFunction
結合起來,來獲取增量聚合和添加到WindowFunction
中的信息。
ProcessWindowFunction
在使用WindowFunction
的地方你也可以用ProcessWindowFunction
,這跟WindowFunction
很類似,除了接口允許查詢跟多關於context
的信息,context
是window
評估發生的地方。
下面是ProcessWindowFunction
的接口:
Java 代碼:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** // Evaluates the window and outputs none or several elements. // // @param key The key for which this window is evaluated. // @param context The context in which the window is being evaluated. // @param elements The elements in the window being evaluated. // @param out A collector for emitting elements. // // @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** // The context holding window metadata */ public abstract class Context { /** // @return The window that is being evaluated. */ public abstract W window(); } }
Scala 代碼:
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { /** // Evaluates the window and outputs none or several elements. // // @param key The key for which this window is evaluated. // @param context The context in which the window is being evaluated. // @param elements The elements in the window being evaluated. // @param out A collector for emitting elements. // @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ @throws[Exception] def process( key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]) /** // The context holding window metadata */ abstract class Context { /** // @return The window that is being evaluated. */ def window: W } }
ProcessWindowFunction
可以通過如下方式調用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .process(new MyProcessWindowFunction());` Scala 代碼: `val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) .window(<window assigner>) .process(new MyProcessWindowFunction())
有增量聚合功能的WindowFunction (WindowFunction with Incremental Aggregation)
WindowFunction
可以跟ReduceFunction
或者FoldFunction
結合來增量地對到達window
中的元素進行聚合,當window
關閉之后,WindowFunction
就能提供聚合結果。當獲取到WindowFunction
額外的window
元信息后就可以進行增量計算窗口了。
標注:你也可以使用ProcessWindowFunction
替換WindowFunction
來進行增量窗口聚合。
使用FoldFunction 進行增量窗口聚合(Incremental Window Aggregation with FoldFunction)
下面的例子展示了一個增量的FoldFunction
如何跟一個WindowFunction
結合,來獲取窗口的事件數,並同時返回窗口的key
和窗口的最后時間。
Java 代碼:
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction()) // Function definitions private static class MyFoldFunction implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > { public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) { Integer cur = acc.getField(2); acc.setField(2, cur + 1); return acc; } } private static class MyWindowFunction implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { public void apply(String key, TimeWindow window, Iterable<Tuple3<String, Long, Integer>> counts, Collector<Tuple3<String, Long, Integer>> out) { Integer count = counts.iterator().next().getField(2); out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count)); } }
Scala 代碼:
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0), (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) }, ( key: String, window: TimeWindow, counts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)] ) => { val count = counts.iterator.next() out.collect((key, window.getEnd, count._3)) } )
使用ReduceFunction進行增量窗口聚合(Incremental Window Aggregation with ReduceFunction)
下面例子展示了一個增量額ReduceFunction
如何跟一個WindowFunction
結合,來獲取窗口中最小的事件和窗口的開始時間。
Java 代碼:
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(new MyReduceFunction(), new MyWindowFunction()); // Function definitions private static class MyReduceFunction implements ReduceFunction<SensorReading> { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyWindowFunction implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { public void apply(String key, TimeWindow window, Iterable<SensorReading> minReadings, Collector<Tuple2<Long, SensorReading>> out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min)); } }
Scala 代碼:
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, window: TimeWindow, minReadings: Iterable[SensorReading], out: Collector[(Long, SensorReading)] ) => { val min = minReadings.iterator.next() out.collect((window.getStart, min)) } )
觸發器(Triggers)
觸發器決定了一個窗口何時可以被窗口函數處理,每一個窗口分配器都有一個默認的觸發器,如果默認的觸發器不能滿足你的需要,你可以通過調用trigger(...)
來指定一個自定義的觸發器。觸發器的接口有5個方法來允許觸發器處理不同的事件:
*onElement()
方法,每個元素被添加到窗口時調用
*onEventTime()
方法,當一個已注冊的事件時間計時器啟動時調用
*onProcessingTime()
方法,當一個已注冊的處理時間計時器啟動時調用
*onMerge()
方法,與狀態性觸發器相關,當使用會話窗口時,兩個觸發器對應的窗口合並時,合並兩個觸發器的狀態。
*最后一個clear()
方法執行任何需要清除的相應窗口
上面的方法中有兩個需要注意的地方:
1)第一、三通過返回一個TriggerResult
來決定如何操作調用他們的事件,這些操作可以是下面操作中的一個;
CONTINUE
:什么也不做
FIRE
:觸發計算
PURGE
:清除窗口中的數據
FIRE_AND_PURGE
:觸發計算並清除窗口中的數據
2)這些函數可以被用來為后續的操作注冊處理時間定時器或者事件時間計時器
觸發和清除(Fire and Purge)
一旦一個觸發器決定一個窗口已經准備好進行處理,它將觸發並返回FIRE
或者FIRE_AND_PURGE
。這是窗口操作發送當前窗口結果的信號,給定一個擁有一個WindowFunction
的窗口那么所有的元素都將發送到WindowFunction
中(可能之后還會發送到驅逐器(Evitor
)中)。有ReduceFunction
或者FoldFunction
的Window
僅僅發送他們的急切聚合結果。
當一個觸發器觸發時,它可以是FIRE
或者FIRE_AND_PURGE
,如果是FIRE
的話,將保持window
中的內容,FIRE_AND_PURGE
的話,會清除window
的內容。默認情況下,預實現的觸發器僅僅是FIRE
,不會清除window
的狀態。
注意:清除操作僅清除window
的內容,並留下潛在的窗口元信息和完整的觸發器狀態。
窗口分配器默認的觸發器(Default Triggers of WindowAssigners)
默認的觸發器適用於許多種情況,例如:所有的事件時間分配器都有一個EventTimeTrigger
作為默認的觸發器,這個觸發器僅在當水印通過窗口的最后時間時觸發。
注意:GlobalWindow
默認的觸發器是NeverTrigger
,是永遠不會觸發的,因此,如果你使用的是GlobalWindow
的話,你需要定義一個自定義觸發器。
注意:通過調用trigger(...)
來指定一個觸發器你就重寫了WindowAssigner
的默認觸發器。例如:如果你為TumblingEventTimeWindows
指定了一個CountTrigger
,你就不會再通過時間來獲取觸發了,而是通過計數。現在,如果你想通過時間和計數來觸發的話,你需要寫你自己自定義的觸發器。
內置的和自定義的觸發器(Build-in and Custom Triggers)
Flink有一些內置的觸發器:
*EventTimeTrigger
(前面提到過)觸發是根據由水印衡量的事件時間的進度來的
*ProcessingTimeTrigger
根據處理時間來觸發
*CountTrigger
一旦窗口中的元素個數超出了給定的限制就會觸發
*PurgingTrigger
作為另一個觸發器的參數並將它轉換成一個清除類型
如果你想實現一個自定義的觸發器,你需要查看一下這個抽象類Trigger
(https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ),請注意,這個API還在優化中,后續的Flink版本可能會改變。
驅逐器(Evictors)
Flink的窗口模型允許指定一個除了WindowAssigner
和Trigger
之外的可選參數Evitor
,這個可以通過調用evitor(...)
方法(在這篇文檔的開頭展示過)來實現。這個驅逐器(evitor
)可以在觸發器觸發之前或者之后,或者窗口函數被應用之前清理窗口中的元素。為了達到這個目的,Evitor
接口有兩個方法:
/** // Optionally evicts elements. Called before windowing function. // // @param elements The elements currently in the pane. // @param size The current number of elements in the pane. // @param window The {@link Window} // @param evictorContext The context for the Evictor /// void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); /** // Optionally evicts elements. Called after windowing function. // // @param elements The elements currently in the pane. // @param size The current number of elements in the pane. // @param window The {@link Window} // @param evictorContext The context for the Evictor */ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evitorBefore()
方法包含了在window function
之前被應用的驅逐邏輯,而evitorAfter()
方法包含了在window function
之后被應用的驅逐邏輯。在window function
應用之前被驅逐的元素將不會再被window function
處理。
Flink有三個預實現的驅逐器,他們是:
CountEvitor:在窗口中保持一個用戶指定數量的元素,並在窗口的開始處丟棄剩余的其他元素
DeltaEvitor: 通過一個DeltaFunction
和一個閾值,計算窗口緩存中最近的一個元素和剩余的所有元素的delta
值,並清除delta
值大於或者等於閾值的元素
TimeEvitor:使用一個interval
的毫秒數作為參數,對於一個給定的窗口,它會找出元素中的最大時間戳max_ts
,並清除時間戳小於max_tx - interval
的元素。
默認情況下:所有預實現的evitor
都是在window function
前應用它們的邏輯
注意:指定一個Evitor
要防止預聚合,因為窗口中的所有元素必須得在計算之前傳遞到驅逐器中
注意:Flink 並不保證窗口中的元素是有序的,所以驅逐器可能從窗口的開始處清除,元素到達的先后不是那么必要。
允許延遲(Allowed Lateness)
當處理事件時間的window時,可能會出現元素到達晚了,Flink用來與事件時間聯系的水印已經過了元素所屬的窗口的最后時間。可以查看事件時間(event time
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )尤其是晚到元素(late elements
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements )來了解Flink如何處理事件時間的討論。
默認情況下,當水印已經過了窗口的最后時間時晚到的元素會被丟棄。然而,Flink允許為窗口操作指定一個最大允許時延,允許時延指定了元素可以晚到多長時間,默認情況下是0。水印已經過了窗口最后時間后才來的元素,如果還未到窗口最后時間加時延時間,那么元素任然添加到窗口中。如果依賴觸發器的使用的話,晚到但是未丟棄的元素可能會導致窗口再次被觸發。
為了達到這個目的,Flink將保持窗口的狀態直到允許時延的發生,一旦發生,Flink將清除Window,刪除window的狀態,如Window 生命周期章節中所描述的那樣。
默認情況下,允許時延為0,也就是說水印之后到達的元素將被丟棄。
你可以按如下方式來指定一個允許時延:
Java 代碼:
DataStream<T> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .<windowed transformation>(<window function>);
Scala 代碼:
val input: DataStream[T] = ...
input
.keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .<windowed transformation>(<window function>)
注意:當使用GlobalWindows
分配器時,沒有數據會被認為是延遲的,因為Global Window
的最后時間是Long.MAX_VALUE
。
以側輸出來獲取延遲數據(Getting Late Data as a Site Output)
使用Flink的側輸出(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html )特性,你可以獲得一個已經被丟棄的延遲數據流。
首先你需要在窗口化的數據流中調用sideOutputLateData(OutputTag)
指定你需要獲取延遲數據,然后,你就可以在window 操作的結果中獲取到側輸出流了。
代碼如下:
Java 代碼:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; DataStream<T> input = ...; DataStream<T> result = input .keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>); DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
Scala代碼:
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>) .window(<window assigner>) .allowedLateness(<time>) .sideOutputLateData(lateOutputTag) .<windowed transformation>(<window function>) val lateStream = result.getSideOutput(lateOutputTag)
延遲元素考慮(Late elements considerations)
當指定一個允許延遲大於0時,window
以及window
中的內容將會繼續保持即使水印已經達到了window
的最后時間。在這種情況下,當一個延遲事件到來而未丟棄時,它可能會觸發window
中的其他觸發器。這些觸發叫做late firings
,因為它們是由延遲事件觸發的,並相對於window
中第一個觸發即主觸發而言。對於session window
而言,late firing
還會進一步導致window
的合並,因為它們橋接了兩個之前存在差距,而未合並的window
。
有用狀態大小的考慮(Useful state size considerations)
window 可以定義一個很長的周期(例如:一天、一周或者一月),因此積累了相當大的狀態。這里有些規則,當估計你的窗口計算的存儲要求時,需要記住。
1、Flink會在每個窗口中為每個屬於它的元素創建一份備份,鑒於此,滾動窗口保存了每個元素的一個備份,與此相反,滑動窗口會為每個元素創建幾個備份,如Window Assigner
章節所述。因此,一個窗口大小為1天,滑動大小為1秒的滑動窗口可能就不是個好的策略了。
2、FoldFunction
和ReduceFunction
可以制定reduce
的存儲需求,因為它們預聚合元素並且每個窗口只保存一個值。相反,只有WindowFunction
需要累積所有的元素。
3、使用Evitor
需要避免任何預聚合操作,因為窗口中的所有元素都需要在應用於計算之前傳遞到evitor
中
鏈接:https://www.jianshu.com/p/a883262241ef