一FlinkTime類型
有3類時間,分別是數據本身的產生時間、進入Flink系統的時間和被處理的時間,在Flink系統中的數據可以有三種時間屬性:
Event Time 是每條數據在其生產設備上發生的時間。這段時間通常嵌入在記錄數據中,然后進入Flink,可以從記錄中提取事件的時間戳;Event Time即使在數據發生亂序,延遲或者從備份或持久性日志中重新獲取數據的情況下,也能提供正確的結果。這個時間是最有價值的,和掛在任何電腦/操作系統的時鍾時間無關。
Processing Time 是指執行相應操作的機器的系統時間。如果流計算系統基於Processing Time來處理,對流處理系統來說是最簡單的,所有基於時間的操作(如Time Window)將使用運行相應算子的機器的系統時鍾。然而,在分布式和異步環境中,Processing Time並不能保證確定性,它容易受到Event到達系統的速度(例如來自消息隊列)以及數據在Flink系統內部處理的先后順序的影響,所以Processing Time不能准確地反應數據發生的時間序列情況。
Ingestion Time是事件進入Flink的時間。 在Source算子處產生,也就是在Source處獲取到這個數據的時間,Ingestion Time在概念上位於Event Time和Processing Time之間。在Source處獲取數據的時間,不受Flink分布式系統內部處理Event的先后順序和數據傳輸的影響,相對穩定一些,但是Ingestion Time和Processing Time一樣,不能准確地反應數據發生的時間序列情況。
二 Watermark機制
上面提到Event Time是最能反映數據時間屬性的,但是Event Time可能會發生延遲或亂序,Flink系統本身只能逐個處理數據,如何應對Event Time可能會發生延遲或亂序情況呢?
比如需要統計從10:00到11:00發生某個事件的次數,也就是對Event Time是在10:00和11:00之間的數據統計個數。Event Time可能會發生延遲或亂序的情況下,Flink系統怎么判斷10:00到11:00發生的事件數據都已到達,可以給出統計結果了呢?長時間地等待會推遲結果輸出時間,而且占用更多系統資源。
Watermark是一個對Event Time的標識,內容方面Watermark是個時間戳,一個帶有時間戳X的Watermark到達,相當於告訴Flink系統,任何Event Time小於X的數據都已到達。比如上面的例子,如果Flink收到一個時間戳是11:01的Watermark,它就可以把之前統計的Event Time在[10:00,11:01)之間的事件個數輸出,清空相關被占用的資源。這里需要注意窗口的長度問題,只有窗口采集完成的數據,才會統計。
三 Watermark生成
Periodic - 一定時間間隔或者達到一定的記錄條數會產生一個watermark。
Punctuated – 基於event time通過一定的邏輯產生watermark,比如收到一個數據就產生一個WaterMark,時間是event time - 5秒。
這兩種產生方式,都有機制來保證產生的watermark是單調遞增的。
即使有了watermark,如果現實中,數據沒有滿足watermark所保證的條件怎么辦?比如Flink處理了11:01的watermark,但是之后遇到了event time是10:00~11:00之間的數據怎么辦?首先如果這種事情出現的概率非常小,不影響所要求的准確度,可以直接把數據丟棄;如果這種事情出現的概率比較大,就要調整產生water mark的機制了。
除了把違反watermark機制的數據丟棄,也有不丟棄的處理方法,比如通過一些機制來更新之前統計的結果,這種方式會有一定的性能開銷。
四代碼示例
package org.tonny.flink.bi.job.water; import org.apache.commons.lang3.ArrayUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.StringUtils; /** * 在指定的linux機器上開啟nc -l 9900 * 輸入的數據格式: * hello1 1567059808519 * hello2 1567059809519 * hello3 1567059810519 */ public class WaterMarkJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging();//關閉日志打印 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //設置時間分配器 env.setParallelism(1); //設置並行度 env.getConfig().setAutoWatermarkInterval(3000);//每9秒發出一個watermark DataStream<String> text = env.socketTextStream("localhost", 9900); DataStream<Tuple3<String, Long, Integer>> counts = text // 設置過濾 .filter(new FilterClass()) // 設置分詞 .map(new LineSplitter()) //設置watermark方法 .assignTimestampsAndWatermarks(new PeriodicWatermarks()) .keyBy(0) //設置滾動窗口大小 .timeWindow(Time.seconds(60)) .sum(2); counts.print(); env.execute("Window WordCount"); } public static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>> { private long currentMaxTimestamp = 0L; private final long maxOutOfOrderness = 10000L; //這個控制失序已經延遲的度量,時間戳10秒以前的數據 //獲取EventTime @Override public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) { if (element == null) { return currentMaxTimestamp; } long timestamp = element.f1; currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); System.out.println("get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp); return timestamp; } //獲取Watermark @Override public Watermark getCurrentWatermark() { System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark " + (currentMaxTimestamp - maxOutOfOrderness)); return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } //構造出element以及它的event time.然后把次數賦值為1 public static final class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> { @Override public Tuple3<String, Long, Integer> map(String value) throws Exception { if (org.apache.commons.lang3.StringUtils.isBlank(value)) { return null; } String[] tokens = value.toLowerCase().split("\\W+"); if (ArrayUtils.isEmpty(tokens) || ArrayUtils.getLength(tokens) < 2) { return null; } long eventtime = 0L; try { eventtime = Long.parseLong(tokens[1]); } catch (NumberFormatException e) { return null; } return new Tuple3<String, Long, Integer>(tokens[0], eventtime, 1); } } /** * 過濾掉為null和whitespace的字符串 */ public static final class FilterClass implements FilterFunction<String> { @Override public boolean filter(String value) throws Exception { if (StringUtils.isNullOrWhitespaceOnly(value)) { return false; } else { return true; } } } } |