精通flink读书笔记(2)——用DataStreamAPI处理数据


来源于 https://blog.csdn.net/yyqq188/article/details/79527017

 

Flink提供了Kafka连接器,用于从或向Kafka读写数据。

本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理。

问题一: 读Kafka的方式
## 读取一个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

## 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

# 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

# 从指定的时间戳开始消费
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)

# 从指定的偏移量开始消费,可为每个分区单独设置偏移量
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)


问题二: 读Kafka与反序列化器
可通过org.apache.flink.api.common.serialization.DeserializationSchema或org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema,将从Kafka读取的二进制字节流反序列化成Flink内部支持的Java/Scala对象。

Flink内置支持以下2种常用反序列化器:

org.apache.flink.api.common.serialization.SimpleStringSchema:反序列化成String。

org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema:反序列化成jackson ObjectNode。

如果想实现Kafka复杂JSON直接转换成想要的Object,可仿照org.apache.flink.api.common.serialization.SimpleStringSchema自定义即可。主要实现deserialize反序列化方法。

问题三: 读Kafka动态发现Topic、Partition
之前使用Spark Streaming,Spark 2.2.2不支持动态发现Kafka 0.10.1中新增的Topic(基于正则指定)和Partition。当新增了Topic或Partition,需要重启Spark Streaming任务。

在Flink中, 默认支持动态发现Kafka中新增的Topic或Partition,但需要手动开启。

kafkaProperties.put("flink.partition-discovery.interval-millis","10000");

flink.partition-discovery.interval-millis: 检查间隔,单位毫秒。

问题四: 读Kafka与Exactly Once语义
没有开启Checkpoint,默认自动提交Offset至外部存储(如Kafka Broker或Zookeeper),自动提交的间隔是5秒。Flink Kafka Consumer的容错依赖于自动提交的Offset。

开启Checkpoint,默认在Checkpoint完成后将存储在Checkpoint中的Offset再提交至外部存储(如Kafka Broker或0.8版本中的Zookeeper),Flink Kafka Consumer在Flink作业运行过程中的容错依赖于Checkpoint中的Offset,Flink作业恢复,则可能是从Checkpoint中的Offset恢复,也可能是从外部存储如Kafka Broker中的Offset恢复,具体取决于恢复方式。注意: 在这种方式下,Kafka Broker(或0.8中Zookeeper)存储的Offset仅用于监控消费进度。

总结,基于Kafka可重复消费的能力并结合Flink Checkpoint机制,Flink Kafka Consumer能提供Exactly-Once语义。

问题五: 写Kafka与Exactly Once语义
Kafka 0.8 Flink不提供Exactly-Once或At-Least-Once语义。

Kafka 0.9、0.10 Flink启用Checkpoint,FlinkKafkaProducer09和FlinkKafkaProducer010提供At-Least-Once语义。除此之外,还需设置以下参数:

setLogFailuresOnly(false): 若为true,Producer遇到异常时,仅记录失败时的日志,流处理程序继续。需要设置为false,当遇到异常,流处理程序失败,抛出异常恢复任务并重试发送。

setFlushOnCheckpoint(true): Checkpoint中包含Kafka Producer Buffer中的数据,设置为true, 确保Checkpoint成功前,Buffer中的所有记录都已写入Kafka。

retries: 重试次数,默认0,建议设置更大。

Kafka 0.11、1.0.0+ Flink启用Checkpoint,基于Two Phase Commit,FlinkKafkaProducer011和FlinkKafkaProducer(Kafka >=1.0.0) 默认提供Exactly-Once语义。
如需要其他语义Semantic.NONE(可能会丢或重)、Semantic.AT_LEAST_ONCE(可能会重)、Semantic.EXACTLY_ONCE(默认),可手动选择。

从Kafka 0.10.1读数据并写入到Kafka 0.11.0.3并实现PV统计
部分依赖
<!--Kafka连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.8.0</version>
</dependency>

<!--Kafka连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.8.0</version>
</dependency>

代码实现
package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
* Author: Wang Pei
* Summary: 读写Kafka
*/
public class ReadWriteKafka {
public static void main(String[] args) throws Exception{

/**解析命令行参数*/
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

//checkpoint参数
String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

//fromKafka参数
String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
String fromKafkaAutoOffsetReset= parameterTool.getRequired("fromKafka.auto.offset.reset");
String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

//toKafka参数
String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
String toKafkaTopic = parameterTool.getRequired("toKafka.topic");

//窗口参数
long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");


/**配置运行环境*/

//设置Local Web Server
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT,8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//设置StateBackend
env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));

//设置Checkpoint
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

/**配置数据源*/
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
kafkaProperties.put("group.id",fromKafkaGroupID);
kafkaProperties.put("auto.offset.reset",fromKafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

/**抽取转换*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
Tuple4<String, String, String, Integer> output = new Tuple4<>();
try {
JSONObject obj = JSON.parseObject(value);
output.f0 = obj.getString("userID");
output.f1 = obj.getString("eventTime");
output.f2 = obj.getString("eventType");
output.f3 = obj.getInteger("productID");
} catch (Exception e) {
e.printStackTrace();
}
return output;
}).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){})
.name("Map: ExtractTransform").uid("map-id");

/**过滤掉异常数据*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value != null)
.name("Filter: FilterExceptionData").uid("filter-id");


/**抽取时间戳并发射水印*/
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {

SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public long extractTimestamp(Tuple4<String, String, String, Integer> element) {

long timestamp = 0L;

try {
Date date = format.parse(element.f1);
timestamp = date.getTime();
} catch (ParseException e) {
e.printStackTrace();
}

return timestamp;
}
}).uid("watermark-id");


/**窗口统计*/
SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
//默认用Hash方式
.keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
.window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
//在每个窗口(Window)上应用WindowFunction(CustomWindowFunction)
//CustomAggFunction用于增量聚合
//在每个窗口上,先进行增量聚合(CustomAggFunction),然后将增量聚合的结果作为WindowFunction(CustomWindowFunction)的输入,计算后并输出
//具体: 可参考底层AggregateApplyWindowFunction的实现
.aggregate(new CustomAggFunction(), new CustomWindowFunction());

//aggregate.print();

/**结果输出*/
Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty("bootstrap.servers",toKafkaBootstrapServers);
kafkaProducerProperties.setProperty("transaction.timeout.ms",60000+"");
FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
toKafkaTopic,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
kafkaProducerProperties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);

aggregate.addSink(kafkaProducer011).name("outputToKafka");

env.execute();

}

/**
* 自定义AggregateFunction
* 增量聚合,这里实现累加效果
*/
static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>,Long,Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
return accumulator + 1;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}

/**
* 自定义WindowFunction
* 对增量聚合的结果再做处理,并输出
*/
static class CustomWindowFunction implements WindowFunction<Long, String,String, TimeWindow> {

@Override
public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {

Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

long windowStart = window.getStart();
long windowEnd = window.getEnd();
Long windowPV = input.iterator().next();

String output=format.format(new Date(windowStart))+","+format.format(new Date(windowEnd))+","+key+","+windowPV;

out.collect(output);

}
}

}

 

————————————————
版权声明:本文为CSDN博主「wangpei1949」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/wangpei1949/article/details/98476354


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM