Flink 1.11.2 Study Notes (3) - Statistical Windows


Continuing from the previous post: data analysis usually needs a time range, for example "at 1 a.m. every day, count the previous day's orders" or "at every full hour, total the shipments of the past 24 hours". This statistics period is called a window. Flink supports several kinds of window; this post introduces two common ones: TumblingWindow and SlidingWindow.

[Figure: a timeline with one event per minute (numbered 1~7), contrasting a 3-minute sliding window evaluated every minute with a 2-minute tumbling window]

As the figure above shows, the bottom line is the timeline. Suppose the upstream system produces one record per minute, numbered 1~7. If every minute we aggregate the previous 3 minutes of data, that is a SlidingWindow. If we aggregate each 2 minutes of data exactly once (note: two adjacent computations share no overlapping data), that is a TumblingWindow.
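In DataStream API terms, the two variants correspond to two window assigners. A minimal sketch, assuming `stream` is a KeyedStream obtained via keyBy(...):

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// SlidingWindow: every 1 minute, aggregate the previous 3 minutes of data
stream.window(SlidingEventTimeWindows.of(Time.minutes(3), Time.minutes(1)));

// TumblingWindow: aggregate each non-overlapping 2-minute block exactly once
stream.window(TumblingEventTimeWindows.of(Time.minutes(2)));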

 

Before writing the sample code, one more concept: time semantics.

Every business record usually carries its own "business occurrence time" (an order has an "order time"; an IM chat message has a "send time"). Because of network latency and similar factors, the record reaches Flink some time later, at which point Flink also has a "receive time". So which time should the window computations above be based on? That question is what time semantics answers. Flink lets the developer decide which time to use as the processing basis; most business systems go with the business occurrence time, i.e. the so-called event time.
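As an aside, Flink 1.11 also ships built-in watermark strategies, so hand-writing a WatermarkGenerator (as the examples below do for illustration) is often unnecessary. A minimal sketch; parseEventTimestamp is a hypothetical helper that extracts the event_timestamp field from the JSON message:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Event time, tolerating up to 100 ms of out-of-order events.
// parseEventTimestamp(...) is an assumed helper returning the event's epoch millis.
WatermarkStrategy<String> strategy = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofMillis(100))
        .withTimestampAssigner((message, recordTimestamp) -> parseEventTimestamp(message));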

 

Now let's demonstrate both with the classic WordCount example (Flink version: 1.11.2).

1. Prepare the data source

Kafka again serves as the data source; we will send it data in the following format:

{
	"event_datetime": "2020-12-19 14:10:21.209",
	"event_timestamp": "1608358221209",
	"word": "hello"
}

Note: event_timestamp is the timestamp corresponding to the business time (i.e. the event time), and word is the word to be counted. event_datetime takes no part in processing; it just makes the logs easier to read.

Write a Java class that keeps sending data (one random record every 10 seconds, 6 records per minute):

package com.cnblogs.yjmyzz.flink.demo;

import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author 菩提樹下的楊過
 */
public class KafkaProducerSample {

    private static String topic = "test3";

    private static Gson gson = new Gson();

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

        String[] words = new String[]{"hello", "world", "flink"};
        Random rnd = new Random();
        try {
            while (true) {
                Map<String, String> map = new HashMap<>();
                map.put("word", words[rnd.nextInt(words.length)]);
                long timestamp = System.currentTimeMillis();
                map.put("event_timestamp", timestamp + "");
                map.put("event_datetime", sdf.format(new Date(timestamp)));
                String msg = gson.toJson(map);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                kafkaProducer.send(record);
                System.out.println(msg);
                Thread.sleep(10000);
            }
        } finally {
            kafkaProducer.close();
        }

    }
}

 

2. TumblingWindow example

package com.cnblogs.yjmyzz.flink.demo;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Properties;

/**
 * @author 菩提樹下的楊過 (http://yjmyzz.cnblogs.com/)
 */
public class KafkaStreamTumblingWindowCount {

    private final static Gson gson = new Gson();
    private final static String SOURCE_TOPIC = "test3";
    private final static String SINK_TOPIC = "test4";
    private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");

    public static void main(String[] args) throws Exception {

        // 1. Set up the environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time as the basis for all time-based operations
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Define the data source
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-read-group-2");
        props.put("deserializer.encoding", "GB2312");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> text = env.addSource(new FlinkKafkaConsumer011<>(
                SOURCE_TOPIC,
                new SimpleStringSchema(),
                props));

        // 3. Processing logic
        DataStream<Tuple3<String, Integer, String>> counts = text.assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<String>() {
                    // Largest event_timestamp seen so far
                    private long maxTimestamp;
                    // Tolerated lateness in milliseconds
                    private long delay = 100;

                    @Override
                    public void onEvent(String s, long l, WatermarkOutput watermarkOutput) {
                        Map<String, String> map = gson.fromJson(s, new TypeToken<Map<String, String>>() {
                        }.getType());
                        String timestamp = map.getOrDefault("event_timestamp", l + "");
                        maxTimestamp = Math.max(maxTimestamp, Long.parseLong(timestamp));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                        // The watermark trails the largest seen event time by `delay` ms
                        watermarkOutput.emitWatermark(new Watermark(maxTimestamp - delay));
                    }
                };
            }
        }).flatMap(new FlatMapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                // Parse the JSON body of the message
                Map<String, String> map = gson.fromJson(value, new TypeToken<Map<String, String>>() {
                }.getType());

                String word = map.getOrDefault("word", "");
                String eventTimestamp = map.getOrDefault("event_timestamp", "0");
                // Compute the start time of the window this event falls into (for display)
                String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(Long.parseLong(eventTimestamp), 0, 60 * 1000)));
                if (word != null && word.trim().length() > 0) {
                    // Collect (similar to the map-reduce idea)
                    out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                }

            }
        })
                // Key by field 0 of the Tuple3, i.e. the word
                .keyBy(value -> value.f0)
                // Tumbling window: one fixed 1-minute window per whole minute
                .timeWindow(Time.minutes(1))
                // Then sum field 1 of the Tuple3
                .sum(1);

        // 4. Emit the results
        counts.addSink(new FlinkKafkaProducer010<>("localhost:9092", SINK_TOPIC,
                (SerializationSchema<Tuple3<String, Integer, String>>) element -> (element.f2 + " (" + element.f0 + "," + element.f1 + ")").getBytes()));
        counts.print();

        // execute program
        env.execute("Kafka Streaming WordCount");

    }
}

It looks like a lot of code, but it is not complicated. A quick walkthrough:

The constants at the top of the class wire things up: data is read from the test3 topic and the processed results are sent to the test4 topic.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) selects event time as the time semantics. That alone is not enough, though: Flink cannot know which field carries the event time just because we declared it. The assignTimestampsAndWatermarks block supplies the details: it parses the JSON body of each Kafka message and extracts event_timestamp as the time basis. The custom WatermarkGenerator also tolerates events arriving up to 100 ms late; in effect, a window such as 14:33:00.000 ~ 14:33:59.999 only fires once an event with timestamp 14:34:00.099 or later has been seen. (You can ignore this for now; it will be explained in detail in a later post on watermarks.)

To make the WordCount output friendlier, the start time of the window each event falls into is computed with the static helper TimeWindow.getWindowStartWithOffset and put straight into the result tuple.
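That helper is nothing mysterious, just modular arithmetic that rounds a timestamp down to the start of its window. Its body (from Flink's TimeWindow class), plus a worked example using the first record of the log excerpt below:

// From org.apache.flink.streaming.api.windowing.windows.TimeWindow:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

// Worked example with offset = 0 and windowSize = 60000 (one minute):
//   timestamp = 1608359556873                   ("2020-12-19 14:32:36.873")
//   (1608359556873 - 0 + 60000) % 60000 = 36873
//   1608359556873 - 36873 = 1608359520000       ("2020-12-19 14:32:00.000")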

timeWindow(Time.minutes(1)) selects a tumblingWindow that aggregates 1 minute of data at a time. (Note: each window runs from second 0 through second 59 of a whole minute, e.g. 2020-12-12 14:00:00.000 ~ 2020-12-12 14:00:59.999.)
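Under the hood, this single-argument overload simply picks a tumbling window assigner based on the configured time characteristic; a sketch of the method from the Flink 1.11 KeyedStream source:

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}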

Running it:

Below is the Kafka message log from the data source (excerpt):

...
{"event_datetime":"2020-12-19 14:32:36.873","event_timestamp":"1608359556873","word":"hello"}
{"event_datetime":"2020-12-19 14:32:46.874","event_timestamp":"1608359566874","word":"world"}
{"event_datetime":"2020-12-19 14:32:56.874","event_timestamp":"1608359576874","word":"hello"}

{"event_datetime":"2020-12-19 14:33:06.875","event_timestamp":"1608359586875","word":"hello"}
{"event_datetime":"2020-12-19 14:33:16.876","event_timestamp":"1608359596876","word":"world"}
{"event_datetime":"2020-12-19 14:33:26.877","event_timestamp":"1608359606877","word":"hello"}
{"event_datetime":"2020-12-19 14:33:36.878","event_timestamp":"1608359616878","word":"world"}
{"event_datetime":"2020-12-19 14:33:46.879","event_timestamp":"1608359626879","word":"flink"}
{"event_datetime":"2020-12-19 14:33:56.879","event_timestamp":"1608359636879","word":"hello"}

{"event_datetime":"2020-12-19 14:34:06.880","event_timestamp":"1608359646880","word":"world"}
{"event_datetime":"2020-12-19 14:34:16.881","event_timestamp":"1608359656881","word":"world"}
{"event_datetime":"2020-12-19 14:34:26.883","event_timestamp":"1608359666883","word":"hello"}
{"event_datetime":"2020-12-19 14:34:36.883","event_timestamp":"1608359676883","word":"flink"}
{"event_datetime":"2020-12-19 14:34:46.885","event_timestamp":"1608359686885","word":"flink"}
{"event_datetime":"2020-12-19 14:34:56.885","event_timestamp":"1608359696885","word":"world"}

{"event_datetime":"2020-12-19 14:35:06.885","event_timestamp":"1608359706885","word":"flink"}
...

Flink's output (you can verify it against the log above: the 14:33 minute contains 3 hello, 2 world and 1 flink):

...
3> (world,2,2020-12-19 14:33)
4> (flink,1,2020-12-19 14:33)
2> (hello,3,2020-12-19 14:33)

3> (world,3,2020-12-19 14:34)
2> (hello,1,2020-12-19 14:34)
4> (flink,2,2020-12-19 14:34)
...

  

3. SlidingWindow example

package com.cnblogs.yjmyzz.flink.demo;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Properties;

/**
 * @author 菩提樹下的楊過 (http://yjmyzz.cnblogs.com/)
 */
public class KafkaStreamSlidingWindowCount {

    private final static Gson gson = new Gson();
    private final static String SOURCE_TOPIC = "test3";
    private final static String SINK_TOPIC = "test4";
    private final static SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private final static SimpleDateFormat sdf2 = new SimpleDateFormat("HH:mm");

    public static void main(String[] args) throws Exception {

        // 1. Set up the environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time as the basis for all time-based operations
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Define the data source
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-read-group-1");
        props.put("deserializer.encoding", "GB2312");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> text = env.addSource(new FlinkKafkaConsumer011<>(
                SOURCE_TOPIC,
                new SimpleStringSchema(),
                props));

        // 3. Processing logic
        DataStream<Tuple3<String, Integer, String>> counts = text.assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
            @Override
            public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<String>() {
                    // Largest event_timestamp seen so far
                    private long maxTimestamp;
                    // Tolerated lateness in milliseconds
                    private long delay = 1000;

                    @Override
                    public void onEvent(String s, long l, WatermarkOutput watermarkOutput) {
                        Map<String, String> map = gson.fromJson(s, new TypeToken<Map<String, String>>() {
                        }.getType());
                        String timestamp = map.getOrDefault("event_timestamp", l + "");
                        maxTimestamp = Math.max(maxTimestamp, Long.parseLong(timestamp));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                        // The watermark trails the largest seen event time by `delay` ms
                        watermarkOutput.emitWatermark(new Watermark(maxTimestamp - delay));
                    }
                };
            }
        }).flatMap(new FlatMapFunction<String, Tuple3<String, Integer, String>>() {

            @Override
            public void flatMap(String value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                // Parse the JSON body of the message
                Map<String, String> map = gson.fromJson(value, new TypeToken<Map<String, String>>() {
                }.getType());

                String eventTimestamp = map.getOrDefault("event_timestamp", "0");
                String windowTimeStart = sdf1.format(new Date(TimeWindow.getWindowStartWithOffset(Long.parseLong(eventTimestamp), 2 * 60 * 1000, 1 * 60 * 1000)));
                String windowTimeEnd = sdf2.format(new Date(1 * 60 * 1000 + TimeWindow.getWindowStartWithOffset(Long.parseLong(eventTimestamp), 2 * 60 * 1000, 1 * 60 * 1000)));
              
                String word = map.getOrDefault("word", "");
                if (word != null && word.trim().length() > 0) {
                    out.collect(new Tuple3<>(word.trim(), 1, windowTimeStart + " ~ " + windowTimeEnd));
                }

            }
        })
                // Key by field 0 of the Tuple3, i.e. the word
                .keyBy(value -> value.f0)
                // Evaluate once per minute, each time over the previous 2 minutes of data
                .timeWindow(Time.minutes(2), Time.minutes(1))
                // Then sum field 1 of the Tuple3
                .sum(1);

        // 4. Emit the results
        counts.addSink(new FlinkKafkaProducer010<>("localhost:9092", SINK_TOPIC,
                (SerializationSchema<Tuple3<String, Integer, String>>) element -> (element.f2 + " (" + element.f0 + "," + element.f1 + ")").getBytes()));
        counts.print();

        // execute program
        env.execute("Kafka Streaming WordCount");

    }
}

The biggest difference from the TumblingWindow version is the timeWindow(Time.minutes(2), Time.minutes(1)) call: besides the window size, it also specifies a slide. If you are interested, take a look at this method:

	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			return window(SlidingEventTimeWindows.of(size, slide));
		}
	}
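With size = 2 minutes and slide = 1 minute, every event therefore belongs to size / slide = 2 overlapping windows. A small sketch of how the windows for an event are derived, modeled on SlidingEventTimeWindows (offset fixed to 0) and reusing the getWindowStartWithOffset helper shown earlier:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch: compute the [start, end) sliding windows containing `timestamp`
// (size and slide in milliseconds).
static List<long[]> slidingWindowsFor(long timestamp, long size, long slide) {
    List<long[]> windows = new ArrayList<>();
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, slide);
    for (long start = lastStart; start > timestamp - size; start -= slide) {
        windows.add(new long[]{start, start + size});
    }
    return windows;
}

// E.g. an event at 14:33:06 with size = 2 min and slide = 1 min falls into
// [14:33, 14:35) and [14:32, 14:34).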

Output:

A snippet of the source data sent to Kafka:

...
{"event_datetime":"2020-12-19 14:32:36.873","event_timestamp":"1608359556873","word":"hello"}
{"event_datetime":"2020-12-19 14:32:46.874","event_timestamp":"1608359566874","word":"world"}
{"event_datetime":"2020-12-19 14:32:56.874","event_timestamp":"1608359576874","word":"hello"}

{"event_datetime":"2020-12-19 14:33:06.875","event_timestamp":"1608359586875","word":"hello"}
{"event_datetime":"2020-12-19 14:33:16.876","event_timestamp":"1608359596876","word":"world"}
{"event_datetime":"2020-12-19 14:33:26.877","event_timestamp":"1608359606877","word":"hello"}
{"event_datetime":"2020-12-19 14:33:36.878","event_timestamp":"1608359616878","word":"world"}
{"event_datetime":"2020-12-19 14:33:46.879","event_timestamp":"1608359626879","word":"flink"}
{"event_datetime":"2020-12-19 14:33:56.879","event_timestamp":"1608359636879","word":"hello"}

{"event_datetime":"2020-12-19 14:34:06.880","event_timestamp":"1608359646880","word":"world"}
{"event_datetime":"2020-12-19 14:34:16.881","event_timestamp":"1608359656881","word":"world"}
{"event_datetime":"2020-12-19 14:34:26.883","event_timestamp":"1608359666883","word":"hello"}
{"event_datetime":"2020-12-19 14:34:36.883","event_timestamp":"1608359676883","word":"flink"}
{"event_datetime":"2020-12-19 14:34:46.885","event_timestamp":"1608359686885","word":"flink"}
{"event_datetime":"2020-12-19 14:34:56.885","event_timestamp":"1608359696885","word":"world"}

{"event_datetime":"2020-12-19 14:35:06.885","event_timestamp":"1608359706885","word":"flink"}
...

The processed result:

...
3> (world,2,2020-12-19 14:33)
4> (flink,1,2020-12-19 14:33)
2> (hello,3,2020-12-19 14:33)

3> (world,3,2020-12-19 14:34)
2> (hello,1,2020-12-19 14:34)
4> (flink,2,2020-12-19 14:34)
...

