Flink的時間類型和watermark機制


一FlinkTime類型

    有3類時間,分別是數據本身的產生時間、進入Flink系統的時間和被處理的時間,在Flink系統中的數據可以有三種時間屬性:

Event Time 是每條數據在其生產設備上發生的時間。這段時間通常嵌入在記錄數據中,然后進入Flink,可以從記錄中提取事件的時間戳;Event Time即使在數據發生亂序,延遲或者從備份或持久性日志中重新獲取數據的情況下,也能提供正確的結果。這個時間是最有價值的,和掛在任何電腦/操作系統的時鍾時間無關。

Processing Time 是指執行相應操作的機器的系統時間。如果流計算系統基於Processing Time來處理,對流處理系統來說是最簡單的,所有基於時間的操作(如Time Window)將使用運行相應算子的機器的系統時鍾。然而,在分布式和異步環境中,Processing Time並不能保證確定性,它容易受到Event到達系統的速度(例如來自消息隊列)以及數據在Flink系統內部處理的先后順序的影響,所以Processing Time不能准確地反應數據發生的時間序列情況。

Ingestion Time是事件進入Flink的時間。 在Source算子處產生,也就是在Source處獲取到這個數據的時間,Ingestion Time在概念上位於Event Time和Processing Time之間。在Source處獲取數據的時間,不受Flink分布式系統內部處理Event的先后順序和數據傳輸的影響,相對穩定一些,但是Ingestion Time和Processing Time一樣,不能准確地反應數據發生的時間序列情況。

二 Watermark機制

上面提到Event Time是最能反映數據時間屬性的,但是Event Time可能會發生延遲或亂序,Flink系統本身只能逐個處理數據,如何應對Event Time可能會發生延遲或亂序情況呢?

比如需要統計從10:00到11:00發生某個事件的次數,也就是對Event Time是在10:00和11:00之間的數據統計個數。Event Time可能會發生延遲或亂序的情況下,Flink系統怎么判斷10:00到11:00發生的事件數據都已到達,可以給出統計結果了呢?長時間地等待會推遲結果輸出時間,而且占用更多系統資源。

Watermark是一個對Event Time的標識,內容方面Watermark是個時間戳,一個帶有時間戳X的Watermark到達,相當於告訴Flink系統,任何Event Time小於X的數據都已到達。比如上面的例子,如果Flink收到一個時間戳是11:01的Watermark,它就可以把之前統計的Event Time在[10:00,11:01)之間的事件個數輸出,清空相關被占用的資源。這里需要注意窗口的長度問題,只有窗口采集完成的數據,才會統計。

三 Watermark生成

Periodic - 一定時間間隔或者達到一定的記錄條數會產生一個watermark。

Punctuated – 基於event time通過一定的邏輯產生watermark,比如收到一個數據就產生一個WaterMark,時間是event time - 5秒。

這兩種產生方式,都有機制來保證產生的watermark是單調遞增的。

即使有了watermark,如果現實中,數據沒有滿足watermark所保證的條件怎么辦?比如Flink處理了11:01的watermark,但是之后遇到了event time是10:00~11:00之間的數據怎么辦?首先如果這種事情出現的概率非常小,不影響所要求的准確度,可以直接把數據丟棄;如果這種事情出現的概率比較大,就要調整產生water mark的機制了。

除了把違反watermark機制的數據丟棄,也有不丟棄的處理方法,比如通過一些機制來更新之前統計的結果,這種方式會有一定的性能開銷。

四代碼示例

package org.tonny.flink.bi.job.water;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.StringUtils;

/**
 *
在指定的linux機器上開啟nc -l 9900
 *
輸入的數據格式:
 
* hello1 1567059808519
 * hello2 1567059809519
 * hello3 1567059810519
 */
public class WaterMarkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();//關閉日志打印
       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //設置時間分配器

       
env.setParallelism(1);  //設置並行度
       
env.getConfig().setAutoWatermarkInterval(3000);//9秒發出一個watermark

       
DataStream<String> text = env.socketTextStream("localhost", 9900);

        DataStream<Tuple3<String, Long, Integer>> counts = text
                // 設置過濾
               
.filter(new FilterClass())
                // 設置分詞
               
.map(new LineSplitter())
                //設置watermark方法
               
.assignTimestampsAndWatermarks(new PeriodicWatermarks())
                .keyBy(0)
                //設置滾動窗口大小
               
.timeWindow(Time.seconds(60))
                .sum(2);

        counts.print();
        env.execute("Window WordCount");

    }

    public static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>> {
        private long currentMaxTimestamp = 0L;

        private final long maxOutOfOrderness = 10000L;   //這個控制失序已經延遲的度量,時間戳10秒以前的數據

       
//獲取EventTime
        
@Override
        public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
            if (element == null) {
                return currentMaxTimestamp;
            }

            long timestamp = element.f1;
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
            return timestamp;
        }

        //獲取Watermark
       
@Override
        public Watermark getCurrentWatermark() {
            System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark " + (currentMaxTimestamp - maxOutOfOrderness));
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

    //構造出element以及它的event time.然后把次數賦值為1
   
public static final class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> {
        @Override
        public Tuple3<String, Long, Integer> map(String value) throws Exception {
            if (org.apache.commons.lang3.StringUtils.isBlank(value)) {
                return null;
            }

            String[] tokens = value.toLowerCase().split("\\W+");
            if (ArrayUtils.isEmpty(tokens) || ArrayUtils.getLength(tokens) < 2) {
                return null;
            }
            long eventtime = 0L;
            try {
                eventtime = Long.parseLong(tokens[1]);
            } catch (NumberFormatException e) {
                return null;
            }
            return new Tuple3<String, Long, Integer>(tokens[0], eventtime, 1);
        }
    }

    /**
     *
過濾掉為nullwhitespace的字符串
    
*/
   
public static final class FilterClass implements FilterFunction<String> {
        @Override
        public boolean filter(String value) throws Exception {
            if (StringUtils.isNullOrWhitespaceOnly(value)) {
                return false;
            } else {
                return true;
            }
        }

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM