事件時間(event time)與水印(watermark)


  1. 事件時間和水印誕生的背景

    • 在實際的流式計算中數據到來的順序對計算結果的正確性有至關重要的影響

    • 比如:某數據源中的某些數據由於某種原因(如:網絡原因,外部存儲自身原因)會有2秒的延時,也就是在實際時間的第1秒產生的數據有可能在第3秒中產生的數據之后到來。

    • 假設在一個5秒的滾動窗口中,有一個EventTime是 9秒的數據,在第11秒時候到來了。

    • 圖示:

      image-20191113120336652

      • 那么對於一個Count聚合的Tumble(5s)的window,上面的情況如何處理才能window3=3,window2=3 呢?
  2. 時間類型

    • Flink支持不同的時間概念

      image-20191113125554399

    • Processing Time(處理時間)

      • 處理時間是指當前機器處理該條事件的時間。
      • 它是當數據流入到具體某個算子時候相應的系統。
      • 他提供了最小的延時和最佳的性能。
      • 但是在分布式和異步環境中, 處理時間不能提供確定性。
      • 因為其對時間到達 系統的速度和數據流在系統的各個operator 之間處理的速度很銘感。
    • Event Time(事件時間)

      • 事件時間是每個事件在其生產設備上發生的時間。
      • 此時間通常在進入Flink之前嵌入到記錄中,並且可以從每個記錄中提取該事件時間戳。
      • 事件時間對於亂序、延時、或者數據重放等情況,都能給出正確的結果。
      • 事件時間依賴於事件本身,而跟物理時鍾沒有關系。
      • 基於事件時間的程序必須指定如何生成事件時間水印(watermark),這是指示事件時間進度的機制。
      • 事件時間處理通常存在一定的延時,因此需要為延時和無序的事件等待一段時間。
      • 因此,使用事件時間編程通常需要與處理時間相結合。
    • Ingestion Time(攝入時間)

      • 攝入時間是數據進入Flink框架的時間,是在Source Operator中設置的
      • 與ProcessingTime相比可以提供更可預測的結果,因為攝入時間的時間戳比較穩定(在源處只記錄一次)
      • 同一數據在流經不同窗口操作時將使用相同的時間戳
      • 而對於ProcessingTime同一數據在流經不同窗口算子會有不同的處理時間戳
    • Process time 與 Event time對比:

      image-20191113130424996

      • 如上圖所示,在一個亂序的數據流里,使用event time類型的事件時間,可以保證數據流的順序性。
    • 設置時間特行

      • Flink程序的第一部分工作通常是設置時間特性,該設置用於定義數據源使用什么時間,在時間窗口處理中使用什么時間。

      • 代碼:

        // 設置執行環境, 類似spark中初始化SparkContext
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
                env.setParallelism(1);
        
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                
                // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                
                // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        
  3. Watermark (水印)

    • WaterMark 產生背景

      • 流處理從事件產生,到數據流經source,再到operator,中間是有一個過程和時間的。
      • 雖然大部分情況下,數據流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、背壓等原因,導致亂序的產生(out-of-order或者說late element)。
      • 但是對於late element(延遲數據),我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。
      • 這個特別的機制,就是watermark。
    • WaterMark 介紹

      • Watermark是Flink為了處理EventTime時間類型的窗口計算提出的一種機制, 本質上也是一種時間戳。
      • Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。
      • 當operator通過基於Event Time的時間窗口來處理數據時,它必須在確定所有屬於該時間窗口的消息全部流入此操作符后,才能開始處理數據。
      • 但是由於消息可能是亂序的,所以operator無法直接確認何時所有屬於該時間窗口的消息全部流入此操作符。
      • WaterMark包含一個時間戳,Flink使用WaterMark標記所有小於該時間戳的消息都已流入
      • Flink的數據源在確認所有小於某個時間戳的消息都已輸出到Flink流處理系統后,會生成一個包含該時間戳的WaterMark,插入到消息流中輸出到Flink流處理系統中,Flink operator算子按照時間窗口緩存所有流入的消息。
      • 當操作符處理到WaterMark時,它對所有小於該WaterMark時間戳的時間窗口的數據進行處理並發送到下一個操作符節點,然后也將WaterMark發送到下一個操作符節點。

      image-20191113131949588

    • WaterMark 的產生方式

      • Punctuated
        • 數據流中每一個遞增的EventTime都會產生一個Watermark。
        • 在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
      • Periodic
        • 周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。
        • 在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性產生Watermark,否則在極端情況下會有很大的延時。
    • 代碼:

      package com.ronnie.flink.stream.test;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
      import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;
      
      import javax.annotation.Nullable;
      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      
      /**
       *
       hello,2019-09-17 11:34:05.890
       hello,2019-09-17 11:34:07.890
       hello,2019-09-17 11:34:13.890
       hello,2019-09-17 11:34:08.890
       hello,2019-09-17 11:34:16.890
       hello,2019-09-17 11:34:19.890
       hello,2019-09-17 11:34:21.890
       */
      public class WaterMarkTest {
          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
              env.setParallelism(1);
      
           // 設置多久查看一下當前的水位線... 默認200ms
              env.getConfig().setAutoWatermarkInterval(10000);
      
              System.err.println("interval : " + env.getConfig().getAutoWatermarkInterval());
      
              DataStreamSource<String> streamSource = env.socketTextStream("ronnie01", 9999);
      
              SingleOutputStreamOperator<String> watermarks = streamSource.assignTimestampsAndWatermarks(new MyWaterMark());
      
              watermarks.map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String value) throws Exception {
                      String[] split = value.split(",");
                      String key = split[0];
      
                      return new Tuple2<String, Integer>(key, 1);
                  }
              }).keyBy(0)
                      .timeWindow(Time.seconds(10))
                       // 自定義的一個計算規則......
                      .apply(new MyWindowFunction())
                      .printToErr();
      
              try {
                  env.execute();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      class MyWaterMark implements AssignerWithPeriodicWatermarks<String>{
      
          // 目前系統里所有數據的最大事件時間
          long currentMaxTimeStamp = 0;
          // 允許數據延遲5s
          long maxLateTime = 5000;
      
          Watermark wm = null;
      
          SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      
          @Nullable
          @Override
          // 周期性地獲取目前的水位線時間, 默認200ms
          public Watermark getCurrentWatermark() {
              // 未處理的延遲/亂序問題
             // wm = new Watermark(currentMaxTimeStamp);
      
              // 處理數據的延遲/亂序問題
              wm = new Watermark(currentMaxTimeStamp - maxLateTime);
              System.out.println(format.format(System.currentTimeMillis()) + " 獲取當前水位線: " + wm + ","+ format.format(wm.getTimestamp()));
              return wm;
          }
      
      
          @Override
          public long extractTimestamp(String element, long previousElementTimestamp) {
              String[] split = element.split(",");
      
              String key = split[0];
      
              long timestamp = 0;
      
              try {
                  //將2019-09-17 10:24:50.958 格式時間轉成時間戳
                  timestamp = format.parse(split[1]).getTime();
              } catch (ParseException e) {
                  e.printStackTrace();
              }
      
              // 對比新數據的時間戳和目前最大的時間戳, 取大的值作為新的時間戳
              currentMaxTimeStamp= Math.max(timestamp, currentMaxTimeStamp);
      
              System.err.println(key +", 本條數據的時間戳: "+ timestamp + "," +format.format(timestamp)
                      + "|目前數據中的最大時間戳: "+  currentMaxTimeStamp + ","+ format.format(currentMaxTimeStamp)
                      + "|水位線時間戳: "+ wm + ","+ format.format(wm.getTimestamp()));
      
              return timestamp;
          }
      }
      
      class MyWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>{
      
          @Override
          public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
              SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      
              int sum = 0;
      
              for (Tuple2<String, Integer> tuple2:input){
               sum += tuple2.f1;
              }
              long start = window.getStart();
              long end = window.getEnd();
      
              out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                      + format.format(start) + "  window_end :" + format.format(end)
              );
          }
      }
      
      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM