在實時數據處理的場景中,數據的到達延時或亂序是經常遇到的問題,比如:
* 按時間順序發生的數據1 -> 2,本來應該是1先發送,1先到達,但是在1發送過程中,因為網絡延時之類的原因,導致1反而到達晚了,變成2先到達,也就造成所謂的接收亂序;
* 發送方本身就延時了,比如:事實上按1 -> 2產生的數據 ,發送方如果是多線程發送數據,可能造成2先發,1后發,中間網絡傳輸就算沒有延時,也會導致接收到時已經亂序;
* 有一些比如本來是19:59:59發生的業務數據,由於一些中間環節耗時(比如:最長可能需要5秒),到了發送的時候,已經是20:00:04了,但是在處理時,又希望這條數據能算到上1個小時的統計窗口里(即:數據雖然晚到了,已經錯過了上1個時間窗口的計算時機,但是不希望被扔掉)
flink做為一個流批一體的框架,自然也考慮到這個問題,它提供了3種機制來應對,還是以最經典的wordcount為例,先定義WordCount類:
package com.cnblogs.yjmyzz.flink.demo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.Date; @Data @AllArgsConstructor @NoArgsConstructor public class WordCount { private String word; private Date eventDateTime; }
為了后面json序列化方便,定義一個Gson工具類(可參考)
package com.cnblogs.yjmyzz.flink.demo; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonDeserializer; import com.google.gson.JsonPrimitive; import java.text.SimpleDateFormat; import java.util.Date; /** * @author jimmy */ public enum GsonUtils { INSTANCE; private static Gson gson; public Gson gson() { if (gson != null) { return gson; } String dateFormatWithMS = "yyyy-MM-dd HH:mm:ss.SSS"; String dateFormatNoMS = "yyyy-MM-dd HH:mm:ss"; GsonBuilder builder = new GsonBuilder(); builder.registerTypeAdapter(Date.class, (JsonDeserializer<Date>) (json, typeOfT, context) -> { if (json == null || json.toString().equalsIgnoreCase("\"\"")) { //空字符判斷 return null; } JsonPrimitive jsonPrimitive = json.getAsJsonPrimitive(); SimpleDateFormat sdfMS = new SimpleDateFormat(dateFormatWithMS); SimpleDateFormat sdfNoMS = new SimpleDateFormat(dateFormatNoMS); Date dt = null; try { if (jsonPrimitive.isString()) { if (jsonPrimitive.getAsString().length() == 19) { //這里只是示例,簡單用長度來判斷是哪種格式 //yyyy-MM-dd HH:mm:ss格式 dt = sdfNoMS.parse(json.getAsString()); } else { //yyyy-MM-dd HH:mm:ss.SSS格式 dt = sdfMS.parse(json.getAsString()); } } else if (jsonPrimitive.isNumber()) { //兼容timestamp類型 dt = new Date(jsonPrimitive.getAsLong()); } } catch (Exception e) { //錯誤日志記錄,略 e.printStackTrace(); } return dt; }); gson = builder .setDateFormat(dateFormatWithMS) .setPrettyPrinting() .create(); return gson; } }
開始flink處理,我們的場景是先啟動一個nc模擬網絡服務端發送數據,然后flink實時接收,然后按1分鍾做為時間窗口,統計窗口內收到的word個數。
發送的數據格式類似:
{"word":"hello","eventDateTime":"2021-05-09 22:01:10.000"}
代碼如下:
package com.cnblogs.yjmyzz.flink.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Date; public class WorkCountSample { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 1 設置環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment() .setParallelism(1); //指定使用eventTime作為時間標准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); SingleOutputStreamOperator<Tuple3<String, Integer, String>> count = env .socketTextStream("127.0.0.1", 9999) .map((MapFunction<String, WordCount>) value -> { //將接收到的json轉換成WordCount對象 WordCount wordCount = GsonUtils.INSTANCE.gson().fromJson(value, WordCount.class); return wordCount; }) //這里先不指定任何水印延時 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(0)) { @Override public long extractTimestamp(WordCount element) { //指定事件時間的字段 return element.getEventDateTime().getTime(); } }) .flatMap((FlatMapFunction<WordCount, Tuple3<String, Integer, String>>) (value, out) -> { String word = value.getWord(); //輔助輸出窗口信息,方便調試 String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.getEventDateTime().getTime(), 0, 60 * 1000))); if (word != null && word.trim().length() > 0) { out.collect(new Tuple3<>(word.trim(), 1, windowTime)); } }) .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class))) .keyBy(0) //按每分鍾開窗 .timeWindow(Time.minutes(1)) .sum(1); count.print(); env.execute("wordCount"); } }
來測試一下,先啟用1個網絡服務端,mac或linux上,終端輸入 nc -l 9999,再啟用上面的flink程序,在終端依次輸入下面3條json(即:模擬發送了3條數據)
{"word":"hello","eventDateTime":"2021-05-09 22:01:10.000"} {"word":"hello","eventDateTime":"2021-05-09 22:01:00.999"} {"word":"hello","eventDateTime":"2021-05-09 22:02:00.000"}
可以看到,在輸入到第3條時,因為事件時間已經到了第2分鍾,所以上1分鍾的窗口被關閉,觸發了計算,輸出了hello:2,符合預期。
注意一下:第1條與第2條的事件時間,正好的是反的,第1條是22:01:10,而第2條是更早的22:01:00,也就是亂序,但是仍然都正確的統計在了22:01:00這個1分鍾的窗口里。所以按時間開窗的場景,flink天然就能兼容一些亂序情況。
如果是延時問題,比如希望延時1秒才開始觸發上1個時間窗口的計算,即: 22:02.00.999 的事件時間數據到達時,才開始計算22:01:00 開始的這個1分鍾窗口(相當於多等1秒),可以調整第40行代碼,即所謂的水印WaterMark機制
一、Watermark延時設置
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(1000))
很簡單對吧?只要把40行這里的0,調整成1000,也就是延時1000ms觸發計算,注:這是一個左閉右開的區間,即[0,1000)的延時范圍都是允許的。
再測試一下:
可以看到,當輸入第3條數據時,雖然已經是22:02:00.000,到了第2分鍾,但是並沒有觸發前1個時間窗口的計算輸出,而是在第4條數據輸入,也就是22:02.00.999時才觸發22:01 窗口的計算,以此之后,哪怕再有01分窗口的數據上報,將被扔掉。
二、時間窗口延時設置
在剛才示例中,如果某個窗口計算過了(也就是窗口關閉了),后面哪怕還有該窗口內的數據上報,默認也會被丟失。這好比:公司組織團建,約好第2天早上8點發車(即:時間窗口的截止時間為8點),然后考慮到可能有人會遲到(即: 數據延時上報),會讓司機多等5分鍾(即:watermark的延時),但是過了08:05,如果還有人沒來,就不管了,這個好象有點不厚道。怎么辦?通常公司會說,現在我們先點下人數(即:窗口先計算1次),如果還有人沒到,我們最后再多等10分鍾(即:這10分鍾內,如果還有人再來,每來1個人,再清點1次,看看人有沒有到齊,如果到了08:15還沒到齊,就只能發車了,不讓讓全公司的人等個別懶蟲)。
這就是flink的第2種處理延時機制,窗口延時計算,只要加一行allowLateness就好。
.timeWindow(Time.minutes(1))
.allowedLateness(Time.seconds(10))
觀察上面的運行結果 ,第3次輸入時,觸發了窗口的第1次計算,緊接着第4條輸入,仍然是01分窗口的數據(相當於遲到趕來的人),又觸發了1次計算,但是到了第5條,也就是第1個黃色箭頭的數據到達時,已經到了最后截止時間,窗口徹底關閉(即:發車了),后面再有數據過來,也不管了。
三、遲到數據的側輸出流
還是以上面的公司團建發車為例,如果有些人真的有事情,來不及,但是又想去團建怎么辦?(即:肯定是遲到了,但是數據不能丟)一般的做法,我們是讓他自行打車,單獨前往。這在Flink里,叫做所謂“側輸出流”,把遲到的數據單獨放在一個Stream里收集起來,然后單獨處理。
package com.cnblogs.yjmyzz.flink.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.OutputTag; import java.text.SimpleDateFormat; import java.util.Date; public class WorkCountSample { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 1 設置環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment() .setParallelism(1); //指定使用eventTime作為時間標准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); OutputTag<Tuple3<String, Integer, String>> lateTag = new OutputTag<>("late", ((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class))); SingleOutputStreamOperator<Tuple3<String, Integer, String>> count = env .socketTextStream("127.0.0.1", 9999) .map((MapFunction<String, WordCount>) value -> { //將接收到的json轉換成WordCount對象 WordCount wordCount = GsonUtils.INSTANCE.gson().fromJson(value, WordCount.class); return wordCount; }) //這里先不指定任何水印延時 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(1000)) { @Override public long extractTimestamp(WordCount element) { //指定事件時間的字段 return element.getEventDateTime().getTime(); } }) .flatMap((FlatMapFunction<WordCount, Tuple3<String, Integer, String>>) (value, out) -> { String word = value.getWord(); //輔助輸出窗口信息,方便調試 String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.getEventDateTime().getTime(), 0, 60 * 1000))); if (word != null && word.trim().length() > 0) { out.collect(new Tuple3<>(word.trim(), 1, windowTime)); } }) .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class))) .keyBy(0) //按每分鍾開窗 .timeWindow(Time.minutes(1)) .allowedLateness(Time.seconds(10)) //定義遲到數據的側輸出流 .sideOutputLateData(lateTag) .sum(1); count.print(); //遲到的數據,這時只是簡單的打印出來 count.getSideOutput(lateTag).print(); env.execute("wordCount"); } }
33行,先定義一個OutputTag
64行,通過sideOutputLateData(lateTag)指定側輸出流,將遲到的數據收集於此
71行,將收集到的測輸出流,打印出來(實際業務中,可以存到mysql等一些存儲體系中)
運行效果:
注:
右側倒數第2條{"word":"hello","eventDateTime":"2021-05-09 22:02:10.999"}發送完畢后,01分的窗口已關閉。
再發送最后1條{"word":"world","eventDateTime":"2021-05-09 22:01:10.000"}時,這條就是遲到數據了,從左側輸出來看,已正確輸出,被側輸出流處理了。
小結一下:
1、Watermark水印在窗口計算觸發前延時;
2、allowedLateness則是只要窗口計算時機被觸發了,把現有數據先算一把,后面如果還有該窗口的數據過來,可以繼續再算(前提是在允許的延時閾值范圍內)
3、如果上述2種延時都滿足不了,在窗口徹底關閉了后,還有遲到數據進來,可以放到側輸出流,單獨處理。