使用flink-1.9.0進行的測試,在不同的並行度下,Flink對事件時間的處理邏輯不同。包括1.1在並行度為1的本地模式分析和1.2在多並行度的本地模式分析兩部分。通過理論結合源碼進行驗證,得到具有說服力的結論。
一、使用並行度為1的本地模式測試
1.1、Flink時間時間窗口代碼,使用SocketSource:
1 package com.mengyao.flink.stream.window; 2 3 import java.text.SimpleDateFormat; 4 import java.util.ArrayList; 5 import java.util.List; 6 7 import org.apache.flink.api.common.restartstrategy.RestartStrategies; 8 import org.apache.flink.api.common.typeinfo.Types; 9 import org.apache.flink.api.java.tuple.Tuple; 10 import org.apache.flink.api.java.tuple.Tuple4; 11 import org.apache.flink.configuration.ConfigConstants; 12 import org.apache.flink.configuration.Configuration; 13 import org.apache.flink.configuration.RestOptions; 14 import org.apache.flink.streaming.api.TimeCharacteristic; 15 import org.apache.flink.streaming.api.datastream.DataStream; 16 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 17 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; 18 import org.apache.flink.streaming.api.functions.windowing.WindowFunction; 19 import org.apache.flink.streaming.api.watermark.Watermark; 20 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; 21 import org.apache.flink.streaming.api.windowing.time.Time; 22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 23 import org.apache.flink.util.Collector; 24 25 import com.mengyao.flink.stream.utils.DateUtil; 26 27 /** 28 * 啟動netcat:nc -L -p 9999 -v 29 * 30 * Created by: mengyao 31 * 2019年10月15日 32 */ 33 public class SocketEventTimeWindowApp { 34 35 private static String jobName = SocketEventTimeWindowApp.class.getSimpleName(); 36 37 38 public static void main(String[] args) throws Exception { 39 Configuration conf = new Configuration(); 40 conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); 41 conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue()); 42 final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf); 43 // 設置重啟策略,5次嘗試,每次嘗試的間隔為30秒 44 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000)); 45 // 使用事件時間 46 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 47 // 數據處理 48 DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999) 49 .map(line -> { 50 String[] fields = line.split(",",3); 51 return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime()); 52 }) 53 .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)) 54 .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配時間戳並定期生成水印 55 private static final long serialVersionUID = -4195773369390603522L; 56 private SimpleDateFormat formatter = DateUtil.FMT15.get(); 57 long currentMaxTimestamp = 0L; 58 long maxOutOfOrderness = 0L;//允許最大的亂序時間是0秒 59 @Override 60 public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) { 61 long timestamp = ele.f3; 62 currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 63 System.out.println("事件: "+ele+", 最大時間戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印時間戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter)); 64 return timestamp; 65 } 66 @Override 67 public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);} 68 }); 69 70 inputDS 71 .keyBy(0) 72 .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滾動窗口 73 .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() { 74 private static final long serialVersionUID = -4990083905742822422L; 75 private SimpleDateFormat formatter = DateUtil.FMT15.get(); 76 @Override 77 public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input, 78 Collector<String> out) throws Exception { 79 // 按照事件時間升序排序 80 List<Tuple4<String, String, String, Long>> list = new ArrayList<>(); 81 input.forEach(t4->list.add(t4)); 82 list.sort((e1,e2)->e1.f3.compareTo(e2.f3)); 83 System.out.println("==== "+key+", 窗口開始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口結束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口內的數據:"+list); 84 } 85 }) 86 .print(); 87 88 env.execute(jobName); 89 } 90 91 }
1.2、使用netcat啟動SocketServer,發送數據到FlinkStreaming中(數據是有序的情況下)
C:\Users\mengyao>nc -l -p 9999 -v listening on [any] 9999 ... connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 63187 1,1,20190101090000000 1,1,20190101090001000 1,1,20190101090001999 1,1,20190101090002000 1,1,20190101090003000 1,1,20190101090004000 1,1,20190101090005000 1,1,20190101090005500 1,1,20190101090007000
1.3、程序控制台輸出:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
事件: (1,1,20190101090000000,1546304400000), 最大時間戳:2019-01-01 09:00:00.000, 水印時間戳:2019-01-01 09:00:00.000
事件: (1,1,20190101090001000,1546304401000), 最大時間戳:2019-01-01 09:00:01.000, 水印時間戳:2019-01-01 09:00:01.000
事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999
==== (1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999)]
事件: (1,1,20190101090002000,1546304402000), 最大時間戳:2019-01-01 09:00:02.000, 水印時間戳:2019-01-01 09:00:02.000
事件: (1,1,20190101090003000,1546304403000), 最大時間戳:2019-01-01 09:00:03.000, 水印時間戳:2019-01-01 09:00:03.000
事件: (1,1,20190101090004000,1546304404000), 最大時間戳:2019-01-01 09:00:04.000, 水印時間戳:2019-01-01 09:00:04.000
==== (1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090003000,1546304403000)]
事件: (1,1,20190101090005000,1546304405000), 最大時間戳:2019-01-01 09:00:05.000, 水印時間戳:2019-01-01 09:00:05.000
事件: (1,1,20190101090005500,1546304405500), 最大時間戳:2019-01-01 09:00:05.500, 水印時間戳:2019-01-01 09:00:05.500
事件: (1,1,20190101090007000,1546304407000), 最大時間戳:2019-01-01 09:00:07.000, 水印時間戳:2019-01-01 09:00:07.000
==== (1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090005000,1546304405000), (1,1,20190101090005500,1546304405500)]
1.4、滾動窗口分析:
控制台打印如下: |
|
解釋: |
|
窗口的開始時間:2019-01-01 09:00:00.000 |
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090000000,1546304400000),所以窗口開始時間為:20190101090000000。 |
窗口的結束時間:2019-01-01 09:00:02.000 |
窗口開始時間是2019-01-01 09:00:00.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:02.000。 |
窗口的長度:2秒 |
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。 |
進入窗口的數據:[ |
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:00.000 到 2019-01-01 09:00:02.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。 |
窗口的水印時間:因延遲時間為0,所以水印時間 = 事件時間。 | 在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);} |
窗口的觸發時機:窗口結束時間-1 | 源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。 |
第二次觸發窗口計算
控制台打印如下: |
|
解釋: |
|
窗口的開始時間:2019-01-01 09:00:02.000 |
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090002000,1546304402000),所以窗口開始時間為:20190101090002000。 |
窗口的結束時間:2019-01-01 09:00:04.000 |
窗口開始時間是2019-01-01 09:00:02.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:04.000。 |
窗口的長度:2秒 |
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。 |
進入窗口的數據:[ |
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:02.000 到 2019-01-01 09:00:04.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。 |
窗口的水印時間:因延遲時間為0,所以事件時間 = 水印時間。 | 在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);} |
窗口的觸發時機:窗口結束時間-1 | 源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。 |
第三次觸發窗口計算:
控制台打印如下: |
|
解釋: |
|
窗口的開始時間:2019-01-01 09:00:04.000 |
窗口開始時間由事件數據決定,即接收到第一條事件數據是:(1,1,20190101090004000,1546304404000),所以窗口開始時間為:20190101090004000。 |
窗口的結束時間:2019-01-01 09:00:06.000 |
窗口開始時間是2019-01-01 09:00:04.000,窗口長度是2秒,那窗口結束時間 = 窗口開始時間+2秒 = 2019-01-01 09:00:06.000。 |
窗口的長度:2秒 |
在1.1代碼的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滾動窗口,且窗口長度為2秒。 |
進入窗口的數據:[ |
因為窗口屬於左閉右開(包前不包后),所以這個窗口的時間范圍是從2019-01-01 09:00:04.000 到 2019-01-01 09:00:06.000 - 1。只要數據的事件時間屬於該區間就會落在這個窗口中。 |
窗口的水印時間:因延遲時間為0,所以事件時間 = 水印時間。 | 在1.1代碼的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);} |
窗口的觸發時機:窗口結束時間-1 | 源碼如下(TriggerResult枚舉類的FIRE狀態表示將窗口求值並發出結果,不清除窗口數據,會保留所有元素)。 |
總結:
先上圖
=================================================
這3個窗口的規律總結為:
1、水印時間:事件時間 - 允許的最大亂序時間0秒,即每個水印時間落后於事件時間0秒,代碼:new Watermark(currentMaxTimestamp - maxOutOfOrderness)。
2、窗口的開始時間:基於事件時間的滾動窗口下,是以(第一條數據的事件時間 or 事件時間大於等於前一個窗口的數據)作為窗口的開始時間,而不是算子所處節點的系統時鍾。
3、窗口的結束時間:基於事件時間的滾動窗口下,窗口結束時間 = 窗口開始時間 + 窗口的長度即TumblingEventTimeWindows.of(Time.seconds(2))。
4、窗口的計算時機:數據的水印時間 >= 窗口結束時間 or 數據的水印時間 = 窗口結束時間-1毫秒時,就觸發了窗口的計算。
5、落入窗口內的數據:窗口屬於左閉右開,即數據的事件時間 >= 窗口起始時間 並且 < 窗口的結束時間。
二、使用並行度為4(多並行度)的本地模式測試
2.1、Flink時間時間窗口代碼,使用SocketSource:
package com.mengyao.flink.stream.window; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import com.mengyao.flink.stream.utils.DateUtil; /** * 啟動netcat:nc -L -p 9999 -v * 驗證多並行度時的事件時間窗口。 * Created by: mengyao * 2019年10月15日 */ public class MultipleParallelismSocketEventTimeWindowApp { private static String jobName = MultipleParallelismSocketEventTimeWindowApp.class.getSimpleName(); public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue()); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, conf); // 設置重啟策略,5次嘗試,每次嘗試的間隔為30秒 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000)); // 使用事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 數據處理 DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999) .map(line -> { String[] fields = line.split(",",3); return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime()); }) .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配時間戳並定期生成水印 private static final long serialVersionUID = -4195773369390603522L; private SimpleDateFormat formatter = DateUtil.FMT15.get(); long currentMaxTimestamp = 0L; long maxOutOfOrderness = 000L;//允許最大的亂序時間是5秒 @Override public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) { long timestamp = ele.f3; currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); long tId = Thread.currentThread().getId(); System.out.println("線程ID:"+tId+", 事件: "+ele+", 最大時間戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印時間戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter)); return timestamp; } @Override public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);} }); inputDS .keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滾動窗口 .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() { private static final long serialVersionUID = -4990083905742822422L; private SimpleDateFormat formatter = DateUtil.FMT15.get(); @Override public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input, Collector<String> out) throws Exception { // 按照事件時間升序排序 List<Tuple4<String, String, String, Long>> list = new ArrayList<>(); input.forEach(t4->list.add(t4)); list.sort((e1,e2)->e1.f3.compareTo(e2.f3)); long tId = Thread.currentThread().getId(); System.out.println("==== 線程ID:"+tId+",key="+key+", 窗口開始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口結束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口內的數據:"+list); } }) .print(); env.execute(jobName); } }
2.2、使用netcat啟動SocketServer,發送數據到FlinkStreaming中
listening on [127.0.0.1] 9999 ... connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 59356 1,1,20190101090000000 1,1,20190101090000500 1,1,20190101090001000 1,1,20190101090001999 1,1,20190101090002000 1,1,20190101090002500 1,1,20190101090001999 1,1,20190101090002300 1,1,20190101090003000 1,1,20190101090003999 1,1,20190101090004000 1,1,20190101090004500 1,1,20190101090003999
1,1,20190101090005999
1,1,20190101090005999
1,1,20190101090005999
1,1,20190101090005999
2.3、程序控制台輸出:
線程ID:63, 事件: (1,1,20190101090000000,1546304400000), 最大時間戳:2019-01-01 09:00:00.000, 水印時間戳:2019-01-01 09:00:00.000 線程ID:64, 事件: (1,1,20190101090000500,1546304400500), 最大時間戳:2019-01-01 09:00:00.500, 水印時間戳:2019-01-01 09:00:00.500 線程ID:65, 事件: (1,1,20190101090001000,1546304401000), 最大時間戳:2019-01-01 09:00:01.000, 水印時間戳:2019-01-01 09:00:01.000 線程ID:66, 事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999 線程ID:63, 事件: (1,1,20190101090002000,1546304402000), 最大時間戳:2019-01-01 09:00:02.000, 水印時間戳:2019-01-01 09:00:02.000 線程ID:64, 事件: (1,1,20190101090002500,1546304402500), 最大時間戳:2019-01-01 09:00:02.500, 水印時間戳:2019-01-01 09:00:02.500 線程ID:65, 事件: (1,1,20190101090001999,1546304401999), 最大時間戳:2019-01-01 09:00:01.999, 水印時間戳:2019-01-01 09:00:01.999 ==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)] 線程ID:66, 事件: (1,1,20190101090002300,1546304402300), 最大時間戳:2019-01-01 09:00:02.300, 水印時間戳:2019-01-01 09:00:02.300 線程ID:63, 事件: (1,1,20190101090003000,1546304403000), 最大時間戳:2019-01-01 09:00:03.000, 水印時間戳:2019-01-01 09:00:03.000 線程ID:64, 事件: (1,1,20190101090003999,1546304403999), 最大時間戳:2019-01-01 09:00:03.999, 水印時間戳:2019-01-01 09:00:03.999 線程ID:65, 事件: (1,1,20190101090004000,1546304404000), 最大時間戳:2019-01-01 09:00:04.000, 水印時間戳:2019-01-01 09:00:04.000 線程ID:66, 事件: (1,1,20190101090004500,1546304404500), 最大時間戳:2019-01-01 09:00:04.500, 水印時間戳:2019-01-01 09:00:04.500 線程ID:63, 事件: (1,1,20190101090003999,1546304403999), 最大時間戳:2019-01-01 09:00:03.999, 水印時間戳:2019-01-01 09:00:03.999 ==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)] 線程ID:64, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999 線程ID:65, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999 線程ID:66, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999 線程ID:63, 事件: (1,1,20190101090005999,1546304405999), 最大時間戳:2019-01-01 09:00:05.999, 水印時間戳:2019-01-01 09:00:05.999 ==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]
2.4、滾動窗口分析:
第一次窗口觸發(水印時間:2019-01-01 09:00:01.999)
控制台打印如下:
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:00.000,窗口結束:2019-01-01 09:00:02.000; 窗口內的數據:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)]
解釋:
當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。
第二次窗口觸發(水印時間:2019-01-01 09:00:03.999)
控制台打印如下:
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:02.000,窗口結束:2019-01-01 09:00:04.000; 窗口內的數據:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)]
解釋:
當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。
第三次窗口觸發(水印時間:2019-01-01 09:00:05.999)
控制台打印如下:
==== 線程ID:72,key=(1), 窗口開始:2019-01-01 09:00:04.000,窗口結束:2019-01-01 09:00:06.000; 窗口內的數據:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]
解釋:
當每一個線程中最新的水印時間都 > = 窗口結束時間 / 窗口結束時間-1毫秒時,就會觸發窗口的計算。
多並行度下事件時間窗口總結:
1、當所有線程內的最新水印時間 >= 窗口結束時間 / 窗口結束時間-1。 就會觸發窗口的計算。