通俗易懂之flink的窗口、時間和水印


1,經常說的窗口是個啥?

大家平時開發經常會做一些聚合操作,比如count,sum等。在離線跑批的情況下,這些數據都是恆定的,所以不會有什么問題。但是到了實時流的場景,似乎就不太行了。比如小伙伴陸續排隊來游樂園玩耍,售票員如果需要做統計,是怎么樣的呢?

 

3個,4個,5個。。。。。。可以知道,在流的世界里,數據是源源不斷的過來的,我們是無法進行聚合統計的,因為流是無界的,而在離線情況,這些數據是一個有界的。為了方便對流的數據進行聚合操作,有了窗口的概念。售票員通常的做法是,比如每隔10分鍾統計下進去了多少個小伙伴,又或者每進去100個小伙伴就把通道門關閉,等這100個小伙伴玩好了再打開通道門,再進去100個小伙伴。可以看到,window(窗口)是一種可以把無限數據切割為有限數據塊的手段。窗口可以是時間驅動的(比如這里的每隔10分鍾),也可以是數據驅動的(比如100個小伙伴);

2,窗口的分類

Flink的窗口可以分為三種類型:

滾動窗口:

對應的語義:每隔10分鍾統計一次人數

 

一眼看出來這里的window size就是10分鍾,可以看出來數據是不會又重疊的。

滑動窗口:

對應的語義:每隔5分鍾統計下前10分鍾的人數

這里每5分鍾作為一個滑動,得到一個10分鍾的窗口,可以看出來數據是有重疊的。

會話窗口:

這種窗口使用的不多,略過。

3,flink時間的分類

Flink的時間可以分為三類:

事件時間(Event Time)

事件時間是外部原始數據自帶的時間,比如{"name":"zhangsan","opreator":"visit","time":"2020-08-31 17:10:15"},這里的time就是事件時間。 

處理時間(Processing Time)

事件被處理時系統的當前時間。

攝入時間(Ingestion Time)

事件進入flink的時間。

4,從沒聽說過的水印又是個什么東東,是為了解決什么問題產生的?

場景:

首先大家都知道kafka是能做到每個分區有序,但是不能做到全局有序的。(當然也可以,就是一個分區,但是這樣完全沒了分布式的意義了。)那么這種情況下,如何做到數據不亂序呢,水印的出現就是為了解決數據亂序的問題

 

水印的本質來說其實就是一個時間戳。如果flink系統出現來了一個WaterMark T,這就意味着時間小於T的事件都已經到達,窗口結束時間和T相同的那個窗口被觸發計算。簡單的說,水印就是判斷是否遲到的標准,同時它也是窗口是否被觸發的一個標記。

5,一個案例解決你的困惑。

下面通過一個例子來加深大家對這些概念的理解,前面說了,水印的出現主要是為了解決數據亂序的問題,所以我們看看是如何解決亂序的。比如lisi不停的打卡,假設原始message為”lisi,時間戳”這樣的字符串。設置窗口大小為5秒。首先flink會根據系統時間划分出來窗口,這個划分的目的后面解釋。只要知道這時的窗口划分是flink系統定義好的,和消息所帶的時間無關。划分的結構如下:

現在假設我們允許消息是可以延遲10秒的。這里是為了計算出來水印,水印的計算:

消息的時間-最大的延遲;

下面我們構造一批測試數據:

* lisi,1598839882000       2020-08-31 10:11:22
* lisi,1598839886000       2020-08-31 10:11:26
* lisi,1598839892000       2020-08-31 10:11:32
* lisi,1598839893000       2020-08-31 10:11:33
* lisi,1598839894000       2020-08-31 10:11:34
* lisi,1598839895000       2020-08-31 10:11:35
* lisi,1598839900000       2020-08-31 10:11:40

 

這里我寫了相應的代碼,核心的代碼為:

        //抽取timestamp和生成watermark
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * 定義生成watermark的邏輯
             * 默認100ms被調用一次
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            //定義如何提取timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

//                System.out.println("key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" +
//                        sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");

                System.out.println("傳入數據:" + element.f1 + "|" + sdf.format(element.f1) + ",此時的水印為:" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()));
                return timestamp;
            }
        });

這里主要是定義了水印的計算,下面我們依次看一下每條數據輸入進去以后的結果。

在數據源頭輸入:lisi,1598839882000,此時打印出來:

 

這里驗證了水印的計算就是消息的時間-延遲的時間(用戶自己指定),我們是10s,所以水印是2020-08-31 10:11:12.000;

繼續輸入:lisi,1598839886000,此時打印出來:

水印變為:2020-08-31 10:11:16;

繼續輸入:lisi,1598839892000,此時打印出來:

水印變為:2020-08-31 10:11:2

注意:這里我們的水印已經和我們第一條輸入數據的時間相同了;

繼續輸入:lisi,1598839893000,此時打印出來:

 

 

水印變為:2020-08-31 10:11:23

說明一點,我們的代碼在滿足水印和窗口的條件是做了排序,然后打印出原始消息的。但是這里遲遲沒有打印,而且我們的第一條原始數據都比水印小了。

繼續輸入:lisi,1598839894000,此時打印出來:

水印變為:2020-08-31 10:11:24。

最開始的數據依舊沒有觸發,此時已經比最新的數據小了12秒。

那我們再輸入會怎樣呢:

繼續輸入:lisi,1598839895000,此時打印出來:

這里發現當我們輸入1598839895000這個時間以后,就會觸發窗口數據的執行了。此時的水印為:2020-08-31 10:11:25。那么是什么原因觸發了這個窗口的執行呢?我們本章節最開始把flink系統時間做了划分的目的是什么呢?

 

謎底就在這里了!首先我們第一條數據的時間是1598839882000(2020-08-31 10:11:22),在區間為[00:00:20,00:00:25)這個范圍,然后當我們的水印時間達到該窗口的最大時間,就會觸發這個窗口的執行了。這里當我們的水印達到了2020-08-31 10:11:25,也就是第一條數據所在的窗口的最大值時,那么該窗口的數據就會依次被執行了。該窗口只有一條數據,也就是lisi,1598839882000(2020-08-31 10:11:22)。

同理,第二條數據lisi,1598839886000(2020-08-31 10:11:26)在區間[00:00:25,00:00:30)上,

繼續輸入:lisi,1598839900000(2020-08-31 10:11:40),此時打印出來:

 

這時候的水印為2020-08-31 10:11:30,和第二條數據所在的窗口最大值一樣大,所以也會觸發這個窗口數據的打印;

我們輸入的第3~第5條數據在區間[00:00:30,00:00:35)這個范圍,此時只要我們把水印提高到2020-08-31 10:11:35,理論上就算觸發該窗口的執行,且該窗口size=3;

繼續輸入lisi,1598839905000(2020-08-31 10:11:45),此時打印出來:

結果如我們的所料,然后我們的代碼在窗口對時間做了排序,所以3條數據依次為:

* lisi,1598839892000       2020-08-31 10:11:32
* lisi,1598839893000       2020-08-31 10:11:33
* lisi,1598839894000       2020-08-31 10:11:34

最后再來梳理下這個例子;

第一步,我們會根據窗口的大小,按照系統時間划分,這個時間是系統時間,而不是我們的事件時間,這樣會把后面的每個事件時間依次划分到某個窗口去,這樣就有了窗口的邊界,最小值和最大值;

第二步,根據水印的計算規則(一般和我們的事件時間綁定),計算出水印超過了(或者等於)窗口的最大值,這時候就可以觸發該窗口所有的值的計算了。。

通過這種方式,隨着數據的進入,flink的水印一步步提高,然后一步步觸發窗口數據的執行,同時窗口的數據匯聚到一起,開發者可以保證數據的有序性,從而解決數據亂序的問題。

6,總結

Flink的水印,時間,窗口是一個比較難理解的概念,本文只是講了其中的一部分,許多細節還要讀者慢慢體會。這里我用了單線程來模擬,如果是多線程的情況下水印又將是如何的?遲到的數據會怎樣呢?還有諸多細節,有機會繼續分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM