Flink的窗口處理機制(一)


一、為什么需要 window ?

在流處理應用中,數據是連續不斷的,即數據是沒有邊界的,因此我們不可能等到所有數據都到了才開始處理。當然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鍾內有多少用戶點擊了我們的網頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鍾內的數據,並對這個窗口內的數據進行計算。

流上的聚合需要由 window 來划定范圍,比如 “計算過去的5分鍾” ,或者 “最后100個元素的和” 。

window是一種可以把無限數據流切割為有限數據塊的手段。

Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。

而窗口(window)就是從 Streaming 到 Batch 的一個橋梁。

二、窗口的生命周期

窗口的生命周期,就是指窗口從創建、觸發執行、到銷毀的過程。

那么這個時候需要思考四個問題

1、數據元素是如何分配到對應窗口中的(也就是窗口的分配器)?

2、元素分配到對應窗口之后什么時候會觸發計算(也就是窗口的觸發器)?

3、在窗口內我們能夠進行什么樣的操作(也就是窗口內的操作)?

4、當窗口過期后是如何處理的(也就是窗口的銷毀關閉)?

其實這四個問題從大體上可以理解為窗口的整個生命周期過程。接下來我們對每個環節進行講解

創建:當屬於該窗口的第一個元素到達時就會創建該窗口

銷毀:當時間(事件或處理時間)超過窗口的結束時間戳加上用戶指定的允許延遲時間,窗口將被完全刪除。 Flink保證僅刪除基於時間的窗口而不是其他類型的窗口,例如全局窗口。

例如,使用基於事件時間的窗口策略,每5分鍾創建一個不重疊(或翻滾)的窗口並允許延遲1分鍾,當具有落入該間隔的時間戳的第一個元素到達時,Flink將為12:00到12:05之間的間隔創建一個新窗口,當水位線(watermark)到12:06時間戳時它將刪除它。【這里同時我們也可以明白watermark的作用】

Trigger觸發器:指定了窗口函數在什么條件下可被觸發,觸發器還可以決定在創建和刪除窗口之間的任何時間清除窗口的內容。在這種情況下,清除僅限於窗口中的元素,而不是窗口元數據。這意味着新數據仍然可以添加到該窗口中。

例如:當窗口中的元素個數超過4個時 或者 當水印達到窗口的邊界時―觸發計算

Window的函數:函數里定義了應用於窗口(Window)內容的計算邏輯

Evictor(驅逐器):將在觸發器觸發之后或者在函數被應用前后,清除窗口中的元素

三、Keyed vs Non-Keyed Windows

在定義窗口之前,首先要指定你的流是否應該被keyBy()分區,這個必須要窗口定義前確定。使用 keyBy(...) 后,不同的 key 會被划分到不同的流里面,每個流可以被一個單獨的 task 處理。而相同的key將會被分配到同一個keyed Stream,被同一個task處理。

如果 不使用 keyBy ,所有數據會被划分到一個窗口里,匯總到一個task處理,並行度是1.

PS:最大並行度=container個數 * 每個container上最大slot數

api調用如下:

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

四、Window Assigners 窗口指派器

數據經過控制流的處理之后,無論是keyed Stream還是Non-keyed Stream,兩種控制流都需要指定一個window Assinger,負責將每個傳入的元素分配給一個或多個窗口,有了window Assinger,才會創建出各種形式的window來覆蓋我們所需的各種場景,對我們開發來說不需要關注window本身,只需要關注Window Assinger的分類即可,所以很多關於Flink的視頻都沒有講解控制流的概念,只講了Window的分類。

Flink中的窗口從大體上划分有以下幾個大類:

第一種是基於時間划分的窗口,叫TimeWindow。(比如每30秒)

第二種是基於數據數量划分的窗口,叫CountWindow。(比如每100個元素)

第三種是全局窗口,不划分的。

還有就是自定義窗口類型。(通過繼承WindowAssigner類來實現自定義窗口分配器邏輯)

api介紹:

當input的Stream進行keyBy()之后,就會生成一個KeyedStream,而KeyedStream實現了timeWindow()、countWindow()、window()等方法。源碼如下圖:

clipboard

如果dataStream沒有經過keyBy(),就是Non-keyed Stream,就是原生的dataStream的話,其實它也可以調用窗口函數。api源碼如下:

clipboard

我們發現Non-keyed Stream相比keyed Stream,Window Assigner的調用方式上,只是多了個All。

下面先基於常用的KeyedStream來介紹常用的window Assigner

1、TimeWindow

TimeWindow按照時間來生成窗口。每個時間窗口都有一個開始時間和結束時間,表示一個左閉右開的時間段,表示了窗口的區間大小。

(編程技巧:可以通過TimeWindow對象的getStart()、getEnd()方法來獲取窗口的開始時間和結束時間的時間戳,也可以通過maxTimestamp() 方法來獲取窗口內的最大時間戳。)

根據不同的業務場景,Time Window 也可以分為三種類型,

分別是滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)

我們知道Flink中的時間類型可以划分為三種:

1、Event Time:事件時間,即事件產生的時間

2、IngestionTime:攝入時間,事件進入流處理系統的時間,也就是數據進入flink的時間

3、Processing Time:處理時間,消息被flink計算框架處理的時間

這里主要考慮事件時間和處理時間,所以上面的每種窗口又可分別基於processing time和event time。


首先,我們來查看TimeWindow的api,這個窗口指派器需要緊跟在數據流后面。它是KeyedStream下的方法。

方式一:直接使用KeyedStream下的timeWindow()方法。

里面接一個參數的就表示是滾動時間窗口,接兩個參數的就表示是滑動時間窗口。

在這里處理的是事件時間還是處理時間,取決於env設置的TimeCharacteristic參數。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

調用如下:

// Stream of (sensorId, carCnt )
val vehicleCnts: DataStream[(Int, Int)] = ...

// timeWindow后面接一個參數就表示是滾動時間窗口
val tumblingCnts: DataStream[ (Int, Int)] = vehicleCnts 
  // key stream by sensorId
  .keyBy(0)
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over carCnt
  .sum(1)

// timeWindow后面接兩個參數就表示是滑動時間窗口
val slidingCnts: DataStream[ (Int, Int)] = vehicleCnts
  .keyBy(0)
// sliding time window of 1 minute Length and 30 secs trigger interval 
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1 )

方式二:使用KeyedStream下的window()方法

需要在參數里指明使用哪種時間窗口類型。

這也是官方文檔指定的方式。

支持滾動窗口、滑動窗口、會話窗口和全局窗口。

inputStream.keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    
// window里面的窗口類型可以換成:
1、TumblingEventTimeWindows()    滾動事件時間窗口
2、TumblingProcessingTimeWindows()    滾動處理時間窗口
3、SlidingEventTimeWindows()          滑動事件時間窗口
4、SlidingProcessingTimeWindows()     滑動處理時間窗口
5、EventTimeSessionWindows()          事件時間會話窗口
6、ProcessingTimeSessionWindows()     處理時間會話窗口
7、GlobalWindows.create()             全局窗口


1.1、Tumbling Window(滾動窗口)

滾動窗口的概念:

  • 滾動窗口能將數據流切分成不重疊的窗口,每一個事件只能屬於一個窗口
  • 滾動窗具有固定的尺寸,不重疊。

滾動窗口的划分,可以基於時間戳來進行划分窗口,也可以基於到來的事件元素數量來划分窗口。

因為我們這里考慮的是TimeWindow,所以這里考慮基於時間戳來進行窗口划分。

例如,如果您指定了一個大小為5分鍾的滾動窗口,那么每5分鍾將會啟動一個新窗口,

如下圖:

clipboard

滾動時間窗口api。

方式一:直接使用.timeWindow()

// inputStream進行keyby后,調用.timeWindow方法,
// 滾動timeWindow里面就一個參數,指明每10秒划分一個時間窗口
keyedStream.timeWindow(Time.seconds(10));

注意:這種方式,如果需要按照處理時間划分窗口,需要在env指明TimeCharacteristic時間類型。

例如:

// 默認就是EventTime,ProcessingTime需要顯式指定
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

方式二:使用window()

使用window()的方式,就不需要在env里單獨指定TimeCharacteristic時間類型,因為在window()的參數里需要傳入指定的參數。

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 這種方式可以指定窗口的對齊方式,
// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

如上段代碼中最后一個例子展示的那樣,tumbling window assigners包含一個可選的offset參數,我們可以用它來改變窗口的對齊方式。

如果我們指定了一個15分鍾的窗口,那么每個小時內,每個窗口的開始時間和結束時間為:

[00:00,00:15)

[00:15,00:30)

[00:30,00:45)

[00:45,01:00)

如果我們指定了一個5分鍾的offset,那么每個窗口的開始時間和結束時間為:

[00:05,00:20)

[00:20,00:35)

[00:35,00:50)

[00:50,01:05)

一個實際的應用場景是,我們可以使用 offset 使我們的時區以0時區為准。比如我們生活在中國,時區是 UTC+08:00,可以指定一個 Time.hour(-8),使時間以0時區為准。


滾動窗口適用場景:

適用場景:適合做每個時間段的聚合計算,BI分析。例如統計某頁面每分鍾點擊的pv。

場景1:我們需要統計每一分鍾中用戶購買的商品的總數,需要將用戶的行為事件按每一分鍾進行切分,這種切分被成為翻滾時間窗口(Tumbling Time Window)。


應用案例:

編寫代碼模擬:

下面代碼僅僅是模擬,每5秒划分一個窗口,打印輸出信息。跟上面購買商品的場景無關。

package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * @author doublexi
 * @date 2021/10/30 11:37
 * @description 基於時間的滾動時間窗口
 * 1、獲取流數據源
 * 2、獲取窗口
 * 3、操作窗口數據
 * 4、輸出窗口數據
 */
public class WindowDemo {
    public static void main(String[] args) throws Exception {
        // 獲取數據源
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用匿名內部類的方式添加自定義數據源
        DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
            int count = 0;
            
            // 每1秒產生一個數字,拼接字符串作為數據源事件發送出去。
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect(count + "號數據源");
                    count++;
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        // 對輸入的流的數據進行轉換封裝
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long l = System.currentTimeMillis();
                String dataTime = sdf.format(l);
                Random random = new Random();
                int randomNum = random.nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // 為了增加並行度,進行keyBy聚合操作,相同key數據會進入同一個分區,給同一個subtask任務
        KeyedStream<Tuple3<String, String, String>, String> keyByed = maped.keyBy(value -> value.f0);
        
        // 2、獲取窗口
        // 基於時間驅動, 每5s割出一個窗口
        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow = keyByed.timeWindow(Time.seconds(5));
        // 基於事件驅動, 每相隔3個事件(即三個相同key的數據), 划分一個窗口進行計算
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3);


        // 3、操作窗口數據
        // apply是窗口的應用函數,即apply里的函數將應用在此窗口的數據上。
        // 第一個參數Tuple3是窗口輸入進來的數據類型,第二個參數Object是輸出的數據類型,第三個參數String是數據源中key的數據類型,第四個參數指明當前處理的窗口是什么類型的窗口
        SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
            // s就是上面一行一行的數據源,window代表當前窗口,
            // 一個窗口中數據源可能是相同的,根據keyBy分組的,如果有兩個數據源相同,就會放入這個input迭代器里,
            // out將處理結果往外發送
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                Iterator<Tuple3<String, String, String>> iterator = input.iterator();
                // new 一個StringBuilder去做字符串拼接
                StringBuilder sb = new StringBuilder();
                while (iterator.hasNext()) {
                    // 這個next就是一個一個Tuple3數據
                    Tuple3<String, String, String> next = iterator.next();
                    sb.append(next.f0 + "..." + next.f1 + "..." + next.f2);
                }
                // 拼接輸出的信息,
                String s1 = s + "..." + window.getStart() + "..." + sb;
                out.collect(s1);
            }
        });

        applyed.print();
        
        // 轉換算子都是lazy init的, 最后要顯式調用 執行程序
        env.execute();

    }
}

上面timeWindow.apply()方法里面是使用匿名內部類的方式,實現WindowFunction接口。

我們也可以通過自定義類實現WindowFunction方式都可以。

運行結果:

因為我們將時間窗口設置為5s,所以它是隔一段時間輸出一次。

輸出內容中:

這里第一個就是s數據源本身,

第二個就是window.getStart(),窗口的開始時間,

第三個字段數據就是input里的Tuple3里的第一個參數value,數據源本身,

第四個字段數據就是處理時間,當時用的system.currentTimemills,

第五個字段數據就是一個5以內的隨機數

window.getStart()時間相同,表示它是同一個窗口里的數據。

2021-10-30 12:29:15.295數據處理時間不一樣,因為它是屬於不同的任務槽,它是並發執行的,哪個任務槽先處理完就先輸出哪個。

5> 0號數據源...1635568150000...0號數據源...2021-10-30 12:29:14.395...1


6> 1號數據源...1635568155000...1號數據源...2021-10-30 12:29:15.295...0
8> 3號數據源...1635568155000...3號數據源...2021-10-30 12:29:17.308...2
4> 2號數據源...1635568155000...2號數據源...2021-10-30 12:29:16.302...0
1> 4號數據源...1635568155000...4號數據源...2021-10-30 12:29:18.315...0
1> 5號數據源...1635568155000...5號數據源...2021-10-30 12:29:19.322...1


8> 8號數據源...1635568160000...8號數據源...2021-10-30 12:29:22.342...1
2> 6號數據源...1635568160000...6號數據源...2021-10-30 12:29:20.329...4
2> 10號數據源...1635568160000...10號數據源...2021-10-30 12:29:24.355...0
5> 9號數據源...1635568160000...9號數據源...2021-10-30 12:29:23.349...2
4> 7號數據源...1635568160000...7號數據源...2021-10-30 12:29:21.334...0


8> 13號數據源...1635568165000...13號數據源...2021-10-30 12:29:27.377...4
5> 12號數據源...1635568165000...12號數據源...2021-10-30 12:29:26.369...1
6> 14號數據源...1635568165000...14號數據源...2021-10-30 12:29:28.384...4
4> 11號數據源...1635568165000...11號數據源...2021-10-30 12:29:25.361...0
4> 15號數據源...1635568165000...15號數據源...2021-10-30 12:29:29.388...1


4> 18號數據源...1635568170000...18號數據源...2021-10-30 12:29:32.306...1
3> 16號數據源...1635568170000...16號數據源...2021-10-30 12:29:30.395...3
6> 17號數據源...1635568170000...17號數據源...2021-10-30 12:29:31.301...0
3> 20號數據源...1635568170000...20號數據源...2021-10-30 12:29:34.320...3
3> 19號數據源...1635568170000...19號數據源...2021-10-30 12:29:33.313...0

Process finished with exit code -1


1.2、Sliding Window(滑動窗口)

滑動窗口的概念:

滑動窗口的划分同滾動一樣,可以基於時間戳來進行划分窗口,也可以基於到來的事件元素數量來划分窗口。因為我們這里考慮的是TimeWindow,所以這里考慮基於時間戳來進行滑動窗口划分。


概念:

滑動窗口也是一種比較常見的窗口類型,其特點是在滾動窗口基礎之上增加了窗口滑動時間(Slide Time),且允許窗口數據發生重疊。

當 Windows size 固定之后,窗口並不像滾動窗口按照 Windows Size 向前移動,而是根據設定的 Slide Time 向前滑動。

窗口之間的數據重疊大小根據 Windows size 和 Slide time 決定,

  • 當 Slide time 小於 Windows size便會發生窗口重疊,
  • Slide size 大於 Windows size 就會出現窗口不連續,數據可能不能在任何一個窗口內計算,
  • Slide size 和 Windows size 相等時,Sliding Windows 其實就是Tumbling Windows。


滑動窗口是滾動窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成

特點:

  • 窗口長度固定,可以有重疊,可以有空隙。
  • 一個元素可以對應多個窗口,也可以不屬於任意一個窗口,看slide size而定。

比如下圖這樣,設置了一個10分鍾大小的滑動窗口,它的滑動參數(slide)為5分鍾。這樣的話,每5分鍾將會創建一個新的窗口,並且這個窗口中包含了一部分來自上一個窗口的元素。

clipboard

基於時間的滑動窗口

場景: 我們可以每30秒計算一次最近一分鍾用戶購買的商品總數。

基於事件的滑動窗口

場景: 每10個 “相同”元素計算一次最近100個元素的總和.


滑動窗口適用場景:

適用場景:對最近一段時間段內進行統計(如某接口近幾分鍾的失敗調用率)

比如:每隔3秒計算最近5秒內,每個基站的日志數量

每30秒計算一次最近一分鍾用戶購買的商品總數。


滑動時間窗口調用api:

也分為timeWindow()和window()兩種調用方式。

方式一:直接使用.timeWindow()

// inputStream進行keyby后,調用.timeWindow方法,
// 滑動timeWindow里面比滾動多一個參數,窗口滑動間隔slide time
// 增加了一個Time.seconds(2),表示一個步長,向右滑動2秒后,生成一個新的窗口。
keyedStream.timeWindow(Time.seconds(5), Time.seconds(2));

注意:這種方式,如果需要按照處理時間划分窗口,也需要在env指明TimeCharacteristic時間類型。

例如:

// 默認就是EventTime,ProcessingTime需要顯式指定
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

方式二:使用window()

使用window()的方式,就不需要在env里單獨指定TimeCharacteristic時間類型,因為在window()的參數里需要傳入指定的參數。

DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

同樣,我們可以通過offset參數來為窗口設置偏移量。


應用案例:

這里根據1.1的滾動時間窗口案例改編,數據源、計算邏輯都不變,只是單純的增加了一個滑動時間間隔,就變成了滑動時間窗口了。

下面是每5秒划分一個窗口間隔,滑動間隔為2秒。

keyByed.timeWindow(Time.seconds(5), Time.seconds(2));

意思就是每2秒統計一下最近5秒內的數據情況,我們這里直接打印輸出了。

完整代碼如下:

package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * @author doublexi
 * @date 2021/10/30 11:37
 * @description 基於時間的滑動時間窗口
 * 1、獲取流數據源
 * 2、獲取窗口
 * 3、操作窗口數據
 * 4、輸出窗口數據
 */
public class WindowDemoSlide {
    public static void main(String[] args) throws Exception {
        // 獲取數據源
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.addSource(new SourceFunction<String>() {
            int count = 0;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect(count + "號數據源");
                    count++;
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 2、獲取窗口
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                long l = System.currentTimeMillis();
                String dataTime = sdf.format(l);
                Random random = new Random();
                int randomNum = random.nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // 為了增加並行度,進行keyBy聚合操作,相同key數據會進入同一個分區,給同一個subtask任務
        KeyedStream<Tuple3<String, String, String>, String> keyByed = maped.keyBy(value -> value.f0);
        // 每5s割出一個窗口,並且每2秒向前移動,滑動間隔為2s。
        WindowedStream<Tuple3<String, String, String>, String, TimeWindow> timeWindow = keyByed.timeWindow(Time.seconds(5), Time.seconds(2));

        // 3、操作窗口數據
        // 第一個參數Tuple3是窗口輸入進來的數據類型,第二個參數Object是輸出的數據類型,第三個參數String是數據源中key的數據類型,第四個參數指明當前處理的窗口是什么類型的窗口
        SingleOutputStreamOperator<String> applyed = timeWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
            // s就是上面一行一行的數據源,window代表當前窗口,
            // 一個窗口中數據源可能是相同的,根據keyBy分組的,如果有兩個數據源相同,就會放入這個input迭代器里,
            // out將處理結果往外發送
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                Iterator<Tuple3<String, String, String>> iterator = input.iterator();
                // new 一個StringBuilder去做字符串拼接
                StringBuilder sb = new StringBuilder();
                while (iterator.hasNext()) {
                    // 這個next就是一個一個Tuple3數據
                    Tuple3<String, String, String> next = iterator.next();
                    sb.append(next.f0 + "..." + next.f1 + "..." + next.f2);
                }
                // 拼接輸出的信息,
                String s1 = s + "..." + sdf.format(window.getStart()) + "..." + sdf.format(window.getEnd()) + "..." + sb;
                out.collect(s1);
            }
        });

        applyed.print();
        env.execute();

    }
}

運行結果如下:

6> 1號數據源...2021-10-30 14:07:54.000...2021-10-30 14:07:59.000...1號數據源...2021-10-30 14:07:58.695...3
5> 0號數據源...2021-10-30 14:07:54.000...2021-10-30 14:07:59.000...0號數據源...2021-10-30 14:07:57.694...4


6> 1號數據源...2021-10-30 14:07:56.000...2021-10-30 14:08:01.000...1號數據源...2021-10-30 14:07:58.695...3
4> 2號數據源...2021-10-30 14:07:56.000...2021-10-30 14:08:01.000...2號數據源...2021-10-30 14:07:59.700...4
5> 0號數據源...2021-10-30 14:07:56.000...2021-10-30 14:08:01.000...0號數據源...2021-10-30 14:07:57.694...4
8> 3號數據源...2021-10-30 14:07:56.000...2021-10-30 14:08:01.000...3號數據源...2021-10-30 14:08:00.706...2


4> 2號數據源...2021-10-30 14:07:58.000...2021-10-30 14:08:03.000...2號數據源...2021-10-30 14:07:59.700...4
1> 4號數據源...2021-10-30 14:07:58.000...2021-10-30 14:08:03.000...4號數據源...2021-10-30 14:08:01.711...3
8> 3號數據源...2021-10-30 14:07:58.000...2021-10-30 14:08:03.000...3號數據源...2021-10-30 14:08:00.706...2
6> 1號數據源...2021-10-30 14:07:58.000...2021-10-30 14:08:03.000...1號數據源...2021-10-30 14:07:58.695...3
1> 5號數據源...2021-10-30 14:07:58.000...2021-10-30 14:08:03.000...5號數據源...2021-10-30 14:08:02.715...4


4> 7號數據源...2021-10-30 14:08:00.000...2021-10-30 14:08:05.000...7號數據源...2021-10-30 14:08:04.726...2
1> 5號數據源...2021-10-30 14:08:00.000...2021-10-30 14:08:05.000...5號數據源...2021-10-30 14:08:02.715...4
2> 6號數據源...2021-10-30 14:08:00.000...2021-10-30 14:08:05.000...6號數據源...2021-10-30 14:08:03.721...0
8> 3號數據源...2021-10-30 14:08:00.000...2021-10-30 14:08:05.000...3號數據源...2021-10-30 14:08:00.706...2
1> 4號數據源...2021-10-30 14:08:00.000...2021-10-30 14:08:05.000...4號數據源...2021-10-30 14:08:01.711...3


5> 9號數據源...2021-10-30 14:08:02.000...2021-10-30 14:08:07.000...9號數據源...2021-10-30 14:08:06.738...4
2> 6號數據源...2021-10-30 14:08:02.000...2021-10-30 14:08:07.000...6號數據源...2021-10-30 14:08:03.721...0
4> 7號數據源...2021-10-30 14:08:02.000...2021-10-30 14:08:07.000...7號數據源...2021-10-30 14:08:04.726...2
8> 8號數據源...2021-10-30 14:08:02.000...2021-10-30 14:08:07.000...8號數據源...2021-10-30 14:08:05.732...1
1> 5號數據源...2021-10-30 14:08:02.000...2021-10-30 14:08:07.000...5號數據源...2021-10-30 14:08:02.715...4

觀察結果會發現:

我們這里相比滾動時間窗口,便於觀察,我們增加了窗口的結束時間的打印。

並且窗口的開始時間與結束時間都不再使用時間戳,使用sdf格式化,轉成了年月日時分秒的格式。

同一個窗口的開始時間都是一樣的,不同窗口之間的滑動間隔,步長為2秒,並且同一個窗口內的時間仍然是5秒。

因為滑動間隔小於窗口大小,我們會發現有些數據會出現在多個窗口上。


1.3、Session Window (會話窗口)

會話窗口的概念:

會話窗口(Session Windows)主要是將某段時間內活躍度較高的數據聚合成一個窗口進行計算,窗口的觸發的條件是 Session Gap,是指在規定的時間內如果沒有數據活躍接入,則認為窗口結束,然后觸發窗口計算結果。

需要注意的是如果數據一直不間斷地進入窗口,也會導致窗口始終不觸發的情況。

與滑動窗口、滾動窗口不同的是,Session Windows 不需要有固定 windows size 和 slide time,只需要定義 session gap,來規定不活躍數據的時間上限即可。

特點:

  • 會話窗口根據會話的間隔來把數據分配到不同的窗口。
  • 會話窗口不重疊,沒有固定的開始時間和結束時間。
  • 與翻滾窗口和滑動窗口相反, 當會話窗口在一段時間內(session gap)沒有接收到元素時會關閉會話窗口。后續的元素將會被分配給新的會話窗口

如下圖所示:

clipboard

會話窗口就是根據上圖中的session gap來切分不同的窗口,當一個窗口在大於session gap時間內沒有接收到數據,窗口就會關閉,所以在這種模式下,窗口的長度是可變的,開始和結束時間也是不確定的,唯獨可以設置定長的session gap.

該類窗口的特點:

  • 時間無對齊
  • 當前系統時間-分組內最后一次的時間如果超時,則進行觸發計算

會話窗口分配器可以直接配置一個靜態常量會話間隔,也可以通過函數來動態指定會話間隔時間。

我們可以設置定長的Session gap,也可以使用SessionWindowTimeGapExtractor動態地確定Session gap的長度。


適用場景:

在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續活躍的周期),由非活躍的間隙分隔開。

場景一:如上圖所示,就是需要計算每個用戶在活躍期間總共購買的商品數量,如果用戶30秒沒有活動則視為會話斷開(假設raw data stream是單個用戶的購買行為流)。

場景二:3秒內如果沒有數據進入,則計算每個基站的日志數量

場景三:比如音樂 app 聽歌的場景,我們想統計一個用戶在一個獨立的 session 中聽了多久的歌曲(如果超過15分鍾沒聽歌,那么就是一個新的 session 了)

我們可以用 spark Streaming ,每一個小時進行一次批處理,計算用戶session的數據分布,但是 spark Streaming 沒有內置對 session 的支持,我們只能手工寫代碼來維護每個 user 的 session 狀態,里面仍然會有諸多的問題。

我們使用 flink 來解決這個問題

(1)讀取 kafka 中的數據

(2)基於用戶的 userId,設置 一個 session window 的 gap,在同一個session window 中的數據表示用戶活躍的區間

(3)最后使用一個自定義的 window Function

參考:https://cloud.tencent.com/developer/article/1539537


會話窗口api調用:

這里沒有像timeWindow()類似的直接的api,要通過window方法指定窗口指派器的方式生成sessionWindow。

方式如下:

// 獲取Session窗口
// 這里沒有像TimeWindow類似的直接的api,要通過window方法指定窗口指派器的方式生成sessionWindow
// 這里會話窗口間隔為10s
WindowedStream<String, String, TimeWindow> sessionWindow = keyByed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

我們仔細研究TimeWindow的源碼,會發現,其實TimeWindow的本質也是通過這種方式去生成一個TimeWindow窗口的。

注意:這里的時間也分為處理時間(ProcessingTime)和事件時間(EventTime)。

clipboard

window方法,也就是將數據流放到WindowedStream里,里面包含的都是一些根據key進行分組的數據,

元素是根據windowAssigner來往里面放的。

clipboard

WindowAssigner就是指派0個或多個window給到元素。我們將哪些元素放到哪個window當中。

窗口指派器是指以怎樣的規則將元素發給哪個window,哪些規則也就是窗口要包含哪些元素。

並且這些元素還都是根據key分組好的元素。

clipboard

我們會發現withGap就是創建了一個新的SessionWindows的WindowAssigner。

clipboard

參照官網,api調用方式總結如下:

主要分EventTime與ProcessingTime,定長gap與不定長gap。

DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
// 或者這種方式也行:
// event-time session windows with dynamic gap
input
    .keyBy(...)
    .window(DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)



// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
// 動態的這種類也行
// processing-time session windows with dynamic gap
input
    .keyBy(...)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)    

如上,固定大小的會話間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)來指定,動態會話間隔通過實現SessionWindowTimeGapExtractor接口來指定。

注意:由於會話窗口沒有固定的開始結束時間,它的計算方法與滾動窗口、滑動窗口有所不同。在一個會話窗口算子內部會為每一個接收到的元素創建一個新的窗口,如果這些元素之間的時間間隔小於定義的會話窗口間隔,則將阿門合並到一個窗口。為了能夠進行窗口合並,我們需要為會話窗口定義一個Tigger函數和Window Function函數(例如ReduceFunction, AggregateFunction, or ProcessWindowFunction. FoldFunction不能用於合並)。


應用案例:

模擬案例:

這里數據源為:通過nc每秒發送一個數字1,如果10秒內沒有收到數字,則視為會話斷開,統計上個窗口里的所有數字1,拼接為一個字符串。

案例代碼如下:

package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author doublexi
 * @date 2021/10/30 14:19
 * @description 基於會話的窗口
 */
public class WindowDemoSession {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<String> maped = data.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // 這里對數據基本沒處理,原模原樣傳出
                return value;
            }
        });

        // 這里指定根據value自身來進行聚合
        KeyedStream<String, String> keyByed = maped.keyBy(value -> value);
        // 獲取Session窗口
        // 這里沒有像TimeWindow類似的直接的api,要通過window方法指定窗口指派器的方式生成sessionWindow
        // 這里是以當前事件處理時間為會話窗口開始時間,間隔為10s,形成一個Session窗口
        WindowedStream<String, String, TimeWindow> sessionWindow = keyByed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        // 第一個參數為輸入數據的類型,第二個參數為輸出數據的類型,第三個參數為key的數據類型,第四個參數為窗口類型,發現這里也是時間窗口
        SingleOutputStreamOperator<String> applyed = sessionWindow.apply(new WindowFunction<String, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (String str : input) {
                    sb.append(str);
                }
                out.collect(sb.toString());
            }
        });

        applyed.print();
        env.execute();
    }
}

啟動nc:輸入數據源

[root@linux121 ~]# nc -lp 7777
1
1
1

運行flink程序:

數據源輸入完畢后,等待10s,也就是等待結束這次會話,然后就會看到SessionWindow觸發執行了,打印結果,輸出結果如下:

4> 111

如果在這個session gap內,也就是連續10秒,都沒有接收到新元素,則會關閉上一個窗口,觸發窗口計算。


2、CountWindow (計數窗口)

CountWindow是根據到來的元素的個數來生成窗口的。與時間無關。

CountWindow也分滾動窗口(Tumbling Window)和滑動窗口(Sliding Window)

這里是根據事件數量來划分的,所以也可以稱為滾動計數窗口,和滑動計數窗口。


CountWindow沒有像時間窗口那樣豐富的api調用。

這里主要就是使用.countWindow()這一個api,根據參數的不同來設定不同的指派器。


2.1、Tumbling Window(滾動計數窗口)

滾動窗口的概念:

  • 滾動窗口能將數據流切分成不重疊的窗口,每一個事件只能屬於一個窗口
  • 滾動窗具有固定的尺寸,不重疊。

我們這里是基於元素數量來划分的。

clipboard

滾動計數窗口的api:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)

適用場景:

當我們想要每100個用戶購買行為事件統計購買總數,那么每當窗口中填滿100個元素了,就會對窗口進行計算,這種窗口我們稱之為翻滾計數窗口(Tumbling Count Window)

單詞每出現三次統計一次,統計最近三次的數據?


應用案例:

這是一個模擬的案例。

輸入數據源是通過nc進行輸入數據的,通過socketTextStream監聽nc的數據源。

nc上會輸入一些數字,當接收到3個相同的數字之后,就會觸發window關閉,開始window的計算。

這里的窗口函數主要是將窗口中的數據源進行拼接打印輸出。

代碼如下:

基於事件(數據源數量)的滾動計數窗口:

package com.lagou.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Random;

/**
 * @author doublexi
 * @date 2021/10/30 13:36
 * @description 基於事件(數據源數量)的滾動窗口
 */
public class WindowDemoCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("linux121", 7777);
        // 2、獲取窗口
        SingleOutputStreamOperator<Tuple3<String, String, String>> maped = data.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long l = System.currentTimeMillis();
                String dataTime = sdf.format(l);
                Random random = new Random();
                int randomNum = random.nextInt(5);
                return new Tuple3<>(value, dataTime, String.valueOf(randomNum));
            }
        });
        // 為了增加並行度,進行keyBy聚合操作,相同key數據會進入同一個分區,給同一個subtask任務
        KeyedStream<Tuple3<String, String, String>, String> keyByed = maped.keyBy(value -> value.f0);
        // 根據事件數量去划分窗口,每3個數據源划分為一個窗口
        WindowedStream<Tuple3<String, String, String>, String, GlobalWindow> countWindow = keyByed.countWindow(3);

        // 3、操作窗口數據
        // 第一個參數Tuple3是窗口輸入進來的數據類型,第二個參數Object是輸出的數據類型,第三個參數String是數據源中key的數據類型,第四個參數指明當前處理的窗口是什么類型的窗口,這里是GlobalWindow
        // 這里的GlobalWindow沒有太多的操作接口,無法獲取window相關信息,所以我們就不拿了
        SingleOutputStreamOperator<String> applyed = countWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {
            @Override
            public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {
                Iterator<Tuple3<String, String, String>> iterator = input.iterator();
                StringBuilder sb = new StringBuilder();
                while (iterator.hasNext()) {
                    Tuple3<String, String, String> next = iterator.next();
                    sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);
                }
                out.collect(sb.toString());
            }
        });

        applyed.print();
        env.execute();
    }
}

啟動nc:

[root@linux121 ~]# nc -lp 7777

運行程序,觀察輸出:

在nc上輸入數據

[root@linux121 ~]# nc -lp 7777
1
2
3
4
5
6
1
1

程序輸出結果:

4> 1..2021-10-30 13:50:19.178..11..2021-10-30 13:50:28.126..31..2021-10-30 13:50:28.729..2

我們發現輸入123456后,都沒有輸出,直到遇上了第三個1,才有一個輸出結果。那是因為這是根據事件的滾動窗口,我們上面設置了3個數據源才會划分一個窗口。

上面的keyBy是將相同的key的數據源交給同一個任務槽去執行。

窗口機制里調用了這個keyBy,相同的key就會調用到相同的槽,同一個槽里又進行了countWindow操作,就是在這一個槽里開啟了窗口。

因為進行了keyBy分組,就會把123456分發到不同的任務槽里,每一個數字都處於單獨的任務槽。

1任務槽里感知到了有3個數據源后,3個1,就會去觸發執行window里的操作,就會打印。


2.2、Sliding Window(滑動計數窗口)

滑動窗口的概念:

因為我們這里考慮的是基於元素的數量來進行滑動窗口划分。

概念:

滑動窗口也是一種比較常見的窗口類型,其特點是在滾動窗口基礎之上增加了窗口滑動間隔(Slide size),且允許窗口數據發生重疊。

當 Windows size 固定之后,窗口並不像滾動窗口按照 Windows Size 向前移動,而是根據設定的 Slide size 向前滑動。

窗口之間的數據重疊大小根據 Windows size 和 Slide size 決定,

  • 當 Slide size 小於 Windows size便會發生窗口重疊,
  • Slide size 大於 Windows size 就會出現窗口不連續,數據可能不能在任何一個窗口內計算,
  • Slide size 和 Windows size 相等時,Sliding Windows 其實就是Tumbling Windows。

如下圖:

clipboard

滑動計數窗口的適用場景:(關鍵詞:最近)

例如計算每10個元素計算一次最近100個元素的總和,

每隔5s計算一下最近10s的數據


滑動計數窗口的api:

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)


應用案例:

基於事件的滑動計數窗口:

也很簡單,在之前滾動計數窗口代碼的基礎上稍加改動即可。

# 根據事件源數量來設置窗口,這里設置步長為1
keyByed.countWindow(3, 1);

開啟一個nc:

[root@linux121 ~]# nc -lp 7777
1
1
1
1

啟動程序,查看運行結果:

4> 1..2021-10-30 14:14:25.243..2
4> 1..2021-10-30 14:14:25.243..21..2021-10-30 14:14:26.547..3
4> 1..2021-10-30 14:14:25.243..21..2021-10-30 14:14:26.547..31..2021-10-30 14:14:27.956..1
4> 1..2021-10-30 14:14:26.547..31..2021-10-30 14:14:27.956..11..2021-10-30 14:14:29.364..4

這里我們發現,每輸入一個1就會輸出一條數據,

因為它的步長為1,來一個元素后就會向右滑動,形成一個新的窗口。


3、GlobalWindows (全局窗口)

概念介紹:

全局窗口分配器會將具有相同key值的所有元素分配在同一個窗口。這種窗口模式下需要我們設置一個自定義的Trigger,否則將不會執行任何計算,這是因為全局窗口中沒有一個可以處理聚合元素的自然末端。所有相同keyed的元素分配到一個窗口里,這種窗口很少使用。

clipboard

適用場景:

全局窗口的應用場景幾乎是沒有的。


全局窗口的api調用:

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

應用案例:

引用自:https://blog.csdn.net/weixin_45764675/article/details/104818931

package com.baizhi.jsy.windowProcessTime
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, ProcessingTimeSessionWindows, SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}
import org.apache.flink.util.Collector
object FlinkWindowProcessGlobal   {
  def main(args: Array[String]): Unit = {
    //1.創建流計算執⾏行行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.創建DataStream - 細化
    val text = env.socketTextStream("Centos",9999)
    //3.執⾏行行DataStream的轉換算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(word=>(word._1))
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(4))
      .apply(new UserDefineGlobalWindowFunction)
      .print()
    //5.執⾏行行流計算任務
    env.execute("Tumbling Window Stream WordCount")
  }
}
class UserDefineGlobalWindowFunction extends WindowFunction[(String,Int),(String,Int),String,GlobalWindow]{
  override def apply(key: String,
                     window: GlobalWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Int)]): Unit = {
    val sum = input.map(_._2).sum
    out.collect((s"${key}",sum))
  }
}

輸出結果:

clipboard

GlobalWindow與GlobalWindows的區別:

注意:直接使用GlobalWindows指派器的場景很少,幾乎沒有。但是我們卻經常在窗口實現函數里看到GlobalWindow。經常容易看混淆。注意,它們是不一樣的。

GlobalWindow是一種窗口類型,GlobalWindows是一種窗口指派器。

GlobalWindow:

首先,GlobalWindow繼承自Window,它是一種窗口類型。

clipboard

同樣繼承自Window的有GlobalWindow和TimeWindow

clipboard

GlobalWindow與TimeWindow它們都繼承了父類的maxTimeStamp()方法。

它的maxTimestamp方法與TimeWindow不同,TimeWindow有start和end屬性,其maxTimestamp方法返回的是end-1;而GlobalWindow的maxTimestamp方法返回的是Long.MAX_VALUE;GlobalWindow定義了自己的Serializer

GlobalWindows

GlobalWindows是一種窗口指派器。

clipboard

  • GlobalWindows繼承了WindowAssigner,key類型為Object,窗口類型為GlobalWindow
  • GlobalWindows的assignWindows方法返回的是GlobalWindow;getDefaultTrigger方法返回的是NeverTrigger;getWindowSerializer返回的是GlobalWindow.Serializer();isEventTime返回的為false
  • NeverTrigger繼承了Trigger,其onElement、onProcessingTime、onProcessingTime返回的TriggerResult均為TriggerResult.CONTINUE;該行為就是不做任何觸發操作;如果需要觸發操作,則需要在定義window操作時設置自定義的trigger,覆蓋GlobalWindows默認的NeverTrigger

4、Non-keyed Window

當你的stream過來之后,第一件事需要明確的是你的stream需要keyed或者不需要。這個必須要窗口定義前確定。使用keyBy(...)將會把你的無盡的stream切割成邏輯的keyed stream。比如 keyBy(...)沒有被調用,你的stream將不會keyed。

在已經keyed stream中,你寫進來的事件任意屬性attribute可以使用key。由於使用了keyed stream可以允許你的windowd 計算在並行的多任務的模式下運行,每一個邏輯的keyed stream可以相互獨立的運行而相互沒有影響。所有具有相同key的元素會被發射到相同的並行任務上執行。

如果在non-keyed streams中,你原有的stream不會分割成不同的邏輯stream並且所有的window邏輯只會執行在一個單獨的任務上使用並發度為1。(也就說所有的數據會匯總到一個task上執行)

對於KeyedStream,我們直接按照上面1、2、3的方式去調用api就可以了。

注意:

1、Non-keyed Stream都有windowAll()窗口函數

當一個dataStream經過keyBy()之后,就會形成一個KeyedStream,keyedStream后面可以接着調用窗口等函數。api類似如下:

clipboard

里面就是我們上面1、2、3的方式去使用窗口指派器。

如果dataStream沒有經過keyBy(),就是Non-keyed Stream,就是原生的dataStream的話,其實它也可以調用窗口函數。api如下:

clipboard

我們發現Non-keyed Stream相比keyed Stream,Window Assigner的調用方式上,只是多了個All。

因為KeyedStream是並行任務,根據key的不同,會有不同的task在並行執行。

相同的key的元素會划分到同一個task上執行。

而Non-keyed Stream不會划分,只有一個單獨的任務,並行度為1,所有的數據會匯總到一個task上執行,所以Non-keyed Stream的窗口api都是帶All的,因為它們要處理所有的數據元素。

注意:這里和KeyedStream后的GlobalWindow是不一樣的,前者是對分完區后,同一個task上的數據的global。而后者Non-keyed Stream是不分區的,針對所有的元素。


2、Non-keyed Stream也可以划分為滾動窗口、滑動窗口。

Non-keyed Stream上也有timeWindowAll、countWindowAll、windowAll的方法調用。

它們也可以實現滾動時間窗口、滑動時間窗口、滾動計數窗口、滑動計數窗口,以及自己指定窗口指派器。不同的是,它是非並行的,所有的元素都會經過同一個算子。

源碼如下:

clipboard

窗口指派器總結如下圖:

clipboard

參考引用:

官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

https://segmentfault.com/a/1190000022106275

https://blog.csdn.net/qq_28680977/article/details/113531672


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM