Flink系列之Time和WaterMark


  當數據進入Flink的時候,數據需要帶入相應的時間,根據相應的時間進行處理。

  讓咱們想象一個場景,有一個隊列,分別帶着指定的時間,那么處理的時候,需要根據相應的時間進行處理,比如:統計最近五分鍾的訪問量,那么就需要知道數據到來的時間。五分鍾以內的數據將會被計入,超過五分鍾的將會計入下一個計算窗口。

  那么Flink的Time分為三種:

  ProcessingTime : 處理時間,即算子處理數據的機器產生的時間,數據的流程一般是Source -> Transform (Operator,即算子) -> Sink(保存數據)。ProcessingTime出現在Transform,算子計算的點。這個比較好生成,數據一旦到了算子就會生成。如果在分布式系統中或異步系統中,對於一個消息系統,有的機器處理的塊,有的機器消費的慢,算子和算子之間的處理速度,還有可能有些機器出故障了,ProcessingTime將不能根據時間產生決定數據的結果,因為什么時候到了算子才會生成這個時間。

  EventTime : 事件時間,此事件一般是產生數據的源頭生成的。帶着event time的事件進入flink的source之后就可以把事件事件進行提取,提取出來之后可以根據這個時間處理需要一致性和決定性的結果。比如,計算一個小時或者五分鍾內的數據訪問量。當然數據的EventTime可以是有序的,也可以是無序的。有序的數據大家比較好理解,比如,第一秒到第一條,第二秒到第二條數據。無序的數據,舉個例子要計算五秒的數據,假如現在為10:00:00, 那么數據EventTime在[10:00:00 10:00:05), [10:00:05 10:00:10),加入一條數據是04秒產生的,那么由於機器處理的慢,該數據在08秒的時候到了,這個時候我們理解該數據就是無序的。可以通過WaterMark的機制處理無序的數據,一會兒咱們在文章中繼續解釋。

  IngestionTime : 攝入時間,即數據進入Flink的Source的時候計入的時間。相對於以上兩個時間,IngestionTime 介於 ProcessingTime 和 EventTime之間,相比於ProcessingTime,生成的更加方便快捷,ProcessingTime每次進入一個Operator(算子,即map、flatMap、reduce等)都會產生一個時間,而IngestionTime在進入Flink的時候就產生了timestamp。相比於eventTime,它不能處理無序的事件,因為每次進入source產生的時間都是有序的,IngestionTime也無須產生WaterMark,因為會自動生成。

  如果大家還不是特別理解的話,咱們從官網拿一張圖來展示,這個會比較一目了然。

 

 

 

   Event Producer 產生數據,這個時候就帶上EventTime了,這個時候比如用戶訪問的記錄,訪問的時間就是EventTime。然后放入了MessageQueue-消息隊列,進入到Flink Source的時候可以生成IngetionTime,也就是被Flink "吞" 進入時的時間,可以這么理解一下。然后由Source再進入到Operator-算子,也就是將數據進行轉換,如Map, FlatMap等操作,這個時候每進入一個Operator都會生成一個時間即ProcessingTime。

  IngestionTime和ProcessingTime都是生成的,所以時間是升序的,里邊的時間戳timestamp和水位線Watermark都是自動生成的,所以不用考慮這個。而EventTime與其他兩個有些差異,它可以是升序的,也可以不是無序的。

  假如一個消息隊列來了帶着事件時間,時間為: 1, 2,3,4, 5。 這個加入是數據過來的時間順序,如果需要統計2秒之間的數據的話,那么就會產生的窗口數據為[1,2], [3,4] [5],這個有序時間。

  多數情況下,從消息隊列過來的數據一般時間一般沒有順序。比如過來的數據事件時間為 1,3,2,4,5,那么我們想要正確2秒的數據,我們就需要引入Watermark, 水位線一說,這個水位線的含義就是當數據達到了這個水位線的時候才觸發真正的數據統計,對於窗口來講,也就是將窗口進行關閉然后進行統計。假如我們允許最大的延遲時間為1秒,那么這些數據將會分成:

  1, 3, 2 | 水位線 |  4,5 | 水位線 |

  1 -> 分到1-2的窗口中。

  3 -> 新創建一個窗口(3-4),然后分到3-4的窗口中。

  2 -> 分到1-2的窗口看。

  水位線 -> 進行窗口統計和數據匯總。

  4 -> 分到3-4的窗口。

  5 -> 新建一個窗口(5-6),然后分配到新窗口中。

  不知道這樣寫大家能不能理解呢,如果覺得有問題的話可以給我留言。

  上面的這樣是延遲數據的處理機制,當然還有並行流處理的情況,這種情況下有的數據慢,有的數據快,那么eventTime小的數據會先流到下一個算子上,下面事件時間14和29在到window的時候,那么14會先流到window進行處理,

  在Source之后會產生對應的watermark,不同的source接入不同的數據將會分別產生對應的watermark,當然watermark也需要遵循着從小到大進行觸發,保證數據的正確處理。

 

 

 

 

 

 

 

 

 

 

 

 

 

  Watermark的設定:

  一種是Punctuated Watermark, 翻譯過來應該是“間斷的水位線”,咱們來看下原文

  To generate watermarks whenever a certain event indicates that a new watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.

  如果數據是間斷性的,那么可以使用這個作為產生watermark的方式。如果一直有數據且EventTime是遞增的話,那么每條數據就會產生一條數據,這種情況下會對系統造成負載,所以連續產生數據的情況下使用這種不合適。這個方法首先調用的是extractTimestamp用於抽取時間戳,checkAndGetNextWatermark用於檢查和生成下一個水位線。

  

  第二種是Periodic Watermark,翻譯過來是“周期性水位線”,看下原文

  AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

  周期性的獲取timestamp和生成watermark。可以依賴流元素的時間,比如EventTime或者ProcessingTime。這個接口先調用extractTimestamp方法獲取timestamp,接着調用getCurrentWatermark生成相應的時間戳。

  

  這種周期性水位線有如下三種實現:

  1)AscendingTimestampExtractor,如果數據產生的時間是升序的,可以使用這個實現獲取timestamp和生成watermark。這種情況下,如果有數據升序中有小於當前時間戳的事件時,比如1,2,3,2,4,在這種情況下數據2將會丟失。丟失的數據可以通過sideOutputLateData獲取到。

  2)BoundedOutOfOrdernessTimestampExtractor,如果數據是無需的,可以使用這個實現,指定相應的延遲時間。

  3)IngestionTimeExtractor, 這個是當指定時間特性為IngestionTime時,直接生成時間戳和獲取水印。

  

  下面寫一個例子,進一步加深理解。以下是通過建立一個socket服務端,通過數據數據進行數據展示,數據分為word和時間戳來演示,首先指定時間特性為EventTime,默認的時間特性為ProcessingTime。將單詞和時間戳進行解析拆分進行FlatMap進行數據解析成WordCount類,分配時間戳和生成水印,按word字段進行拆分,統計5秒鍾的滾動窗口數據做reduce,最后是打印和輸出。

package com.hqs.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

/**
 * @author huangqingshi
 * @Date 2020-01-11
 */
public class SocketEventTime {


    public static void main(String[] args) throws Exception{
        //創建env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置流的時間特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設置並行度
        env.setParallelism(1);
        //設置監聽localhost:9000端口,以回車分割
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                }).


                assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SocketWindowCount.WordCount>() {



                    long currentTimeStamp = 0L;
                    //允許的最大延遲時間,單位為毫秒
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;


                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
//                        System.out.println("當前waterMark:" + currentWaterMark);
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(SocketWindowCount.WordCount wordCount, long l) {

                        long timeStamp = Long.parseLong(wordCount.timestamp);
                        currentTimeStamp = Math.max(currentTimeStamp, timeStamp);

                        System.out.println("Key:" + wordCount.word + ",EventTime:" + timeStamp + ",前一條數據的水位線:" + currentWaterMark
                                + ",當前水位線:" + (currentTimeStamp - maxDelayAllowed));
                        return timeStamp;
                    }


                });

        DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
        window(TumblingEventTimeWindows.of(Time.seconds(5))).


                        reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                            @Override
                            public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                                t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                                return t1;
                            }
                        });

        //將結果集進行打印
        windowsCounts.print();

        //提交所設置的執行
        env.execute("EventTime Example");

    }


    public static class WordCount {

        public String word;
        public String timestamp;

        public static SocketWindowCount.WordCount of(String word, String timestamp) {
            SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();
            wordCount.word = word;
            wordCount.timestamp = timestamp;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " timestamp:" + timestamp;
        }
    }


}

  使用nc命令建立一個socket連接並且輸入數據,前邊為單詞,后邊為timestamp時間戳,大家可以轉換為時間:      

huangqingshideMacBook-Pro:~ huangqingshi$ nc -lk 9000
hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000 hello,1553503187000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

  輸出的結果如下,從上邊我們看到最大延遲時間maxDelayAllowed為0秒,也就意味着采用升序的獲取,等於使用AscendingTimestampExtractor,每來一條數據即生成一個時間戳和水位。因為中間有一條數據為155350318700,小於上邊的數據,所以這條數據丟失了。當5秒的時候觸發一個window時間,即數據的結果輸出。

Key:hello,EventTime:1553503185000,前一條數據的水位線:0,當前水位線:1553503185000
Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503185000,當前水位線:1553503186000
Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503186000,當前水位線:1553503187000
Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503187000,當前水位線:1553503188000
Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503188000,當前水位線:1553503189000
Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503189000,當前水位線:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000
Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503190000,當前水位線:1553503190000
Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503190000,當前水位線:1553503191000
Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503191000,當前水位線:1553503192000
Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503192000,當前水位線:1553503193000
Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503193000,當前水位線:1553503194000
Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503194000,當前水位線:1553503195000
word:hello timestamp:1553503190000,1553503191000,1553503192000,1553503193000,1553503194000

  下面咱們調整下最大延遲時間代碼:

//允許的最大延遲時間,單位為毫秒
long maxDelayAllowed = 5000L;

  咱們來看下輸出的結果,這次數據有了上邊丟失的數據了。

Key:hello,EventTime:1553503185000,前一條數據的水位線:-5000,當前水位線:1553503180000
Key:hello,EventTime:1553503186000,前一條數據的水位線:1553503180000,當前水位線:1553503181000
Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503181000,當前水位線:1553503182000
Key:hello,EventTime:1553503188000,前一條數據的水位線:1553503182000,當前水位線:1553503183000
Key:hello,EventTime:1553503189000,前一條數據的水位線:1553503183000,當前水位線:1553503184000
Key:hello,EventTime:1553503190000,前一條數據的水位線:1553503184000,當前水位線:1553503185000
Key:hello,EventTime:1553503187000,前一條數據的水位線:1553503185000,當前水位線:1553503185000
Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503185000,當前水位線:1553503186000
Key:hello,EventTime:1553503191000,前一條數據的水位線:1553503186000,當前水位線:1553503186000
Key:hello,EventTime:1553503192000,前一條數據的水位線:1553503186000,當前水位線:1553503187000
Key:hello,EventTime:1553503193000,前一條數據的水位線:1553503187000,當前水位線:1553503188000
Key:hello,EventTime:1553503194000,前一條數據的水位線:1553503188000,當前水位線:1553503189000
Key:hello,EventTime:1553503195000,前一條數據的水位線:1553503189000,當前水位線:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000,1553503187000

  下面咱們來分析下上面的結果,第一條數據的時間為45秒整,上邊的數據基本上是連續的,只有一條數據 1553503187000為47秒的時候出現了亂序中。再來回憶一下上邊的代碼,上邊的數據延遲為5秒,統計的數據為5秒的滾動窗口的數據,將時間戳合起來。

  那么第一個匯總的窗口為[2019-03-25 16:39:45 2019-03-25 16:39:50),那么數據在什么時間觸發窗口呢,也就是在輸入1553503195000的時候進行的窗口匯總, 這條數據的時間為2019-03-25 16:39:55,水位線為2019-03-25 16:39:50,由此我們得出結論:

  當統計時間window窗口中有數據的時候,watermark時間 >= 窗口的結束時間時進行觸發。

  如果想使用IngestionTime設置為時間特性的話,只需要更改幾行代碼即可。  

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                }).


                assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

 DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                timeWindow(Time.seconds(5L)).


                reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                    @Override
                    public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                        t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                        return t1;
                    }
                });

  如果要使用ProcessingTime,同理把時間特性改一下即可。完整的代碼如下,紅色的代碼為改變的代碼。

package com.hqs.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.sql.Timestamp;

/**
 * @author huangqingshi
 * @Date 2020-01-11
 */
public class SocketIngestionTime {

    public static void main(String[] args) throws Exception {
        //創建env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置流的時間特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //設置並行度
        env.setParallelism(1);
        //設置監聽localhost:9000端口,以回車分割
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                });

        DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).


                reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                    @Override
                    public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                        t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                        return t1;
                    }
                });

        //將結果集進行打印
        windowsCounts.print();

        //提交所設置的執行
        env.execute("EventTime Example");

    }


    public static class WordCount {

        public String word;
        public String timestamp;

        public static SocketWindowCount.WordCount of(String word, String timestamp) {
            SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();
            wordCount.word = word;
            wordCount.timestamp = timestamp;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " timestamp:" + timestamp;
        }
    }
}

  好了,如果有什么問題,可以留言或加我微信與我聯系。

 

 

  

  

  

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM