Apache Flink 如何正確處理實時計算場景中的亂序數據


一、流式計算的未來

在谷歌發表了 GFS、BigTable、Google MapReduce 三篇論文后,大數據技術真正有了第一次飛躍,Hadoop 生態系統逐漸發展起來。

Hadoop 在處理大批量數據時表現非常好,主要有以下特點:

1、計算開始之前,數據必須提前准備好,然后才可以開始計算;

2、當大量數據計算完成之后,會輸出最后計算結果,完成計算;

3、時效性比較低,不適用於實時計算;

而隨着實時推薦、風控等業務的發展,數據處理時延要求越來越高,實時性要求也越來越高,Flink 開始在社區嶄露頭角。

Apache Flink 作為一款真正的流處理框架,具有較低的延遲性,能夠保證消息傳輸不丟失不重復,具有非常高的吞吐,支持原生的流處理。

本文主要介紹 Flink 的時間概念、窗口計算以及 Flink 是如何處理窗口中的亂序數據。

在 Flink 中主要有三種時間概念:

(1)事件產生的時間,叫做 Event Time;

(2)數據接入到 Flink 的時間,叫做 Ingestion Time;

(3)數據在 Flink 系統里被操作時機器的系統時間,叫做 Processing Time

處理時間是一種比較簡單的時間概念,不需要流和系統之間進行協調,可以提供最佳的性能和最低的延遲。但是在分布式環境中,多台機器的處理時間無法做到嚴格一致,無法提供確定性的保障。

而事件時間是事件產生的時間,在進入到 Flink 系統的時候,已經在 record 中進行記錄,可以通過用提取事件時間戳的方式,保證在處理過程中,反映事件發生的先后關系。

file

file

我們知道流式數據集是沒有邊界的,數據會源源不斷的發送到我們的系統中。

流式計算最終的目的是去統計數據產生匯總結果的,而在無界數據集上,如果做一個全局的窗口統計,是不現實的。

只有去划定一定大小的窗口范圍去做計算,才能最終匯總到下游的系統中,用來分析和展示。
file

在 Flink 進行窗口計算的時候,需要去知道兩個核心的信息:

  • 每個 Element 的 EventTime 時間戳?(在數據記錄中指定即可)
  • 接入的數據,何時可以觸發統計計算 ? (窗口 11:00 ~ 11:10 的數據全部被接收完)

有序事件

假設在完美的條件下,數據都是嚴格有序,那么此時,流式計算引擎是可以正確計算出每個窗口的數據的

file

無序事件

但是現實中,數據可能會因為各種各樣的原因(系統延遲,網絡延遲等)不是嚴格有序到達系統,甚至有的數據還會遲到很久,此時 Flink 需要有一種機制,允許數據可以在一定范圍內亂序。

這種機制就是水印。

file

如上面,有一個參數: MaxOutOfOrderness = 4,為最大亂序時間,意思是可以允許數據在多少范圍內亂序,可以是 4 分鍾,4 個小時 等。

水印的生成策略是,當前窗口最大事件時間戳減去 MaxOutOfOrderness 的值。

如上圖,事件 7 會產生一個 w(3) 的水印,事件 11 會產生要給 w(7) 的水印,但是事件 9 ,是小於事件 11 的,此時不會觸發水印的更新。事件 15 會產生一個 w(11) 的水印。 也就是說,水印反映了事件的整體流轉的趨勢,只會上升,不會下降。

水印表示了所有小於水印值的事件都已經到達了窗口。

每當有新的最大時間戳出現時,就會產生新的 watermark

遲到事件

對於事件時間小於水印時間的事件,稱為遲到事件。遲到事件是不會被納入窗口統計的。

如下圖,21 的事件進入系統之后,會產生 w(17) 的水印。而后來的 16 事件,由於小於當前水印時間 w(17),是不會被統計的了。

file

何時觸發計算

我們用一個圖來展示何時會觸發窗口的計算

如下圖,表示一個 11:50 到 12:00 的窗口,此時有一條數據, cat,11:55,事件時間是 11:55,在窗口中,最大延遲時間是 5 分鍾,所以當前水印時間是 11:50

file

此時又來了一條數據,dog,11:59,事件時間是 11:59,進入到了窗口中。

由於這個事件時間比上次的事件時間大,所以水印被更新成 11:54。此時由於水印時間仍然小於窗口結束時間,所以仍然沒有觸發計算。

file

又來了一條數據, cow,12:06,此時水印時間被更新到了 12:01 ,已經大於了窗口結束時間,此時觸發了窗口計算(假設計算邏輯就是統計窗口內不同元素的個數)。

file

假設又來了一條事件,是 dog,11:58,由於它已經小於了水印時間,並且在上次觸發窗口計算之后,窗口已經被銷毀,所以,這條事件是不會被觸發計算的了。

此時,可以這個事件放到 sideoutput 隊列中,額外邏輯處理。

file

所以在 1.11 版本中,重構了水印生成接口。新版本中,主要通過 WatermarkStrategy 類,來使用不同的策略生成水印。

新的接口提供了很多靜態的方法和帶有缺省實現的方法,如果想自己定義生成策略,可以實現這個方法:

file

生成一個 WatermarkGenerator

file

這個類也很簡單明了

  • onEvent:如果我們想依賴每個元素生成一個水印發射到下游,可以實現這個方法;
  • OnPeriodicEmit:如果數據量比較大的時候,我們每條數據都生成一個水印的話,會影響性能,所以這里還有一個周期性生成水印的方法。

為了方便開發,Flink 還提供了一些內置的水印生成方法供我們使用

  • 固定延遲生成水印
    我們想生成一個延遲 3 s 的固定水印,可以這樣做
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
  • 單調遞增生成水印
    相當於上述的延遲策略去掉了延遲時間,以 event 中的時間戳充當了水印,可以這樣使用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

五、一個簡單的小例子,來統計窗口中字母出現的次數

public class StreamTest1 {


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class MyLog {
        private String msg;
        private Integer cnt;
        private long timestamp;
    }

    public static class MySourceFunction implements SourceFunction<MyLog> {

        private boolean running = true;

        @Override
        public void run(SourceContext<MyLog> ctx) throws Exception {
            while (true) {
                Thread.sleep(1000);
                ctx.collect(new MyLog(RandomUtil.randomString(1),1,System.currentTimeMillis()));
            }
        }
        @Override
        public void cancel() {
            this.running = false;
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 數據源使用自定義數據源,每1s發送一條隨機消息
        env.addSource(new MySourceFunction())
                // 指定水印生成策略是,最大事件時間減去 5s,指定事件時間字段為 timestamp
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.
                                <MyLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event,timestamp)->event.timestamp))
                // 按 消息分組
                .keyBy((event)->event.msg)
                // 定義一個10s的時間窗口
                .timeWindow(Time.seconds(10))
                // 統計消息出現的次數
                .sum("cnt")
                // 打印輸出
                .print();

        env.execute("log_window_cnt");
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM