1、WaterMark,翻譯成水印或水位線,水印翻譯更抽象,水位線翻譯接地氣。
watermark是用於處理亂序事件的,通常用watermark機制結合window來實現。 流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、背壓等原因,導致亂序的產生(out-of-order或者說late element)。 但是對於遲到或者亂序的元素,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window進行計算。這個特別的機制,就是watermark。觸發時間遵循自然時間以及左閉右開原則。
正常有序流:watermark實際上與event的時間戳重合
亂序流:watermark用於觸發窗口計算,也就是水印不到,即使流數據已經落入多個窗口也不會觸發,如果水印到了,該窗口的數據即使沒到也會觸發計算,遲到的數據缺省將被拋棄。
2、TumblingEventWindow 窗口結合WaterMark,用代碼驗證一下有序和亂序的流。
從socket里接收文本,文本以對子(時間戳 +文本)出現,字段分隔符是空格,行分隔符是“\n”,對收到的文本以10秒滾動窗口給文本計數。
有序情況下:watermark是0,也就是不延時接收數據。
亂序情況下:watermark是3s,延時3秒觸發窗口計算。
code:
public class TumblingEventWindowExample { public static void main(String args[]) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<String> socketStream = env.socketTextStream("192.168.31.10",9000); DataStream<Tuple2<String,Long>> resultStream = socketStream //Time.seconds(3)有序的情況修改成0 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) { @Override public long extractTimestamp(String element) { long eventTime = Long.parseLong(element.split(" ")[0]); System.out.println(eventTime); return eventTime; } }) .map(new MapFunction<String, Tuple2<String,Long>>() { @Override public Tuple2<String,Long> map(String value) throws Exception { return Tuple2.of(value.split(" ")[1],1L); } }).keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<Tuple2<String,Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { return new Tuple2<>(value1.f0,value1.f1+value2.f1); } }); resultStream.print(); env.execute(); } }
2.1 有序的情況,watermark為0s
第一個窗口:
10000
11000
12000
13000
14000
19888
13000
20000
1> (b,2)
3> (a,5)
時間戳20000觸發第一個窗口計算,實際上19999也會觸發,因為左閉右開原則,20000這個時間戳並不會在第一個窗口計算,第一個窗口是[10000-20000),第二個窗口是[20000-30000),以此類推。
第二個窗口:
10000 11000 12000 13000 14000 19888 13000 20000 1> (b,2) 3> (a,5) 11000 12000 21000 22000 29999 3> (a,3) 1> (b,1)
第一個窗口觸發計算后,后續來的11000,12000這兩條數據被拋棄,29999直接觸發窗口計算,並且本身也屬於第二個窗口,所以也參與計算了。
2.2 watermark為3s的情況
10000
11000
12000
20000
21000
22000
23000
3> (a,2)
1> (b,1)
從數據中可以驗證,第一個窗口在20000的時候沒有觸發計算,而是在23000的時候觸發計算,計算內容是第一個窗口[10000,20000),所以20000,21000,22000,23000屬於第二個窗口,沒有參與計算。
第二個窗口:
10000 11000 12000 20000 21000 22000 23000 3> (a,2) 1> (b,1) 24000 29000 30000 22000 23000 33000 3> (a,6) 1> (b,2)
第二個窗口[20000,30000),它是在33000觸發計算,並且,遲到的數據22000,23000也被計算在內(如果這兩個數據在水印33000后到達,則會被拋棄),30000和33000是第三個窗口的數據,沒有計算在內。