flink 1.11.2 學習筆記(5)-處理消息延時/亂序的三種機制


在實時數據處理的場景中,數據的到達延時或亂序是經常遇到的問題,比如:

* 按時間順序發生的數據1 -> 2,本來應該是1先發送,1先到達,但是在1發送過程中,因為網絡延時之類的原因,導致1反而到達晚了,變成2先到達,也就造成所謂的接收亂序;

* 發送方本身就延時了,比如:事實上按1 -> 2產生的數據 ,發送方如果是多線程發送數據,可能造成2先發,1后發,中間網絡傳輸就算沒有延時,也會導致接收到時已經亂序;

* 有一些比如本來是19:59:59發生的業務數據,由於一些中間環節耗時(比如:最長可能需要5秒),到了發送的時候,已經是20:00:04了,但是在處理時,又希望這條數據能算到上1個小時的統計窗口里(即:數據雖然晚到了,已經錯過了上1個時間窗口的計算時機,但是不希望被扔掉)

 

flink做為一個流批一體的框架,自然也考慮到這個問題,它提供了3種機制來應對,還是以最經典的wordcount為例,先定義WordCount類:

package com.cnblogs.yjmyzz.flink.demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class WordCount {

    private String word;

    private Date eventDateTime;

}

為了后面json序列化方便,定義一個Gson工具類(可參考)

package com.cnblogs.yjmyzz.flink.demo;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonPrimitive;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author jimmy
 */
public enum GsonUtils {

    INSTANCE;

    private static Gson gson;

    public Gson gson() {
        if (gson != null) {
            return gson;
        }
        String dateFormatWithMS = "yyyy-MM-dd HH:mm:ss.SSS";
        String dateFormatNoMS = "yyyy-MM-dd HH:mm:ss";
        GsonBuilder builder = new GsonBuilder();
        builder.registerTypeAdapter(Date.class, (JsonDeserializer<Date>) (json, typeOfT, context) -> {
            if (json == null || json.toString().equalsIgnoreCase("\"\"")) {
                //空字符判斷
                return null;
            }
            JsonPrimitive jsonPrimitive = json.getAsJsonPrimitive();
            SimpleDateFormat sdfMS = new SimpleDateFormat(dateFormatWithMS);
            SimpleDateFormat sdfNoMS = new SimpleDateFormat(dateFormatNoMS);
            Date dt = null;
            try {
                if (jsonPrimitive.isString()) {
                    if (jsonPrimitive.getAsString().length() == 19) {
                        //這里只是示例,簡單用長度來判斷是哪種格式
                        //yyyy-MM-dd HH:mm:ss格式
                        dt = sdfNoMS.parse(json.getAsString());
                    } else {
                        //yyyy-MM-dd HH:mm:ss.SSS格式
                        dt = sdfMS.parse(json.getAsString());
                    }
                } else if (jsonPrimitive.isNumber()) { //兼容timestamp類型
                    dt = new Date(jsonPrimitive.getAsLong());
                }
            } catch (Exception e) {
                //錯誤日志記錄,略
                e.printStackTrace();
            }
            return dt;
        });
        gson = builder
                .setDateFormat(dateFormatWithMS)
                .setPrettyPrinting()
                .create();
        return gson;
    }
}

開始flink處理,我們的場景是先啟動一個nc模擬網絡服務端發送數據,然后flink實時接收,然后按1分鍾做為時間窗口,統計窗口內收到的word個數。

發送的數據格式類似:

{"word":"hello","eventDateTime":"2021-05-09 22:01:10.000"}

代碼如下:

package com.cnblogs.yjmyzz.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Date;

public class WorkCountSample {

    public static void main(String[] args) throws Exception {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 1 設置環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .setParallelism(1);

        //指定使用eventTime作為時間標准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<Tuple3<String, Integer, String>> count = env
                .socketTextStream("127.0.0.1", 9999)
                .map((MapFunction<String, WordCount>) value -> {
                    //將接收到的json轉換成WordCount對象
                    WordCount wordCount = GsonUtils.INSTANCE.gson().fromJson(value, WordCount.class);
                    return wordCount;
                })
                //這里先不指定任何水印延時
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(0)) {
                    @Override
                    public long extractTimestamp(WordCount element) {
                        //指定事件時間的字段
                        return element.getEventDateTime().getTime();
                    }
                })
                .flatMap((FlatMapFunction<WordCount, Tuple3<String, Integer, String>>) (value, out) -> {
                    String word = value.getWord();
                    //輔助輸出窗口信息,方便調試
                    String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.getEventDateTime().getTime(), 0, 60 * 1000)));
                    if (word != null && word.trim().length() > 0) {
                        out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                    }
                })
                .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class)))
                .keyBy(0)
                //按每分鍾開窗
                .timeWindow(Time.minutes(1))
                .sum(1);


        count.print();

        env.execute("wordCount");

    }
}

來測試一下,先啟用1個網絡服務端,mac或linux上,終端輸入 nc -l 9999,再啟用上面的flink程序,在終端依次輸入下面3條json(即:模擬發送了3條數據)

{"word":"hello","eventDateTime":"2021-05-09 22:01:10.000"}
{"word":"hello","eventDateTime":"2021-05-09 22:01:00.999"}
{"word":"hello","eventDateTime":"2021-05-09 22:02:00.000"}

可以看到,在輸入到第3條時,因為事件時間已經到了第2分鍾,所以上1分鍾的窗口被關閉,觸發了計算,輸出了hello:2,符合預期。

注意一下:第1條與第2條的事件時間,正好的是反的,第1條是22:01:10,而第2條是更早的22:01:00,也就是亂序,但是仍然都正確的統計在了22:01:00這個1分鍾的窗口里。所以按時間開窗的場景,flink天然就能兼容一些亂序情況。

 

如果是延時問題,比如希望延時1秒才開始觸發上1個時間窗口的計算,即: 22:02.00.999 的事件時間數據到達時,才開始計算22:01:00 開始的這個1分鍾窗口(相當於多等1秒),可以調整第40行代碼,即所謂的水印WaterMark機制

一、Watermark延時設置

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(1000))

很簡單對吧?只要把40行這里的0,調整成1000,也就是延時1000ms觸發計算,注:這是一個左閉右開的區間,即[0,1000)的延時范圍都是允許的。

再測試一下:

 可以看到,當輸入第3條數據時,雖然已經是22:02:00.000,到了第2分鍾,但是並沒有觸發前1個時間窗口的計算輸出,而是在第4條數據輸入,也就是22:02.00.999時才觸發22:01 窗口的計算,以此之后,哪怕再有01分窗口的數據上報,將被扔掉。

 

二、時間窗口延時設置

在剛才示例中,如果某個窗口計算過了(也就是窗口關閉了),后面哪怕還有該窗口內的數據上報,默認也會被丟失。這好比:公司組織團建,約好第2天早上8點發車(即:時間窗口的截止時間為8點),然后考慮到可能有人會遲到(即: 數據延時上報),會讓司機多等5分鍾(即:watermark的延時),但是過了08:05,如果還有人沒來,就不管了,這個好象有點不厚道。怎么辦?通常公司會說,現在我們先點下人數(即:窗口先計算1次),如果還有人沒到,我們最后再多等10分鍾(即:這10分鍾內,如果還有人再來,每來1個人,再清點1次,看看人有沒有到齊,如果到了08:15還沒到齊,就只能發車了,不讓讓全公司的人等個別懶蟲)。

這就是flink的第2種處理延時機制,窗口延時計算,只要加一行allowLateness就好。

.timeWindow(Time.minutes(1))
.allowedLateness(Time.seconds(10))

觀察上面的運行結果 ,第3次輸入時,觸發了窗口的第1次計算,緊接着第4條輸入,仍然是01分窗口的數據(相當於遲到趕來的人),又觸發了1次計算,但是到了第5條,也就是第1個黃色箭頭的數據到達時,已經到了最后截止時間,窗口徹底關閉(即:發車了),后面再有數據過來,也不管了。

 

三、遲到數據的側輸出流

還是以上面的公司團建發車為例,如果有些人真的有事情,來不及,但是又想去團建怎么辦?(即:肯定是遲到了,但是數據不能丟)一般的做法,我們是讓他自行打車,單獨前往。這在Flink里,叫做所謂“側輸出流”,把遲到的數據單獨放在一個Stream里收集起來,然后單獨處理。

package com.cnblogs.yjmyzz.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Date;

public class WorkCountSample {

    public static void main(String[] args) throws Exception {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 1 設置環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .setParallelism(1);

        //指定使用eventTime作為時間標准
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        OutputTag<Tuple3<String, Integer, String>> lateTag = new OutputTag<>("late", ((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class)));

        SingleOutputStreamOperator<Tuple3<String, Integer, String>> count = env
                .socketTextStream("127.0.0.1", 9999)
                .map((MapFunction<String, WordCount>) value -> {
                    //將接收到的json轉換成WordCount對象
                    WordCount wordCount = GsonUtils.INSTANCE.gson().fromJson(value, WordCount.class);
                    return wordCount;
                })
                //這里先不指定任何水印延時
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WordCount>(Time.milliseconds(1000)) {
                    @Override
                    public long extractTimestamp(WordCount element) {
                        //指定事件時間的字段
                        return element.getEventDateTime().getTime();
                    }
                })
                .flatMap((FlatMapFunction<WordCount, Tuple3<String, Integer, String>>) (value, out) -> {
                    String word = value.getWord();
                    //輔助輸出窗口信息,方便調試
                    String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.getEventDateTime().getTime(), 0, 60 * 1000)));
                    if (word != null && word.trim().length() > 0) {
                        out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                    }
                })
                .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class)))
                .keyBy(0)
                //按每分鍾開窗
                .timeWindow(Time.minutes(1))
                .allowedLateness(Time.seconds(10))
                //定義遲到數據的側輸出流
                .sideOutputLateData(lateTag)
                .sum(1);


        count.print();

        //遲到的數據,這時只是簡單的打印出來
        count.getSideOutput(lateTag).print();

        env.execute("wordCount");

    }
}

33行,先定義一個OutputTag

64行,通過sideOutputLateData(lateTag)指定側輸出流,將遲到的數據收集於此

71行,將收集到的測輸出流,打印出來(實際業務中,可以存到mysql等一些存儲體系中)

運行效果:

注:

右側倒數第2條{"word":"hello","eventDateTime":"2021-05-09 22:02:10.999"}發送完畢后,01分的窗口已關閉。

再發送最后1條{"word":"world","eventDateTime":"2021-05-09 22:01:10.000"}時,這條就是遲到數據了,從左側輸出來看,已正確輸出,被側輸出流處理了。

 

小結一下:

1、Watermark水印在窗口計算觸發前延時;

2、allowedLateness則是只要窗口計算時機被觸發了,把現有數據先算一把,后面如果還有該窗口的數據過來,可以繼續再算(前提是在允許的延時閾值范圍內)

3、如果上述2種延時都滿足不了,在窗口徹底關閉了后,還有遲到數據進來,可以放到側輸出流,單獨處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM