flink WaterMark之TumblingEventWindow


1、WaterMark,翻譯成水印或水位線,水印翻譯更抽象,水位線翻譯接地氣。

watermark是用於處理亂序事件的,通常用watermark機制結合window來實現。

流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、背壓等原因,導致亂序的產生(out-of-order或者說late element)。

但是對於遲到或者亂序的元素,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window進行計算。這個特別的機制,就是watermark。觸發時間遵循自然時間以及左閉右開原則。

正常有序流:watermark實際上與event的時間戳重合

 

 亂序流:watermark用於觸發窗口計算,也就是水印不到,即使流數據已經落入多個窗口也不會觸發,如果水印到了,該窗口的數據即使沒到也會觸發計算,遲到的數據缺省將被拋棄。

2、TumblingEventWindow 窗口結合WaterMark,用代碼驗證一下有序和亂序的流。

從socket里接收文本,文本以對子(時間戳 +文本)出現,字段分隔符是空格,行分隔符是“\n”,對收到的文本以10秒滾動窗口給文本計數。
有序情況下:watermark是0,也就是不延時接收數據。
亂序情況下:watermark是3s,延時3秒觸發窗口計算。

code:

public class TumblingEventWindowExample {
    public static void main(String args[]) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> socketStream = env.socketTextStream("192.168.31.10",9000);
        DataStream<Tuple2<String,Long>> resultStream = socketStream
                 //Time.seconds(3)有序的情況修改成0
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String,Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1],1L);
                    }
                }).keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0,value1.f1+value2.f1);
                    }
                });
        resultStream.print();

        env.execute();
    }
}

 

2.1 有序的情況,watermark為0s

第一個窗口:

10000
11000
12000
13000
14000
19888
13000
20000
1> (b,2)
3> (a,5)

時間戳20000觸發第一個窗口計算,實際上19999也會觸發,因為左閉右開原則,20000這個時間戳並不會在第一個窗口計算,第一個窗口是[10000-20000),第二個窗口是[20000-30000),以此類推。

 

第二個窗口:

10000
11000
12000
13000
14000
19888
13000
20000
1> (b,2)
3> (a,5)
11000
12000
21000
22000
29999
3> (a,3)
1> (b,1)

第一個窗口觸發計算后,后續來的11000,12000這兩條數據被拋棄,29999直接觸發窗口計算,並且本身也屬於第二個窗口,所以也參與計算了。

 

 2.2 watermark為3s的情況

10000
11000
12000
20000
21000
22000
23000
3> (a,2)
1> (b,1)

從數據中可以驗證,第一個窗口在20000的時候沒有觸發計算,而是在23000的時候觸發計算,計算內容是第一個窗口[10000,20000),所以20000,21000,22000,23000屬於第二個窗口,沒有參與計算。

 

第二個窗口:

10000
11000
12000
20000
21000
22000
23000
3> (a,2)
1> (b,1)
24000
29000
30000
22000
23000
33000
3> (a,6)
1> (b,2)

第二個窗口[20000,30000),它是在33000觸發計算,並且,遲到的數據22000,23000也被計算在內(如果這兩個數據在水印33000后到達,則會被拋棄),30000和33000是第三個窗口的數據,沒有計算在內。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM