Flink窗口介紹及應用


Windows是Flink流計算的核心,本文將概括的介紹幾種窗口的概念,重點只放在窗口的應用上。

本實驗的數據采用自擬電影評分數據(userId, movieId, rating, timestamp),userId和movieId范圍分別為1-100和1-200的隨機數,rating范圍為[0:0.5:5.0]一共10個檔位,timestamp為10000-20000之間的隨機數,且數據順序采用timestamp的升序排列。(2.1-2.6節的數據是亂序)

一、窗口(window)的類型

對於窗口的操作主要分為兩種,分別對於Keyedstream和Datastream。他們的主要區別也僅僅在於建立窗口的時候一個為.window(...),一個為.windowAll(...)。對於Keyedstream的窗口來說,他可以使得多任務並行計算,每一個logical key stream將會被獨立的進行處理。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)/.windowAll(...)  <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

按照窗口的Assigner來分,窗口可以分為

Tumbling window, sliding window,session window,global window,custom window

每種窗口又可分別基於processing time和event time,這樣的話,窗口的類型嚴格來說就有很多。

還有一種window叫做count window,依據元素到達的數量進行分配,之后也會提到。

窗口的生命周期開始在第一個屬於這個窗口的元素到達的時候,結束於第一個不屬於這個窗口的元素到達的時候。

二、窗口的操作

2.1 Tumbling window

固定相同間隔分配窗口,每個窗口之間沒有重疊看圖一眼明白。

下面的例子定義了每隔3毫秒一個窗口的流:

WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
    .keyBy(MovieRate::getUserId)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3)));

2.2 Sliding Windows

跟上面一樣,固定相同間隔分配窗口,只不過每個窗口之間有重疊。窗口重疊的部分如果比窗口小,窗口將會有多個重疊,即一個元素可能被分配到多個窗口里去。

下面的例子給出窗口大小為10毫秒,重疊為5毫秒的流:

WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)));

2.3 Session window

這種窗口主要是根據活動的事件進行窗口化,他們通常不重疊,也沒有一個固定的開始和結束時間。一個session window關閉通常是由於一段時間沒有收到元素。在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續活躍的周期),由非活躍的間隙分隔開。

// 靜態間隔時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 動態時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(EventTimeSessionWindows.withDynamicGap(()));

2.4 Global window

將所有相同keyed的元素分配到一個窗口里。好吧,就這樣:

WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
    .keyBy(MovieRate::getUserId)
    .window(GlobalWindows.create());

三、窗口函數

窗口函數就是這四個:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前兩個執行得更有效,因為Flink可以增量地聚合每個到達窗口的元素。

Flink必須在調用函數之前在內部緩沖窗口中的所有元素,所以使用ProcessWindowFunction進行操作效率不高。不過ProcessWindowFunction可以跟其他的窗口函數結合使用,其他函數接受增量信息,ProcessWindowFunction接受窗口的元數據。

舉一個AggregateFunction的例子吧,下面代碼為MovieRate按user分組,且分配5毫秒的Tumbling窗口,返回每個user在窗口內評分的所有分數的平均值。

DataStream<Tuple2<Integer,Double>> Rates = rates
                .keyBy(MovieRate::getUserId)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
                .aggregate(new AggregateFunction<MovieRate, AverageAccumulator, Tuple2<Integer,Double>>() {
                    @Override
                    public AverageAccumulator createAccumulator() {
                        return new AverageAccumulator();
                    }

                    @Override
                    public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
                        acc.userId = movieRate.userId;
                        acc.sum += movieRate.rate;
                        acc.count++;
                        return acc;
                    }

                    @Override
                    public Tuple2<Integer,Double> getResult(AverageAccumulator acc) {
                        return  Tuple2.of(acc.userId, acc.sum/(double)acc.count);
                    }

                    @Override
                    public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
                        acc0.count += acc1.count;
                        acc0.sum += acc1.sum;
                        return acc0;
                    }
                });

public static class AverageAccumulator{
        int userId;
        int count;
        double sum;
    }

以下是部分輸出:

...
1> (44,3.0)
4> (96,0.5)
2> (51,0.5)
3> (90,2.75)
...

看上面的代碼,會發現add()函數特別生硬,因為我們想返回Tuple2<Integer, Double>類型,即Integer為key,但AggregateFunction似乎沒有提供這個機制可以讓AverageAccumulator的構造函數提供參數。所以,這里引入ProcessWindowFunction與AggregateFunction的結合版,AggregateFunction進行增量疊加,當窗口關閉時,ProcessWindowFunction將會被提供AggregateFunction返回的結果,進行Tuple封裝:

DataStream<Tuple2<Integer,Double>> Rates = rates
    .keyBy(MovieRate::getUserId)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());


public static class MyAggregateFunction implements AggregateFunction<MovieRate, AverageAccumulator, Double> {
    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    @Override
    public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
        acc.sum += movieRate.rate;
        acc.count++;
        return acc;
    }

    @Override
    public Double getResult(AverageAccumulator acc) {
        return  acc.sum/(double)acc.count;
    }

    @Override
    public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
        acc0.count += acc1.count;
        acc0.sum += acc1.sum;
        return acc0;
    }
}

public static class MyProcessWindowFunction extends
    ProcessWindowFunction<Double, Tuple2<Integer, Double>, Integer, TimeWindow> {

    @Override
    public void process(Integer key,
                        Context context,
                        Iterable<Double> results,
                        Collector<Tuple2<Integer, Double>> out) throws Exception {
        Double result = results.iterator().next();
        out.collect(new Tuple2<>(key, result));
    }
}

public static class AverageAccumulator{
    int count;
    double sum;
}

可以得到,結果與上面一樣,但代碼好看了很多。

四、其他操作

4.1 Triggers(觸發器)

觸發器定義了窗口何時准備好被窗口處理。每個窗口分配器默認都有一個觸發器,如果默認的觸發器不符合你的要求,就可以使用trigger(...)自定義觸發器。

通常來說,默認的觸發器適用於多種場景。例如,多有的event-time窗口分配器都有一個EventTimeTrigger作為默認觸發器。該觸發器在watermark通過窗口末尾時出發。

PS:GlobalWindow默認的觸發器時NeverTrigger,該觸發器從不出發,所以在使用GlobalWindow時必須自定義觸發器。

4.2 Evictors(驅逐器)

Evictors可以在觸發器觸發之后以及窗口函數被應用之前和/或之后可選擇的移除元素。使用Evictor可以防止預聚合,因為窗口的所有元素都必須在應用計算邏輯之前先傳給Evictor進行處理

4.3 Allowed Lateness

當使用event-time窗口時,元素可能會晚到,例如Flink用於跟蹤event-time進度的watermark已經超過了窗口的結束時間戳。

默認來說,當watermark超過窗口的末尾時,晚到的元素會被丟棄。但是flink也允許為窗口operator指定最大的allowed lateness,以至於可以容忍在徹底刪除元素之前依然接收晚到的元素,其默認值是0。

為了支持該功能,Flink會保持窗口的狀態,知道allowed lateness到期。一旦到期,flink會刪除窗口並刪除其狀態。

把晚到的元素當作side output。

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM