[Flink] Flink的waterMark的通俗理解


導讀

Flink 為實時計算提供了三種時間,即事件時間(event time)、攝入時間(ingestion time)和處理時間(processing time)。

遇到的問題:

假設在一個5秒的Tumble窗口,有一個EventTime是 11秒的數據,在第16秒時候到來了。圖示第11秒的數據,在16秒到來了,如下圖:該如何處理遲到數據

undefined

什么是Watermark

Watermark的關鍵點:

  • 目的:處理EventTime 窗口計算
  • 本質:時間戳
  • 生成方式:Punctuated和Periodic(常用)
  • 特性:單調遞增

Watermark的產生方式

  • Punctuated

    數據流中每一個遞增的EventTime都會產生一個Watermark。

  • Periodic(推薦)

    周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。

Watermark解決的問題

上面的問題在於如何將遲來的EventTime 位11的元素正確處理?

當Watermark的時間戳等於Event中攜帶的EventTime時候,上面場景(Watermark=EventTime)的計算結果如下:

undefined

如果想正確處理遲來的數據可以定義Watermark生成策略為 Watermark = EventTime -5s, 如下:

undefined

WaterMark的例子

設置WaterMark步驟:

1.設置StreamTime Characteristic為Event Time,即設置流式時間窗口(也可以稱為流式時間特性)

2.創建的DataStreamSource調用assignTimestampsAndWatermarks方法,並設置WaterMark種類:AssignerWithPeriodicWatermarks / AssignerWithPunctuatedWatermarks

或者 實現AssignerWithPeriodicWatermarks接口 / 實現AssignerWithPunctuatedWatermarks接口

3.重寫getCurrentWatermark與extractTimestamp方法

getCurrentWatermark方法:獲取當前的水位線

extractTimestamp方法:提取數據流中的時間戳(必須顯式的指定數據中的Event Time)

實例

通過一段程序,實踐一下WaterMark的設定以及WaterMark的工作方式

數據示例

key + 時間戳

hello,1553503210000 

程序說明

1.使用Socket模擬接收數據

2.設置WaterMark

設置的邏輯:在第一條數據進來時,設置WaterMark為0,指定第一條數據的時間戳后,獲取該時間戳與當前 WaterMark的最大值,並將最大值設置為下一條數據的WaterMark,以此類推

3.進行map基礎轉換,將String轉換為Tuple2<String,String>

4.根據Key分組

5.使用滾動Event Time窗口,將5秒內的同組數據,進行Fold拼接輸出

代碼如下:

package waterMark;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


import javax.annotation.Nullable;

/**
 * waterMark實例
 *
 * @author lixiyan
 * @date 2019/10/22 4:45 PM
 */
public class MainWaterMark001 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<String> dataStream = env.socketTextStream("localhost", 12345)
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                    // 當前時間戳
                    long currentTimeStamp = 0L;
                    // 允許的遲到數據
                    long maxDelayAllowed = 0L;
                    // 當前水位線
                    long currentWaterMark;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        System.out.println("當前水位線:" + currentWaterMark);
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(",");
                        long timeStamp = Long.parseLong(arr[1]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",水位線:" + currentWaterMark);
                        return timeStamp;
                    }
                });

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            }
        }).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold("Start:", new FoldFunction<Tuple2<String, String>, String>() {
                    @Override
                    public String fold(String s, Tuple2<String, String> o) throws Exception {
                        return s + " - " + o.f1;
                    }
                }).print();

        env.execute("MainWaterMark001");

    }
}

開啟9999端口,並輸入第一條數據:

hello,1553503185000

那么,我先假設后續的數據Event Time間隔為1秒,推斷一下WaterMark的設定,如下圖所示

1.第一條數據的Event Time為1553503185000,那么當前窗口時間為:1553503185000 -> 1553503189000,即下圖中紅色框線

2.第一條數據進來時,這條數據之前的WaterMark為0,當第一條數據已經進入后,指定Event Time位置,並與現在的WaterMark比較,將兩者中大的那個值設置為新的WaterMark,那么當前數據的WaterMark為1553503185000

3.第二條數據進來時,前一條數據的WaterMark為1553503185000,第二條數據的Event Time比之前的WaterMark大,於是更新WaterMark,將當前的WaterMark更新為1553503186000,但還沒到窗口觸發時間,不進行計算

4.后面幾個以此類推,直到Event Time為:1553503190000的數據進來的時候,前一條數據的WaterMark為1553503189000,於是更新當前的WaterMark為155350390000,Flink認為1553503190000之前的數據都已經到達,且達到了窗口的觸發條件,開始進行計算

undefined

根據上面的推斷,啟動程序驗證一下

先啟動監聽9999端口,再啟動Flink程序,並向端口監聽終端輸入以下內容:

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000

Flink輸出結果:

Key:hello,EventTime:1553503185000,水位線:0
Key:hello,EventTime:1553503186000,水位線:1553503185000
Key:hello,EventTime:1553503187000,水位線:1553503186000
Key:hello,EventTime:1553503188000,水位線:1553503187000
Key:hello,EventTime:1553503189000,水位線:1553503188000
Key:hello,EventTime:1553503190000,水位線:1553503189000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000

通過結果可以發現,Flink在指定WaterMark時,先調用extractTimestamp方法,再調用getCurrentWatermark方法, 所以打印信息中的WaterMark為上一條數據的WaterMark,並非當前的WaterMark

為了驗證這個結論,修改一下代碼:

@Nullable
@Override
public Watermark getCurrentWatermark() {
    currentWaterMark = currentTimeStamp - maxDelayAllowed;
    System.out.println("當前水位線:" + currentWaterMark);
    return new Watermark(currentWaterMark);
}

@Override
public long extractTimestamp(String s, long l) {
    String[] arr = s.split(",");
    long timeStamp = Long.parseLong(arr[1]);
    currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
    System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一條數據的水位線:" + currentWaterMark);
    return timeStamp;
}

在監聽終端輸入同一批數據:

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000

Flink輸出結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:0
當前水位線:1553503185000

Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000
當前水位線:1553503186000

Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000
當前水位線:1553503187000

Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000
當前水位線:1553503188000

Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000
當前水位線:1553503189000

Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000
當前水位線:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000

通過上面的結果,驗證了之前的結論,在設置WaterMark方法中,先調用extractTimestamp方法,再調用getCurrentWatermark方法

數據亂序

上面的實例,Event Time是有序,現在來做一下數據亂序的場景模擬

啟動程序,在監聽終端中輸入如下數據:

其中,在觸發了了第一個窗口計算后,又來了兩條遲到數據hello,1553503187000,hello,1553503186000

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503186000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

Flink結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:0
當前水位線:1553503185000

Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000
當前水位線:1553503186000

Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000
當前水位線:1553503187000

Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000
當前水位線:1553503188000

Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000
當前水位線:1553503189000

Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000
當前水位線:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000
當前水位線:1553503190000

Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503190000
當前水位線:1553503190000

Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503190000
當前水位線:1553503190000

Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503190000
當前水位線:1553503191000

Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503191000
當前水位線:1553503192000

Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503192000
當前水位線:1553503193000

Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503193000
當前水位線:1553503194000

Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503194000
當前水位線:1553503195000
2> Start: - 1553503190000 - 1553503191000 - 1553503192000 - 1553503193000 - 1553503194000

從結果中可以看到,在第二個窗口中,那兩條遲到數據並沒有進行處理,這個就是遲到丟棄。

亂序時間的設置:

為了解決上面的問題,我們允許Flink處理延遲以5秒內的遲到數據

修改最大亂序時間

long maxDelayAllowed = 5000l;

在監聽終端中,輸入數據

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503186000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

Flink輸出結果:

Key:hello,EventTime:1553503185000,前一條數據的水位線:-5000
當前水位線:1553503180000

Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503180000
當前水位線:1553503181000

Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503181000
當前水位線:1553503182000

Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503182000
當前水位線:1553503183000

Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503183000
當前水位線:1553503184000

Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503184000
當前水位線:1553503185000

Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503185000
當前水位線:1553503185000

Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000
當前水位線:1553503185000

Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503185000
當前水位線:1553503186000

Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503186000
當前水位線:1553503187000

Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503187000
當前水位線:1553503188000

Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503188000
當前水位線:1553503189000

Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503189000
當前水位線:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000 - 1553503187000 - 1553503186000

可以看到,設置了最大允許亂序時間后,WaterMark要比原來低5秒,可以對延遲5秒內的數據進行處理,窗口的觸發條件也同樣會往后延遲

關於延遲時間,請結合業務場景進行設置

至此,WaterMark實例就寫完了

總結

一開始,你先不要把Windowing、WaterMark、Trigger三者混在一起去考慮最終輸出的結果是什么,建議獨立考慮清楚這三者都做了什么,以及三者之間的依賴關系是什么:

1、Windowing:就是負責該如何生成Window,比如Fixed Window、Slide Window,當你配置好生成Window的策略時,Window就會根據時間動態生成,最終得到一個一個的Window,包含一個時間范圍:[起始時間, 結束時間),它們是一個一個受限於該時間范圍的事件記錄的容器,每個Window會收集一堆記錄,滿足指定條件會觸發Window內事件記錄集合的計算處理。

2、WaterMark:它其實不太好理解,可以將它定義為一個函數E=f(P),當前處理系統的處理時間P,根據一定的策略f會映射到一個事件時間E,可見E在坐標系中的表現形式是一條曲線,根據f的不同曲線形狀也不同。假設,處理時間12:00:00,我希望映射到事件時間11:59:30,這時對於延遲30秒以內(事件時范圍11:59:30~12:00:00)的事件記錄到達處理系統,都指派到時間范圍包含處理時間12:00:00這個Window中。事件時間超過12:00:00的就會由Trigger去做補償了。

3、Trigger:為了滿足實際不同的業務需求,對上述事件記錄指派給Window未能達到實際效果,而做出的一種補償,比如事件記錄在WaterMark時間戳之后到達事件處理系統,因為已經在對應的Window時間范圍之后,我有很多選擇:選擇丟棄,選擇是滿足延遲3秒后還是指派給該Window,選擇只接受對應的Window時間范圍之后的5個事件記錄,等等,這都是滿足業務需要而制定的觸發Window重新計算的策略,所以非常靈活。

本文由博客群發一文多發等運營工具平台 OpenWrite 發布


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM