Windows是Flink流計算的核心,本文將概括的介紹幾種窗口的概念,重點只放在窗口的應用上。
本實驗的數據采用自擬電影評分數據(userId, movieId, rating, timestamp),userId和movieId范圍分別為1-100和1-200的隨機數,rating范圍為[0:0.5:5.0]一共10個檔位,timestamp為10000-20000之間的隨機數,且數據順序采用timestamp的升序排列。(2.1-2.6節的數據是亂序)
一、窗口(window)的類型
對於窗口的操作主要分為兩種,分別對於Keyedstream和Datastream。他們的主要區別也僅僅在於建立窗口的時候一個為.window(...),一個為.windowAll(...)。對於Keyedstream的窗口來說,他可以使得多任務並行計算,每一個logical key stream將會被獨立的進行處理。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...)/.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
按照窗口的Assigner來分,窗口可以分為
Tumbling window, sliding window,session window,global window,custom window
每種窗口又可分別基於processing time和event time,這樣的話,窗口的類型嚴格來說就有很多。
還有一種window叫做count window,依據元素到達的數量進行分配,之后也會提到。
窗口的生命周期開始在第一個屬於這個窗口的元素到達的時候,結束於第一個不屬於這個窗口的元素到達的時候。
二、窗口的操作
2.1 Tumbling window
固定相同間隔分配窗口,每個窗口之間沒有重疊看圖一眼明白。
下面的例子定義了每隔3毫秒一個窗口的流:
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(TumblingEventTimeWindows.of(Time.milliseconds(3)));
2.2 Sliding Windows
跟上面一樣,固定相同間隔分配窗口,只不過每個窗口之間有重疊。窗口重疊的部分如果比窗口小,窗口將會有多個重疊,即一個元素可能被分配到多個窗口里去。
下面的例子給出窗口大小為10毫秒,重疊為5毫秒的流:
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)));
2.3 Session window
這種窗口主要是根據活動的事件進行窗口化,他們通常不重疊,也沒有一個固定的開始和結束時間。一個session window關閉通常是由於一段時間沒有收到元素。在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續活躍的周期),由非活躍的間隙分隔開。
// 靜態間隔時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 動態時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withDynamicGap(()));
2.4 Global window
將所有相同keyed的元素分配到一個窗口里。好吧,就這樣:
WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(GlobalWindows.create());
三、窗口函數
窗口函數就是這四個:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前兩個執行得更有效,因為Flink可以增量地聚合每個到達窗口的元素。
Flink必須在調用函數之前在內部緩沖窗口中的所有元素,所以使用ProcessWindowFunction進行操作效率不高。不過ProcessWindowFunction可以跟其他的窗口函數結合使用,其他函數接受增量信息,ProcessWindowFunction接受窗口的元數據。
舉一個AggregateFunction的例子吧,下面代碼為MovieRate按user分組,且分配5毫秒的Tumbling窗口,返回每個user在窗口內評分的所有分數的平均值。
DataStream<Tuple2<Integer,Double>> Rates = rates
.keyBy(MovieRate::getUserId)
.window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
.aggregate(new AggregateFunction<MovieRate, AverageAccumulator, Tuple2<Integer,Double>>() {
@Override
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
@Override
public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
acc.userId = movieRate.userId;
acc.sum += movieRate.rate;
acc.count++;
return acc;
}
@Override
public Tuple2<Integer,Double> getResult(AverageAccumulator acc) {
return Tuple2.of(acc.userId, acc.sum/(double)acc.count);
}
@Override
public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
acc0.count += acc1.count;
acc0.sum += acc1.sum;
return acc0;
}
});
public static class AverageAccumulator{
int userId;
int count;
double sum;
}
以下是部分輸出:
...
1> (44,3.0)
4> (96,0.5)
2> (51,0.5)
3> (90,2.75)
...
看上面的代碼,會發現add()函數特別生硬,因為我們想返回Tuple2<Integer, Double>類型,即Integer為key,但AggregateFunction似乎沒有提供這個機制可以讓AverageAccumulator的構造函數提供參數。所以,這里引入ProcessWindowFunction與AggregateFunction的結合版,AggregateFunction進行增量疊加,當窗口關閉時,ProcessWindowFunction將會被提供AggregateFunction返回的結果,進行Tuple封裝:
DataStream<Tuple2<Integer,Double>> Rates = rates
.keyBy(MovieRate::getUserId)
.window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
public static class MyAggregateFunction implements AggregateFunction<MovieRate, AverageAccumulator, Double> {
@Override
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
@Override
public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
acc.sum += movieRate.rate;
acc.count++;
return acc;
}
@Override
public Double getResult(AverageAccumulator acc) {
return acc.sum/(double)acc.count;
}
@Override
public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
acc0.count += acc1.count;
acc0.sum += acc1.sum;
return acc0;
}
}
public static class MyProcessWindowFunction extends
ProcessWindowFunction<Double, Tuple2<Integer, Double>, Integer, TimeWindow> {
@Override
public void process(Integer key,
Context context,
Iterable<Double> results,
Collector<Tuple2<Integer, Double>> out) throws Exception {
Double result = results.iterator().next();
out.collect(new Tuple2<>(key, result));
}
}
public static class AverageAccumulator{
int count;
double sum;
}
可以得到,結果與上面一樣,但代碼好看了很多。
四、其他操作
4.1 Triggers(觸發器)
觸發器定義了窗口何時准備好被窗口處理。每個窗口分配器默認都有一個觸發器,如果默認的觸發器不符合你的要求,就可以使用trigger(...)自定義觸發器。
通常來說,默認的觸發器適用於多種場景。例如,多有的event-time窗口分配器都有一個EventTimeTrigger作為默認觸發器。該觸發器在watermark通過窗口末尾時出發。
PS:GlobalWindow默認的觸發器時NeverTrigger,該觸發器從不出發,所以在使用GlobalWindow時必須自定義觸發器。
4.2 Evictors(驅逐器)
Evictors可以在觸發器觸發之后以及窗口函數被應用之前和/或之后可選擇的移除元素。使用Evictor可以防止預聚合,因為窗口的所有元素都必須在應用計算邏輯之前先傳給Evictor進行處理
4.3 Allowed Lateness
當使用event-time窗口時,元素可能會晚到,例如Flink用於跟蹤event-time進度的watermark已經超過了窗口的結束時間戳。
默認來說,當watermark超過窗口的末尾時,晚到的元素會被丟棄。但是flink也允許為窗口operator指定最大的allowed lateness,以至於可以容忍在徹底刪除元素之前依然接收晚到的元素,其默認值是0。
為了支持該功能,Flink會保持窗口的狀態,知道allowed lateness到期。一旦到期,flink會刪除窗口並刪除其狀態。
把晚到的元素當作side output。
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);