Flink Streaming基於滾動窗口的事件時間分析


  使用flink-1.9.0進行的測試,在不同的並行度下,Flink對事件時間的處理邏輯不同。包括1.1在並行度為1的本地模式分析和1.2在多並行度的本地模式分析兩部分。通過理論結合源碼進行驗證,得到具有說服力的結論。

 

一、使用並行度為1的本地模式測試

1.1、Flink時間時間窗口代碼,使用SocketSource:

 1 package com.mengyao.flink.stream.window;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.ArrayList;
 5 import java.util.List;
 6 
 7 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 8 import org.apache.flink.api.common.typeinfo.Types;
 9 import org.apache.flink.api.java.tuple.Tuple;
10 import org.apache.flink.api.java.tuple.Tuple4;
11 import org.apache.flink.configuration.ConfigConstants;
12 import org.apache.flink.configuration.Configuration;
13 import org.apache.flink.configuration.RestOptions;
14 import org.apache.flink.streaming.api.TimeCharacteristic;
15 import org.apache.flink.streaming.api.datastream.DataStream;
16 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
17 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
18 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
19 import org.apache.flink.streaming.api.watermark.Watermark;
20 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
21 import org.apache.flink.streaming.api.windowing.time.Time;
22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
23 import org.apache.flink.util.Collector;
24 
25 import com.mengyao.flink.stream.utils.DateUtil;
26 
27 /**
28  * 啟動netcat:nc -L -p 9999 -v
29  * 
30  * Created by: mengyao
31  * 2019年10月15日
32  */
33 public class SocketEventTimeWindowApp {
34 
35     private static String jobName = SocketEventTimeWindowApp.class.getSimpleName();
36     
37     
38     public static void main(String[] args) throws Exception {
39         Configuration conf = new Configuration();
40         conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
41         conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
42         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
43         // 設置重啟策略,5次嘗試,每次嘗試的間隔為30秒
44         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000));
45         // 使用事件時間
46         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
47         // 數據處理
48         DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999)
49             .map(line -> {
50                 String[] fields = line.split(",",3);
51                 return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime());
52             })
53             .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
54             .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配時間戳並定期生成水印
55                 private static final long serialVersionUID = -4195773369390603522L;
56                 private SimpleDateFormat formatter = DateUtil.FMT15.get();
57                 long currentMaxTimestamp = 0L;
58                 long maxOutOfOrderness = 0L;//允許最大的亂序時間是0秒
59                 @Override
60                 public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) {
61                     long timestamp = ele.f3;
62                     currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
63                     System.out.println("事件: "+ele+", 最大時間戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印時間戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter));
64                     return timestamp;
65                 }
66                 @Override
67                 public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
68             });
69         
70         inputDS
71             .keyBy(0)
72             .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滾動窗口
73             .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() {
74                 private static final long serialVersionUID = -4990083905742822422L;
75                 private SimpleDateFormat formatter = DateUtil.FMT15.get();
76                 @Override
77                 public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input,
78                         Collector<String> out) throws Exception {
79                     // 按照事件時間升序排序
80                     List<Tuple4<String, String, String, Long>> list = new ArrayList<>();
81                     input.forEach(t4->list.add(t4));
82                     list.sort((e1,e2)->e1.f3.compareTo(e2.f3));
83                     System.out.println("==== "+key+", 窗口開始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口結束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口內的數據:"+list);
84                 }
85             })
86             .print();
87         
88         env.execute(jobName);
89     }
90     
91 }

 

1.2、使用netcat啟動SocketServer,發送數據到FlinkStreaming中(數據是有序的情況下)

C:\Users\mengyao>nc -l -p 9999 -v
listening on [any] 9999 ...
connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 63187
1,1,20190101090000000
1,1,20190101090001000
1,1,20190101090001999
1,1,20190101090002000
1,1,20190101090003000
1,1,20190101090004000
1,1,20190101090005000
1,1,20190101090005500
1,1,20190101090007000

 

1.3、程序控制台輸出:

  log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  事件: (1,1,20190101090000000,1546304400000), 最大時間戳:2019-01-01 09:00:00.000, 水印時間戳:2019-01-01 09:00:00.000
  事件: (1,1,20190101090001000,1546304401000), 最大時間戳:2019-01-01 09:00:01.000, 水印時間戳:2019-01-01 09:00:01.000
  事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999
  ==== (1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999)]
  事件: (1,1,20190101090002000,1546304402000), 最大時間戳:2019-01-01 09:00:02.000, 水印時間戳:2019-01-01 09:00:02.000
  事件: (1,1,20190101090003000,1546304403000), 最大時間戳:2019-01-01 09:00:03.000, 水印時間戳:2019-01-01 09:00:03.000
  事件: (1,1,20190101090004000,1546304404000), 最大時間戳:2019-01-01 09:00:04.000, 水印時間戳:2019-01-01 09:00:04.000
  ==== (1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090003000,1546304403000)]
  事件: (1,1,20190101090005000,1546304405000), 最大時間戳:2019-01-01 09:00:05.000, 水印時間戳:2019-01-01 09:00:05.000
  事件: (1,1,20190101090005500,1546304405500), 最大時間戳:2019-01-01 09:00:05.500, 水印時間戳:2019-01-01 09:00:05.500
  事件: (1,1,20190101090007000,1546304407000), 最大時間戳:2019-01-01 09:00:07.000, 水印時間戳:2019-01-01 09:00:07.000
  ==== (1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090005000,1546304405000), (1,1,20190101090005500,1546304405500)]

 

1.4、滾動窗口分析: 

 

控制台打印如下:
第1次觸發窗口 (水印時間戳:2019-01-01 09:00:01.999)
==== (1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999)]
解釋:
窗口的開始時間:2019-01-01 09:00:00.000
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090000000,1546304400000),所以窗口開始時間為:20190101090000000。
結論是:
  窗口的開始時間 = 第一條事件數據的時間。
窗口的結束時間:2019-01-01 09:00:02.000
窗口開始時間是2019-01-01 09:00:00.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:02.000。
結論是:

  窗口的結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:02.000。
窗口的長度:2秒
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。
結論是:
  窗口長度 =
window(TumblingEventTimeWindows.of(Time.seconds(2)))代碼中設置的2秒。
進入窗口的數據:[
          (1,1,20190101090000000,1546304400000),
          (1,1,20190101090001000,1546304401000),
          (1,1,20190101090001999,1546304401999)
        ]
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:00.000 到 2019-01-01 09:00:02.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。
窗口結束條件的源碼如下:

結論是:
  當數據的事件時間 >= 窗口開始時間 && <=窗口結束時間-1時,都會落在窗口內。
   窗口的水印時間:因延遲時間為0,所以水印時間 = 事件時間。
在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
currentMaxTimestamp是事件數據Tuple3(id,count,time)的時間f3字段,也就是事件時間。
maxOutOfOrderness是允許最大亂序延遲時間,該值=0。
所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
結論是:
  如果最大允許的亂序時間是0, 則:水印時間 = 事件時間。
   窗口的觸發時機:窗口結束時間-1
源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。
觸發窗口計算的源碼:

觸發器的觸發狀態定義源碼:

結論是:
  1. 當事件時間 = 水印時間的情況下:
    1.1、水印時間(最后一條事件數據的事件時間) = 窗口結束時間-1,會觸發窗口計算;【因為最后一條數據的事件時間是20190101090001999滿足窗口結束時間-1毫秒】
    1.2、水印時間(最后一條事件數據的事件時間 = 窗口結束時間), 會觸發計算;
    1.3、水印時間(最后一條事件數據的事件時間 > 窗口結束時間), 會觸發計算;


  

  


































































第二次觸發窗口計算
控制台打印如下:
第2次觸發窗口 (水印時間戳:2019-01-01 09:00:04.000)
==== (1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090003000,1546304403000)]
解釋:
窗口的開始時間:2019-01-01 09:00:02.000
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090002000,1546304402000),所以窗口開始時間為:20190101090002000。
結論是:
  窗口的開始時間 = 大於或等於前一個窗口結束時間的數據。
窗口的結束時間:2019-01-01 09:00:04.000
窗口開始時間是2019-01-01 09:00:02.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:04.000。
結論是:

  窗口的結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:04.000。
窗口的長度:2秒
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。
結論是:
  窗口長度 = window(TumblingEventTimeWindows.of(Time.seconds(2)))代碼中設置的2秒。
進入窗口的數據:[
          (1,1,20190101090002000,1546304402000),
          (1,1,20190101090003000,1546304403000)
        ]
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:02.000 到 2019-01-01 09:00:04.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。
窗口結束條件的源碼如下:

結論是:
  當數據的事件時間 >= 窗口開始時間 && <=窗口結束時間-1時,都會落在窗口內。
   窗口的水印時間:因延遲時間為0,所以事件時間 = 水印時間。
在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
currentMaxTimestamp是事件數據Tuple3(id,count,time)的時間f3字段,也就是事件時間。
maxOutOfOrderness是允許最大亂序延遲時間,該值=0。
所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
結論是:
  如果最大允許的亂序時間是0, 則:水印時間 = 事件時間。
   窗口的觸發時機:窗口結束時間-1
源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。
觸發窗口計算的源碼:

觸發器的觸發狀態定義源碼:

結論是:
  1. 當事件時間 = 水印時間的情況下:
    1.1、水印時間(最后一條事件數據的事件時間) = 窗口結束時間-1,會觸發窗口計算;
    1.2、水印時間(最后一條事件數據的事件時間 = 窗口結束時間), 會觸發窗口計算;【因為最后一條數據的事件時間是20190101090003000小於窗口結束時間無法觸發,而下一條數據20190101090004000等於窗口結束時間,所以觸發計算】
    1.3、水印時間(最后一條事件數據的事件時間 > 窗口結束時間), 會觸發窗口計算;
























































































第三次觸發窗口計算:

控制台打印如下:
第3次觸發窗口 (水印時間戳:2019-01-01 09:00:07.000)
==== (1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090005000,1546304405000), (1,1,20190101090005500,1546304405500)]
解釋:
窗口的開始時間:2019-01-01 09:00:04.000
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090004000,1546304404000),所以窗口開始時間為:20190101090004000。
結論是:
  窗口的開始時間 = 大於或等於前一個窗口的數據。
窗口的結束時間:2019-01-01 09:00:06.000
窗口開始時間是2019-01-01 09:00:04.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:06.000。
結論是:

  窗口的結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:06.000。
窗口的長度:2秒
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。
結論是:
  窗口長度 = window(TumblingEventTimeWindows.of(Time.seconds(2)))代碼中設置的2秒。
進入窗口的數據:[
          (1,1,20190101090004000,1546304404000),
          (1,1,20190101090005000,1546304405000),
          (1,1,20190101090005500,1546304405500)

        ]
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:04.000 到 2019-01-01 09:00:06.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。
窗口結束條件的源碼如下:

結論是:
  當數據的事件時間 >= 窗口開始時間 && <=窗口結束時間-1時,都會落在窗口內。
   窗口的水印時間:因延遲時間為0,所以事件時間 = 水印時間。
在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
currentMaxTimestamp是事件數據Tuple3(id,count,time)的時間f3字段,也就是事件時間。
maxOutOfOrderness是允許最大亂序延遲時間,該值=0。
所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
結論是:
  如果最大允許的亂序時間是0, 則:水印時間 = 事件時間。
   窗口的觸發時機:窗口結束時間-1
源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。
觸發窗口計算的源碼:

觸發器的觸發狀態定義源碼:

結論是:
  1. 當事件時間 = 水印時間的情況下:
    1.1、水印時間(最后一條事件數據的事件時間) = 窗口結束時間-1,會觸發窗口計算;
    1.2、水印時間(最后一條事件數據的事件時間 = 窗口結束時間), 會觸發窗口計算;
    1.3、水印時間(最后一條事件數據的事件時間 > 窗口結束時間), 會觸發窗口計算;【因為窗口3最后一條數據的事件時間是20190101090005500小於窗口結束時間無法觸發,而下一條數據20190101090007000大於窗口結束時間,所以觸發計算】
















  

總結:
  先上圖

=================================================
這3個窗口的規律總結為:
  1、水印時間:事件時間 - 允許的最大亂序時間0秒,即每個水印時間落后於事件時間0秒,代碼:new Watermark(currentMaxTimestamp - maxOutOfOrderness)。
  2、窗口的開始時間:基於事件時間的滾動窗口下,是以(第一條數據的事件時間 or 事件時間大於等於前一個窗口的數據)作為窗口的開始時間,而不是算子所處節點的系統時鍾。
  3、窗口的結束時間:基於事件時間的滾動窗口下,窗口結束時間 = 窗口開始時間 + 窗口的長度即TumblingEventTimeWindows.of(Time.seconds(2))。
  4、窗口的計算時機:數據的水印時間 >= 窗口結束時間 or 數據的水印時間 = 窗口結束時間-1毫秒時,就觸發了窗口的計算。
  5、落入窗口內的數據:窗口屬於左閉右開,即數據的事件時間 >= 窗口起始時間 並且 < 窗口的結束時間。

 

二、使用並行度為4(多並行度)的本地模式測試

2.1、Flink時間時間窗口代碼,使用SocketSource:

package com.mengyao.flink.stream.window;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import com.mengyao.flink.stream.utils.DateUtil;

/**
 * 啟動netcat:nc -L -p 9999 -v
 * 驗證多並行度時的事件時間窗口。
 * Created by: mengyao
 * 2019年10月15日
 */
public class MultipleParallelismSocketEventTimeWindowApp {

    private static String jobName = MultipleParallelismSocketEventTimeWindowApp.class.getSimpleName();
    
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        // 設置重啟策略,5次嘗試,每次嘗試的間隔為30秒
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000));
        // 使用事件時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 數據處理
        DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999)
            .map(line -> {
                String[] fields = line.split(",",3);
                return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime());
            })
            .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
            .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配時間戳並定期生成水印
                private static final long serialVersionUID = -4195773369390603522L;
                private SimpleDateFormat formatter = DateUtil.FMT15.get();
                long currentMaxTimestamp = 0L;
                long maxOutOfOrderness = 000L;//允許最大的亂序時間是5秒
                @Override
                public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) {
                    long timestamp = ele.f3;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    long tId = Thread.currentThread().getId();
                    System.out.println("線程ID:"+tId+", 事件: "+ele+", 最大時間戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印時間戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter));
                    return timestamp;
                }
                @Override
                public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
            });
        
        inputDS
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滾動窗口
            .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() {
                private static final long serialVersionUID = -4990083905742822422L;
                private SimpleDateFormat formatter = DateUtil.FMT15.get();
                @Override
                public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input,
                        Collector<String> out) throws Exception {
                    // 按照事件時間升序排序
                    List<Tuple4<String, String, String, Long>> list = new ArrayList<>();
                    input.forEach(t4->list.add(t4));
                    list.sort((e1,e2)->e1.f3.compareTo(e2.f3));
                    long tId = Thread.currentThread().getId();
                    System.out.println("==== 線程ID:"+tId+",key="+key+", 窗口開始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口結束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口內的數據:"+list);
                }
            })
            .print();
        
        env.execute(jobName);
    }
    
}

2.2、使用netcat啟動SocketServer,發送數據到FlinkStreaming中

listening on [127.0.0.1] 9999 ...
connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 59356
1,1,20190101090000000
1,1,20190101090000500
1,1,20190101090001000
1,1,20190101090001999
1,1,20190101090002000
1,1,20190101090002500
1,1,20190101090001999
1,1,20190101090002300
1,1,20190101090003000
1,1,20190101090003999
1,1,20190101090004000
1,1,20190101090004500
1,1,20190101090003999
1,1,20190101090005999
1,1,20190101090005999
1,1,20190101090005999
1,1,20190101090005999

2.3、程序控制台輸出:

線程ID:63, 事件: (1,1,20190101090000000,1546304400000), 最大時間戳:2019-01-01 09:00:00.000, 水印時間戳:2019-01-01 09:00:00.000
線程ID:64, 事件: (1,1,20190101090000500,1546304400500), 最大時間戳:2019-01-01 09:00:00.500, 水印時間戳:2019-01-01 09:00:00.500
線程ID:65, 事件: (1,1,20190101090001000,1546304401000), 最大時間戳:2019-01-01 09:00:01.000, 水印時間戳:2019-01-01 09:00:01.000
線程ID:66, 事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999
線程ID:63, 事件: (1,1,20190101090002000,1546304402000), 最大時間戳:2019-01-01 09:00:02.000, 水印時間戳:2019-01-01 09:00:02.000
線程ID:64, 事件: (1,1,20190101090002500,1546304402500), 最大時間戳:2019-01-01 09:00:02.500, 水印時間戳:2019-01-01 09:00:02.500
線程ID:65, 事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)]
線程ID:66, 事件: (1,1,20190101090002300,1546304402300), 最大時間戳:2019-01-01 09:00:02.300, 水印時間戳:2019-01-01 09:00:02.300
線程ID:63, 事件: (1,1,20190101090003000,1546304403000), 最大時間戳:2019-01-01 09:00:03.000, 水印時間戳:2019-01-01 09:00:03.000
線程ID:64, 事件: (1,1,20190101090003999,1546304403999), 最大時間戳:2019-01-01 09:00:03.999, 水印時間戳:2019-01-01 09:00:03.999
線程ID:65, 事件: (1,1,20190101090004000,1546304404000), 最大時間戳:2019-01-01 09:00:04.000, 水印時間戳:2019-01-01 09:00:04.000
線程ID:66, 事件: (1,1,20190101090004500,1546304404500), 最大時間戳:2019-01-01 09:00:04.500, 水印時間戳:2019-01-01 09:00:04.500
線程ID:63, 事件: (1,1,20190101090003999,1546304403999), 最大時間戳:2019-01-01 09:00:03.999, 水印時間戳:2019-01-01 09:00:03.999
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)]
線程ID:64, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999
線程ID:65, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999
線程ID:66, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999
線程ID:63, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]

 

2.4、滾動窗口分析:

第一次窗口觸發(水印時間:2019-01-01 09:00:01.999)
  控制台打印如下:
    ==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)]
  
解釋:
    當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。

第二次窗口觸發(水印時間:2019-01-01 09:00:03.999)
  控制台打印如下:
    
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)]
  解釋:
    當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。
第三次窗口觸發(水印時間:2019-01-01 09:00:05.999)
  控制台打印如下:
    ==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]
  解釋:
    當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。

多並行度下事件時間窗口總結:
  1、當所有線程內的最新水印時間 >= 窗口結束時間 / 窗口結束時間-1。 就會觸發窗口的計算。
  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM