Windows是處理無線數據流的核心,它將流分割成有限大小的桶(buckets),並在其上執行各種計算。
窗口化的Flink程序的結構通常如下,有分組流(keyed streams)和無分組流(non-keyed streams)兩種。兩者的不同之處在於,分組流中調用了keyBy(...)
方法,無分組流中使用windowAll(...)
替代分組流中的window(...)
方法。
Window生命周期
當屬於一個窗口的第一個元素到達時,這個窗口被創建,當時間(event or processing time)經過了它的結束時間戳與用戶指定允許延時之后,窗口將被完全移除。同時,Flink確保只對基於時間的窗口執行刪除操作,而對於其他類型不做此處理(例:global windows)。舉個例子,基於事件時間的窗口策略每5分鍾創建一個不重疊窗口,允許1分鍾的延時,那么,當時間戳屬於12:00-12:05這個區間的第一個元素到達時,Flink將為其創建一個新的窗口,一直到watermark到達12:06這個時間戳時,Flink刪除該窗口。
Flink中,每個窗口都有一個觸發器(Trigger)和函數(ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction)與之關聯。其中,函數中包含了作用於窗口中的元素的計算邏輯,觸發器用於說明什么條件下執行窗口的函數計算。觸發策略通常類似於“當窗口中的元素個數超過4個時”,或者“當watermark到達窗口結束時間戳時”。觸發器還可以決定在窗口生命周期內的任何時間清除窗口中的內容。這種情況下的清除操作只會涉及到窗口中的元素,而不會清除窗口的元數據(window metadata)。也就是說,新的元素任然可以被添加到這個窗口中。
除此之外,你還可以指定一個回收器(Evictor),它能夠在觸發器被觸發后以及函數作用之前或之后從窗口中刪除元素。
分組窗口和無分組窗口
在定義窗口之前,首先需要明確的是我們的數據流是否需要分組。使用keyBy(...)
會將無線流分隔成邏輯上分組的流,反之,則不會分組流數據。
在分組流中,傳入事件的任何屬性都可以作為分組流的鍵。由於每個分組流都可以獨立於其他流被處理,所以分組流中允許多個任務並行地進行窗口計算。所有引用了同一個鍵的元素將會被發送到相同的並行任務。
對於無分組的數據流,數據源不會被分隔成多個邏輯流,所有的窗口計算邏輯將會在一個任務中執行。
窗口分配器(Window Assigners)
確定了窗口是否分組之后,接下來我們需要定義分配器,窗口分配器定義如何將元素分配給窗口。
WindowAssigner負責將傳入的元素分配給一個或多個窗口。Flink基於一些常見的應用場景,為我們提供了幾個預定義的WindowAssigner,分別是滾動窗口(tumbling windows)、滑動窗口(sliding windows)、會話窗口(session windows)以及全局窗口(global windows)。我們也可以通過繼承WindowAssigner類來自定義窗口分配器邏輯。Flink內置的WindowAssigner中,除了global windows,其余幾個都是基於時間(processing time or event time)來為窗口分配元素。
基於時間的窗口包含一個start timestamp(大於等於)和一個end timestamp(小於),兩者的時間差用於表示窗口大小。同時,我們可以通過Flink提供的TimeWindow來查詢開始、結束時間戳,還可以通過maxTimestamp()
方法獲取給定窗口允許的最大時間戳。
Tumbling Windows
滾動窗口分配器會將每個元素分配給一個指定窗口大小的窗口。滾動窗口具有固定的窗口大小,並且窗口之間不會重疊。比如下圖展示的是一個設定為5分鍾窗口大小的滾動窗口,每五分鍾會創建一個新的窗口。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
如上段代碼中最后一個例子展示的那樣,tumbling window assigners包含一個可選的offset
參數,我們可以用它來改變窗口的對齊方式。比如,一個沒有偏移量的按小時滾動窗口,它創建的時間窗口通常是1:00:00.000 - 1:59:59.999
,2:00:00.000 - 2:59:59.999
,當我們給定一個15分鍾的偏移量時,時間窗口將會變成1:15:00.000 - 2:14:59.999
,2:15:00.000 - 3:14:59.999
。在實際應用中,一個比較常見的使用場景是通過offset
將窗口調整到UTC-0以外的時區,比如通過Time.hours(-8)
調整時區到東八區。
Sliding Windows
滑動窗口分配器同樣是將元素分配給固定大小的時間窗口,窗口大小的配置方式與滾動窗口一樣,不同之處在於,滑動窗口還有一個額外的slide
參數用於控制窗口滑動的頻率。當slide
小於window size
時,滑動窗口便會重疊。這種情況下同一個元素將會被分配給多個窗口。
比如下圖這樣,設置了一個10分鍾大小的滑動窗口,它的滑動參數(slide
)為5分鍾。這樣的話,每5分鍾將會創建一個新的窗口,並且這個窗口中包含了一部分來自上一個窗口的元素。
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
同樣,我們可以通過offset
參數來為窗口設置偏移量。
Session Windows
會話窗口通過活動會話來對元素進行分組。不同於滾動窗口和滑動窗口,會話窗口不會重疊,也沒有固定的開始、結束時間。當一個會話窗口在指定的時間區間內沒有接收到新的數據時,這個窗口將會被關閉。會話窗口分配器可以直接配置一個靜態常量會話間隔,也可以通過函數來動態指定會話間隔時間。
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
如上,固定大小的會話間隔可以通過Time.milliseconds(x)
,Time.seconds(x)
,Time.minutes(x)
來指定,動態會話間隔通過實現SessionWindowTimeGapExtractor
接口來指定。
注意:由於會話窗口沒有固定的開始結束時間,它的計算方法與滾動窗口、滑動窗口有所不同。在一個會話窗口算子內部會為每一個接收到的元素創建一個新的窗口,如果這些元素之間的時間間隔小於定義的會話窗口間隔,則將阿門合並到一個窗口。為了能夠進行窗口合並,我們需要為會話窗口定義一個Tigger
函數和Window Function
函數(例如ReduceFunction, AggregateFunction, or ProcessWindowFunction. FoldFunction不能用於合並)。
Global Windows
全局窗口分配器會將具有相同key值的所有元素分配在同一個窗口。這種窗口模式下需要我們設置一個自定義的Trigger
,否則將不會執行任何計算,這是因為全局窗口中沒有一個可以處理聚合元素的自然末端。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
Window Function
定義好窗口分配器之后,我們需要指定作用於每個窗口上的計算。這可以通過指定Window Function來實現,一旦系統確定了某個窗口已經准備好進行處理,該函數將會處理窗口中的每個元素。
Window Function通常有這幾種:ReduceFunction,AggregateFunction,FoldFunction以及ProcessWindowFunction。其中,前兩個函數可以高效執行,因為Flink可以在每個元素到達窗口時增量的聚合這些元素。ProcessWindowFunction持有一個窗口中包含的所有元素的Iterable對象,以及元素所屬窗口的附加meta信息。
ProcessWindowFunction
無法高效執行是因為在調用函數之前Flink必須在內部緩存窗口中的所有元素。我們可以將ProcessWindowFunction
和ReduceFunction
,AggregateFunction
, 或者FoldFunction
函數結合來緩解這個問題,從而可以獲取窗口元素的聚合數據以及ProcessWindowFunction接收的窗口meta數據。
ReduceFunction
ReduceFunction用於指明如何組合輸入流中的兩個元素來生成一個相同類型的輸出元素。Flink使用ReduceFunction增量地聚合窗口中的元素。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
AggregateFunction
AggregateFunction可以稱之為廣義上的ReduceFunction,它包含三種元素類型:輸入類型(IN),累加器類型(ACC)以及輸出類型(OUT)。AggregateFunction接口中有一個用於創建初始累加器、合並兩個累加器的值到一個累加器以及從累加器中提取輸出結果的方法。
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
FoldFunction
FoldFunction用於指定窗口中的輸入元素如何與給定類型的輸出元素相結合。對於輸入到窗口中的每個元素,遞增調用FoldFunction將其與當前輸出值合並。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
注意:fold()不能用於會話窗口或其他可合並的窗口
ProcessWindowFunction
從ProcessWindowFunction中可以獲取一個包含窗口中所有元素的迭代對象,以及一個用來訪問時間和狀態信息的Context對象,這使得它比其他窗口函數更加靈活。當然,這也帶來了更大的性能開銷和資源消耗。
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
其中的key參數是通過keyBy()
中指定的KeySelector
來獲取的鍵值。對於元組(tuple)索引的key或是字符串字段引用的key,這里的KEY參數類型都是元組類型,我們需要手動將其轉換為正確大小的元組,以便於從中提取key值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
ProcessWindowFunction with Incremental Aggregation
正如前文中提到的,我們可以將ReduceFunction、AggregateFunction或者FoldFunction與ProcessWindowFunction結合起來使用,這樣不但可以增量地執行窗口計算,還可以獲取ProcessWindowFunction為我們提供的一些額外的窗口meta信息。
Incremental Window Aggregation with ReduceFunction
下面這個例子說明了如何將二者結合起來,以返回窗口中的最小事件和窗口的開始時間
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
Incremental Window Aggregation with AggregateFunction
示例:計算元素平均值,同時輸出key值與均值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
Incremental Window Aggregation with FoldFunction
示例:返回窗口中的事件數量,同時返回key值和窗口結束時間。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
Triggers
Trigger用於決定窗口什么時候被window function處理。Flink中每個WindowAssigner都有一個默認的Trigger。我們也可以通過trigger(...)
函數來自定義觸發規則。
Trigger接口包含以下5個方法:
- The
onElement()
method is called for each element that is added to a window. - The
onEventTime()
method is called when a registered event-time timer fires. - The
onProcessingTime()
method is called when a registered processing-time timer fires. - The
onMerge()
method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge,_e.g._when using session windows. - Finally the
clear()
method performs any action needed upon removal of the corresponding window.
Evictors
Flink窗口模式允許我們指定一個WindowAssigner和Trigger之外的可選的Evictor。Evictor可以在觸發器啟動之后、窗口函數作用之前或之后移出窗口中的元素。
/**
* Optionally evicts elements. Called before windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Optionally evicts elements. Called after windowing function.
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
Flink為我們提供了三個預定義的evictors:
CountEvictor
: 保留窗口中用戶指定數量的元素,從窗口緩沖區開始部分刪除其他元素。DeltaEvictor
: 獲取一個DeltaFunction函數和閾值,計算窗口緩沖區中其余元素與最后一個元素的Delta值,然后將Delta值大於等於閾值的元素移除。TimeEvictor
: 持有一個毫秒級的interval
參數,對於一個給定窗口,找到元素中的最大時間戳max_ts,然后刪除那些時間戳小於max_ts - interval值的元素。
所有預定義的Evictor均會在窗口函數作用之前執行。
Allowed Lateness
當使用事件時間窗口時,可能會出現元素延遲到達的情況。例如,Flink用於跟蹤單事件時間進程的watermark已經越過了元素所屬窗口的結束時間。
默認情況下,當watermark越過了窗口結束時間時,延遲到達的元素將會被丟棄。但是,Flink允許我們指定一個窗口的最大延遲時間,允許元素在被刪除前(watermark到達結束時間時)可以延遲多長時間,它的默認值為0。根據所用觸發器的不同,延遲到達但未廢棄的元素可能會導致窗口的再次觸發,使用EventTimeTrigger
會有這種情況。
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
Side Output
Flink的side output可以讓我們獲得一個廢棄元素的數據流。如下,通過設置窗口的sideOutputLateData(OutputTag)
可以獲取旁路輸出流。
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
閱讀原文:一文搞懂Flink Window機制