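Source: https://blog.csdn.net/yyqq188/article/details/79527017
Flink provides a Kafka connector for reading data from and writing data to Kafka.
This article summarizes the issues that come up when integrating Flink with Kafka and clarifies some points that are easy to get wrong.
Question 1: Ways to read from Kafka
## Read a single topic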
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## Read multiple topics
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
## Read multiple topics matched by a regular expression
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
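Before deploying a pattern subscription, it can help to check which topic names the regular expression actually selects. The sketch below uses only `java.util.regex` and assumes that the connector requires the whole topic name to match the pattern (full-match semantics). The class name `TopicPatternCheck` and the sample topic names other than `userActionLog1` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternCheck {
    // Returns the topics whose full name matches the pattern (assumed full-match semantics).
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("userActionLog[1-9]{1}");
        // Hypothetical topic names used only to exercise the pattern.
        List<String> topics = Arrays.asList(
                "userActionLog1", "userActionLog9", "userActionLog0", "userActionLog10", "orderLog1");
        System.out.println(matchingTopics(pattern, topics));
    }
}
```

Under that assumption, `userActionLog0` and `userActionLog10` are not selected: the character class excludes `0`, and `{1}` allows exactly one trailing digit.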
## Start consuming from a given timestamp
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)
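The timestamp argument is expressed in epoch milliseconds. As a small sketch, a human-readable start time can be converted with `java.time` before being handed to `setStartFromTimestamp`. The class name `StartTimestamp`, the date string, and the choice of UTC are assumptions for illustration only.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class StartTimestamp {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts a local date-time string in the given zone to epoch milliseconds.
    static long toEpochMillis(String dateTime, ZoneId zone) {
        return LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long startupOffsetsTimestamp = toEpochMillis("2019-08-01 00:00:00", ZoneId.of("UTC"));
        // In the job, this value would be passed on:
        // kafkaConsumer.setStartFromTimestamp(startupOffsetsTimestamp);
        System.out.println(startupOffsetsTimestamp);
    }
}
```

Choosing the zone explicitly avoids depending on the default time zone of the machine that submits the job.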
## Start consuming from specific offsets; an offset can be set separately for each partition
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
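Question 2: Reading from Kafka and deserializers
The raw bytes read from Kafka are turned into Java/Scala objects by an implementation of org.apache.flink.api.common.serialization.DeserializationSchema or org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema. The former sees only the record value; the latter receives the whole Kafka ConsumerRecord.
Flink ships with the following two commonly used deserializers:
org.apache.flink.api.common.serialization.SimpleStringSchema: deserializes to String.
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema: deserializes to a Jackson ObjectNode.
To convert complex JSON from Kafka directly into a target object, write a custom schema modeled on org.apache.flink.api.common.serialization.SimpleStringSchema. The main work is implementing the deserialize method.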
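The core of such a custom schema is a `deserialize(byte[])` method that decodes the bytes and then parses the text into the target type. The sketch below shows only that shape, in plain Java so it runs without Flink on the classpath. The class `BytesToObjectSchema` and its `parser` parameter are hypothetical. In a real job the class would implement `DeserializationSchema<T>` and also provide `isEndOfStream` and `getProducedType`, and the parser would typically be a JSON library call.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for a custom schema; not a Flink class.
class BytesToObjectSchema<T> {
    private final Function<String, T> parser;

    BytesToObjectSchema(Function<String, T> parser) {
        this.parser = parser;
    }

    // Same shape as DeserializationSchema#deserialize(byte[] message):
    // decode the bytes as UTF-8 text, then parse the text into the target type.
    T deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        return parser.apply(text);
    }

    public static void main(String[] args) {
        // The parser here is a trivial placeholder; a JSON parser would go in its place.
        BytesToObjectSchema<Integer> schema = new BytesToObjectSchema<>(Integer::parseInt);
        byte[] message = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(schema.deserialize(message) + 1);
    }
}
```

Keeping the parser as a separate function makes the decoding step easy to unit-test without a running Kafka cluster.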
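Question 3: Dynamically discovering topics and partitions when reading from Kafka
Previously, with Spark Streaming, Spark 2.2.2 could not dynamically discover topics (subscribed by regular expression) or partitions newly added in Kafka 0.10.1. Whenever a topic or partition was added, the Spark Streaming job had to be restarted.
Flink supports dynamic discovery of newly added topics and partitions, but the feature is disabled by default and must be enabled explicitly by setting a discovery interval (new topics are discovered only when subscribing by pattern):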
kafkaProperties.put("flink.partition-discovery.interval-millis","10000");
flink.partition-discovery.interval-millis: the discovery interval, in milliseconds.
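Question 4: Reading from Kafka and exactly-once semantics
Checkpointing disabled: offsets are committed automatically to external storage (the Kafka brokers, or ZooKeeper for 0.8) by the Kafka client's periodic auto-commit, whose default interval is 5 seconds. The fault tolerance of the Flink Kafka Consumer then relies on these auto-committed offsets.
Checkpointing enabled: by default, once a checkpoint completes, the offsets stored in that checkpoint are also committed to external storage (the Kafka brokers, or ZooKeeper for 0.8). While the job is running, the fault tolerance of the Flink Kafka Consumer relies on the offsets in the checkpoint. When the job is recovered, it may resume from the offsets in a checkpoint or from the offsets in external storage such as the Kafka brokers, depending on how it is recovered. Note: in this mode, the offsets stored in the Kafka brokers (or in ZooKeeper for 0.8) serve only to monitor consumption progress.
In summary, by combining Kafka's ability to replay records with Flink's checkpoint mechanism, the Flink Kafka Consumer can provide exactly-once semantics.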
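The reasoning behind that summary can be illustrated with a toy model: if the read offset and the operator state are saved together in one snapshot, then after a crash both are rolled back together, and replaying the log from the saved offset brings the state to the same result as a run without failure. The sketch below is plain Java and is not Flink's implementation. The class `CheckpointReplayDemo`, the checkpoint interval, and the failure point are all invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

class CheckpointReplayDemo {
    // Counts the records of a replayable log. A snapshot of (offset, count) is taken
    // every `interval` records, and one crash is simulated after `failAt` records were read.
    static long countWithOneFailure(List<String> log, int interval, int failAt) {
        long checkpointOffset = 0;
        long checkpointCount = 0;
        long offset = 0;
        long count = 0;
        boolean failed = false;
        while (offset < log.size()) {
            count++;   // "process" the record at the current offset
            offset++;
            if (!failed && offset == failAt) {
                // Simulated crash: progress since the last snapshot is lost,
                // so offset and state are restored together and the log is replayed.
                failed = true;
                offset = checkpointOffset;
                count = checkpointCount;
                continue;
            }
            if (offset % interval == 0) {
                // Snapshot offset and state atomically.
                checkpointOffset = offset;
                checkpointCount = count;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9");
        System.out.println(countWithOneFailure(log, 3, 5));
    }
}
```

Note that the guarantee concerns the state: records between the last snapshot and the crash are read twice, but their effect on the count is applied only once.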
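Question 5: Writing to Kafka and exactly-once semantics
Kafka 0.8: Flink provides neither exactly-once nor at-least-once semantics.
Kafka 0.9 and 0.10: with checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 provide at-least-once semantics, provided the following are also set:
setLogFailuresOnly(false): if true, the producer only logs failures and the streaming job keeps running. Set it to false so that a write error fails the job, which is then recovered and sends the records again.
setFlushOnCheckpoint(true): when true, a checkpoint waits until all records buffered in the Kafka producer at that moment have been acknowledged by Kafka, which ensures that every record before the checkpoint has been written to Kafka before the checkpoint succeeds.
retries: the Kafka producer's retry count. The default is 0; a larger value is recommended.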
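Kafka 0.11 and 1.0.0+: with checkpointing enabled, FlinkKafkaProducer011 and FlinkKafkaProducer (Kafka >= 1.0.0) can provide exactly-once semantics, based on a two-phase commit.
The semantic is chosen explicitly: Semantic.NONE (records may be lost or duplicated), Semantic.AT_LEAST_ONCE (records may be duplicated; this is the default), or Semantic.EXACTLY_ONCE (must be selected explicitly).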
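The idea of a two-phase commit sink can be sketched with a toy model: records are written into an open transaction, the transaction is pre-committed when a checkpoint is taken, and it only becomes visible once the checkpoint has completed. If the job fails before that, the uncommitted data is discarded and the source replays it. The class `TwoPhaseCommitSinkDemo` below is plain Java invented for illustration. Its method names loosely follow the pre-commit, commit, and abort steps, and it models only a failure that happens before the checkpoint completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitSinkDemo {
    private final List<String> committed = new ArrayList<>();  // visible to readers of committed data
    private List<String> openTransaction = new ArrayList<>();  // written, not yet pre-committed
    private List<String> preCommitted = null;                  // flushed at checkpoint, awaiting commit

    void write(String record) {
        openTransaction.add(record);
    }

    // Phase 1: at checkpoint time, close the current transaction and open a new one.
    void preCommit() {
        preCommitted = openTransaction;
        openTransaction = new ArrayList<>();
    }

    // Phase 2: after the checkpoint has completed, make the pre-committed data visible.
    void commit() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    // Failure before the checkpoint completed: drop uncommitted data; the source replays it.
    void abort() {
        openTransaction = new ArrayList<>();
        preCommitted = null;
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        TwoPhaseCommitSinkDemo sink = new TwoPhaseCommitSinkDemo();
        for (String r : Arrays.asList("a", "b")) sink.write(r);
        sink.preCommit();
        sink.commit();
        sink.write("c");
        sink.abort();      // simulated crash before the next checkpoint
        sink.write("c");   // replayed by the source after recovery
        sink.preCommit();
        sink.commit();
        System.out.println(sink.committed());
    }
}
```

With a real Kafka sink, downstream consumers see only committed data if they read with `isolation.level=read_committed`.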
Reading from Kafka 0.10.1, writing to Kafka 0.11.0.3, and computing PV statistics
Dependencies (partial)
<!-- Kafka 0.10 connector (used by the consumer) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<!-- Kafka 0.11 connector (used by the producer) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.8.0</version>
</dependency>
Implementation
package com.bigdata.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
 * Author: Wang Pei
 * Summary: Read from and write to Kafka
 */
public class ReadWriteKafka {
    public static void main(String[] args) throws Exception {
        /** Parse command-line arguments */
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
        // Checkpoint parameters
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
        // Source Kafka (fromKafka) parameters
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaAutoOffsetReset = parameterTool.getRequired("fromKafka.auto.offset.reset");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
        // Sink Kafka (toKafka) parameters
        String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
        String toKafkaTopic = parameterTool.getRequired("toKafka.topic");
        // Window parameters (both in seconds)
        long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
        long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");
        /** Configure the execution environment */
        // Start a local web server for the web UI
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set the state backend (second argument: take snapshots asynchronously)
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        /** Configure the data source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", fromKafkaBootstrapServers);
        kafkaProperties.put("group.id", fromKafkaGroupID);
        kafkaProperties.put("auto.offset.reset", fromKafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        // Commit the checkpointed offsets back to Kafka when a checkpoint completes
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");
        /** Extract and transform */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
                    Tuple4<String, String, String, Integer> output = new Tuple4<>();
                    try {
                        JSONObject obj = JSON.parseObject(value);
                        output.f0 = obj.getString("userID");
                        output.f1 = obj.getString("eventTime");
                        output.f2 = obj.getString("eventType");
                        output.f3 = obj.getInteger("productID");
                    } catch (Exception e) {
                        // Malformed input: the fields stay null and the record is dropped by the filter below
                        e.printStackTrace();
                    }
                    return output;
                }).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){})
                .name("Map: ExtractTransform").uid("map-id");
        /** Filter out malformed records */
        // The map above never returns null, so also check the fields used downstream (f1: event time, f2: key)
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value != null && value.f1 != null && value.f2 != null)
                .name("Filter: FilterExceptionData").uid("filter-id");
        /** Extract timestamps and emit watermarks */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            public long extractTimestamp(Tuple4<String, String, String, Integer> element) {
                // Falls back to 0 when eventTime cannot be parsed
                long timestamp = 0L;
                try {
                    Date date = format.parse(element.f1);
                    timestamp = date.getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return timestamp;
            }
        }).uid("watermark-id");
        /** Windowed aggregation */
        SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
                // Records are hash-partitioned by key by default
                .keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
                .window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
                // CustomAggFunction aggregates incrementally as records arrive in each window.
                // When the window fires, its result is passed as input to the WindowFunction
                // (CustomWindowFunction), which computes and emits the final output.
                // For details, see the underlying AggregateApplyWindowFunction implementation.
                .aggregate(new CustomAggFunction(), new CustomWindowFunction());
        //aggregate.print();
        /** Write the results */
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty("bootstrap.servers", toKafkaBootstrapServers);
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        kafkaProducerProperties.setProperty("transaction.timeout.ms", "60000");
        FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
                toKafkaTopic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProducerProperties,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        );
        aggregate.addSink(kafkaProducer011).name("outputToKafka");
        env.execute();
    }
    /**
     * Custom AggregateFunction.
     * Incremental aggregation; here it simply counts records.
     */
    static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long accumulator1, Long accumulator2) {
            return accumulator1 + accumulator2;
        }
    }
    /**
     * Custom WindowFunction.
     * Post-processes the result of the incremental aggregation and emits it.
     */
    static class CustomWindowFunction implements WindowFunction<Long, String, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
            Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
            // The iterable holds exactly one element: the pre-aggregated count for this window
            Long windowPV = input.iterator().next();
            // Output format: windowStart,windowEnd,eventType,PV
            String output = format.format(new Date(windowStart)) + "," + format.format(new Date(windowEnd)) + "," + key + "," + windowPV;
            out.collect(output);
        }
    }
}
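To make the window logic of the job above concrete, the following plain-Java sketch assigns events to tumbling windows and counts PV per window and event type. It assumes non-negative timestamps and a zero window offset, in which case the window start is the timestamp rounded down to a multiple of the window size. Watermarks and late data are deliberately left out. The class `TumblingPvDemo` and the sample events are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingPvDemo {
    // Start of the tumbling window that contains timestampMs
    // (assumes timestampMs >= 0 and a window offset of 0).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Each event is {eventType, timestampMs}; the result maps "windowStart,eventType" to PV.
    static Map<String, Long> countPv(String[][] events, long windowSizeMs) {
        Map<String, Long> pv = new TreeMap<>();
        for (String[] event : events) {
            long start = windowStart(Long.parseLong(event[1]), windowSizeMs);
            // Incremental aggregation: add 1 to the counter of this (window, key) pair.
            pv.merge(start + "," + event[0], 1L, Long::sum);
        }
        return pv;
    }

    public static void main(String[] args) {
        String[][] events = {
                {"browse", "1000"}, {"browse", "9000"}, {"click", "9999"}, {"browse", "10000"}
        };
        countPv(events, 10_000L).forEach((k, v) -> System.out.println(k + "," + v));
    }
}
```

The event at timestamp 10000 falls into the next window because a window covers the half-open interval from its start (inclusive) to its end (exclusive).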
————————————————
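Copyright notice: This is an original article by CSDN blogger "wangpei1949", licensed under CC 4.0 BY-SA. When reposting, please include the link to the original source and this notice.
Original link: https://blog.csdn.net/wangpei1949/article/details/98476354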