Flink Kafka connector documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html
When reading from and writing to Kafka, Flink needs serialization and deserialization schemas.
Part of the code is based on
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
as well as
https://juejin.im/post/5d844d11e51d4561e0516bbd and https://developer.aliyun.com/article/686809
1. Dependencies
flink-java provides Flink's Java API, including the DataSet execution environment, formats, and a number of operators:
https://github.com/apache/flink/tree/master/flink-java/src/main/java/org/apache/flink/api/java
flink-streaming-java provides Flink's Java streaming API, including the stream execution environment and streaming operators:
https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api
flink-connector-kafka provides the Kafka connector:
https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-kafka
pom.xml contents:
<!-- log4j -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
2. Kafka Consumer
The Kafka consumer has a few fairly important configuration parameters (bootstrap.servers, the input topic, and group.id; see the config file below).
2.1 Consume from Kafka and print. SimpleStringSchema is used as the deserialization schema here, so only the message value is printed.
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    if (pt.getNumberOfParameters() != 1) {
        throw new Exception("Missing parameters!\n" +
                "Usage: --conf-file <conf-file>");
    }
    String confFile = pt.get("conf-file");
    pt = ParameterTool.fromPropertiesFile(confFile);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(pt);
    // consume from Kafka
    DataStream<String> input = env
            .addSource(
                    new FlinkKafkaConsumer<>(
                            pt.getProperties().getProperty("input.topic"),
                            new SimpleStringSchema(),
                            pt.getProperties()
                    )
            );
    // print
    input.print();
    env.execute("Kafka consumer Example");
}
IDEA program arguments:
--conf-file ./conf/xxxxx.conf
Contents of xxxxx.conf:
# kafka source config
bootstrap.servers=master:9092
input.topic=test_topic
group.id=test
Produce some data into the Kafka topic, and the console will print the data you just entered.

2.2 Consume from Kafka and print, including metadata. If, besides the message value, you also want information such as the message's offset, topic, and partition, you can use JSONKeyValueDeserializationSchema, which deserializes the byte array as JSON.
When using JSONKeyValueDeserializationSchema, the data written to Kafka must be JSON, otherwise you will get an error such as:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'asdg': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
Implementation (JSONKeyValueDeserializationSchema produces ObjectNode records):
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(
        pt.getProperties().getProperty("input.topic"),
        new JSONKeyValueDeserializationSchema(false),
        pt.getProperties());
With includeMetadata = false, the output is:
2> {"value":123123}
With includeMetadata = true, the output is:
2> {"value":123123,"metadata":{"offset":18,"topic":"xxxxx","partition":0}}
Use map to extract and print the offset:
// print the offset of each record
input.map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode value) {
        return value.get("metadata").get("offset").asText();
    }
}).print();
If you need other information as well, for example the Kafka message key, you can also implement KafkaDeserializationSchema yourself; see
https://blog.csdn.net/jsjsjs1789/article/details/105099742 and https://blog.csdn.net/weixin_40954192/article/details/107561435
A Kafka key has two uses: it can carry additional information about the message, and it can be used to decide which partition the message is written to. An example of reading the key from the resulting stream follows the output below.
public class KafkaConsumerRecordDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // turn the raw byte[] key/value into Strings, keeping topic, partition and offset
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.key() != null ? new String(record.key()) : null,
                record.value() != null ? new String(record.value()) : null);
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}
Then:
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(pt.getProperties().getProperty("input.topic"), new KafkaConsumerRecordDeserializationSchema(), pt.getProperties());
Output:
2> ConsumerRecord(topic = xxxx, partition = 0, leaderEpoch = null, offset = 19, NoTimestampType = -1, serialized key size = -1, serialized value size = -1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1111111)
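As a short sketch that is not in the original post, the key and value can be read from the resulting ConsumerRecord stream like this (reusing env and consumer from the snippet above):
DataStream<ConsumerRecord<String, String>> input = env.addSource(consumer);
// print "key -> value" for every record; key() may be null if the producer did not set a key
input.map(new MapFunction<ConsumerRecord<String, String>, String>() {
    @Override
    public String map(ConsumerRecord<String, String> record) {
        return record.key() + " -> " + record.value();
    }
}).print();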
3. Kafka Producer
FlinkKafkaProducer has several versions; see: 你真的了解Flink Kafka source嗎?

For FlinkKafkaProducer examples, see
https://www.programcreek.com/java-api-examples/?api=org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
3.1 Read data from one Kafka topic and write it to another Kafka topic, using SimpleStringSchema. The FlinkKafkaProducer constructor used here is marked as deprecated, but it still works.
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    if (pt.getNumberOfParameters() != 1) {
        throw new Exception("Missing parameters!\n" +
                "Usage: --conf-file <conf-file>");
    }
    String confFile = pt.get("conf-file");
    pt = ParameterTool.fromPropertiesFile(confFile);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(pt);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            pt.getProperties().getProperty("input.topic"),
            new SimpleStringSchema(),
            pt.getProperties()
    );
    // consume from Kafka
    DataStream<String> input = env.addSource(consumer);
    // print
    input.print();
    // producer
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            pt.getProperties().getProperty("output.topic"),
            new SimpleStringSchema(),
            pt.getProperties()
    );
    // write to Kafka
    input.addSink(producer);
    env.execute("Kafka consumer Example");
}
Config file:
# kafka config
bootstrap.servers=localhost:9092
input.topic=thrift_log_test
output.topic=test
group.id=test
Output:

3.2 You can also implement the KafkaSerializationSchema interface yourself to serialize strings.
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class KafkaProducerStringSerializationSchema implements KafkaSerializationSchema<String> {
    private String topic;

    public KafkaProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
Then:
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    if (pt.getNumberOfParameters() != 1) {
        throw new Exception("Missing parameters!\n" +
                "Usage: --conf-file <conf-file>");
    }
    String confFile = pt.get("conf-file");
    pt = ParameterTool.fromPropertiesFile(confFile);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(pt);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // consumer
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            pt.getProperties().getProperty("input.topic"),
            new SimpleStringSchema(),
            pt.getProperties()
    );
    // consume from Kafka
    DataStream<String> input = env.addSource(consumer);
    // print
    input.print();
    // producer
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            pt.getProperties().getProperty("output.topic"),
            new KafkaProducerStringSerializationSchema(pt.getProperties().getProperty("output.topic")),
            pt.getProperties(),
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    );
    // write to Kafka
    input.addSink(producer);
    env.execute("Kafka consumer Example");
}
3.3 Serialize ProducerRecord<String, String> records; a keyed variant is sketched after the class below.
public class KafkaProducerRecordSerializationSchema implements KafkaSerializationSchema<ProducerRecord<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(ProducerRecord<String, String> element, Long timestamp) {
        // only the topic and value of the incoming record are used; the key is dropped
        return new ProducerRecord<>(element.topic(), element.value().getBytes(StandardCharsets.UTF_8));
    }
}
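Since a Kafka key can also decide which partition a message goes to (see section 2.2), here is a minimal sketch, not part of the original post, of a variant that keeps the record's key as well; the class name KeyedProducerRecordSerializationSchema is made up for illustration:
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// hypothetical variant of the schema above that keeps the key, so Kafka can partition by it
public class KeyedProducerRecordSerializationSchema implements KafkaSerializationSchema<ProducerRecord<String, String>> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(ProducerRecord<String, String> element, Long timestamp) {
        byte[] key = element.key() != null ? element.key().getBytes(StandardCharsets.UTF_8) : null;
        byte[] value = element.value() != null ? element.value().getBytes(StandardCharsets.UTF_8) : null;
        return new ProducerRecord<>(element.topic(), key, value);
    }
}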
Then:
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    if (pt.getNumberOfParameters() != 1) {
        throw new Exception("Missing parameters!\n" +
                "Usage: --conf-file <conf-file>");
    }
    String confFile = pt.get("conf-file");
    pt = ParameterTool.fromPropertiesFile(confFile);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(pt);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // consumer
    FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(
            pt.getProperties().getProperty("input.topic"),
            new KafkaConsumerRecordDeserializationSchema(),
            pt.getProperties()
    );
    // consume from Kafka
    DataStream<ConsumerRecord<String, String>> input = env.addSource(consumer);
    String outputTopic = pt.getProperties().getProperty("output.topic");
    // transform ConsumerRecord -> ProducerRecord for the output topic
    DataStream<ProducerRecord<String, String>> output = input.map(new MapFunction<ConsumerRecord<String, String>, ProducerRecord<String, String>>() {
        @Override
        public ProducerRecord<String, String> map(ConsumerRecord<String, String> value) throws Exception {
            return new ProducerRecord<>(outputTopic, value.value());
        }
    });
    FlinkKafkaProducer<ProducerRecord<String, String>> producer = new FlinkKafkaProducer<>(
            pt.getProperties().getProperty("output.topic"),
            new KafkaProducerRecordSerializationSchema(),
            pt.getProperties(),
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    );
    // write to Kafka
    output.addSink(producer);
    output.print();
    env.execute("Kafka consumer Example");
}
You can also refer to the official example:
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
Code:
package com.bigdata.flink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaExampleUtil {

    public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool parameterTool)
            throws Exception {
        if (parameterTool.getNumberOfParameters() < 5) {
            System.out.println("Missing parameters!\n" +
                    "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
                    "--bootstrap.servers <kafka brokers> " +
                    "--zookeeper.connect <zk quorum> --group.id <some id>");
            throw new Exception("Missing parameters!\n" +
                    "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
                    "--bootstrap.servers <kafka brokers> " +
                    "--zookeeper.connect <zk quorum> --group.id <some id>");
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        return env;
    }
}
Code:
package com.bigdata.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaExample {

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
        // consume from Kafka
        DataStream<String> input = env
                .addSource(new FlinkKafkaConsumer<>("test_topic", new SimpleStringSchema(), parameterTool.getProperties()));
        // print
        input.print();
        // write to Kafka
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                parameterTool.getProperties().getProperty("bootstrap.servers"), // broker list
                "test_source", // target topic
                new SimpleStringSchema()); // serialization schema
        input.map(line -> line + "test").addSink(myProducer);
        env.execute("Modern Kafka Example");
    }
}
Program arguments:
--input-topic test_topic --output-topic test_source --bootstrap.servers master:9092 --zookeeper.connect master:2181 --group.id test_group
Put data into the Kafka topic:
/opt/cloudera/parcels/KAFKA/bin/kafka-console-producer --broker-list master:9092 --topic test_topic
Output:

Consume the topic written by the Flink program:
/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --bootstrap-server master:9092 --topic test_source
Output:

4. Duplicate data
Flink provides exactly-once semantics; for storage systems that support transactions, data can be written without loss or duplication. Kafka added transaction support in version 0.11.0; see: 【干貨】Kafka 事務特性分析
To achieve exactly-once with Flink and Kafka, note the following (see: Flink exactly-once 實戰筆記):
1. The Kafka producer must write data inside a transaction, i.e. use FlinkKafkaProducer with exactly-once semantics;
2. The Kafka consumer must be configured with isolation.level=read_committed so that uncommitted messages are not visible to it (a small sketch follows this list).
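A minimal consumer-side sketch of point 2, not from the original post; the broker address and topic name are assumptions, and the isolation.level line could equally go into the .conf file used earlier:
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "master:9092");
consumerProps.setProperty("group.id", "test");
// make uncommitted (aborted or in-flight) transactional messages invisible to this consumer
consumerProps.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "output_topic", // assumed: the topic written by the exactly-once producer
        new SimpleStringSchema(),
        consumerProps);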
Points to note for Kafka end-to-end consistency (see: Flink Kafka端到端精准一致性測試); a configuration sketch follows this list:
1. The Flink job must enable checkpointing with CheckpointingMode.EXACTLY_ONCE.
2. The FlinkKafkaProducer in the Flink job must be created with Semantic.EXACTLY_ONCE.
3. The FlinkKafkaProducer must configure transaction.timeout.ms so that: checkpoint interval (set in code) < transaction.timeout.ms <= transaction.max.timeout.ms. Note that FlinkKafkaProducer's default transaction.timeout.ms is 1 hour while the broker's default transaction.max.timeout.ms is 15 minutes, so the defaults conflict and transaction.timeout.ms has to be lowered (or the broker limit raised).
4. Consumers reading the topic written by FlinkKafkaProducer must set isolation.level (default read_uncommitted) to read_committed.
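To make points 1-3 concrete, here is a minimal sketch that is not part of the original post; the topic name "output_topic" and broker address are assumptions, KafkaProducerStringSerializationSchema is the class from section 3.2, and the 300000 ms timeout reuses the value suggested in the best practices below:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// point 1: exactly-once checkpoints, e.g. every 10 s (checkpoint interval < transaction.timeout.ms)
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "master:9092"); // assumed broker address
// point 3: keep transaction.timeout.ms below the broker's transaction.max.timeout.ms (15 min by default)
producerProps.setProperty("transaction.timeout.ms", "300000");

// point 2: create the producer with Semantic.EXACTLY_ONCE
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output_topic",
        new KafkaProducerStringSerializationSchema("output_topic"),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);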
Best practices for exactly-once reads/writes between Flink and Kafka (see: Best Practices for Using Kafka Sources/Sinks in Flink Jobs):
1. After enabling Semantic.EXACTLY_ONCE on FlinkKafkaProducer, if you run into the following error
Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms).
you need to reduce the producer's transaction.timeout.ms, whose default is 1 hour, for example to
transaction.timeout.ms=300000
2. With Semantic.EXACTLY_ONCE enabled, make sure transactional.id is unique.
3. Set the checkpoint interval, for example:
StreamExecutionEnvironment env = ...;
env.enableCheckpointing(1000); // unit is millisecond
4. Concurrent checkpoints: by default FlinkKafkaProducer has a pool of 5 internal KafkaProducers and supports up to 4 concurrent checkpoints (see the sketch after this list).
5. Pay attention to the version of the Kafka connector.
6. When the Kafka cluster is unavailable, avoid flooding the logs; relevant settings include:
min.insync.replicas
reconnect.backoff.max.ms
reconnect.backoff.ms
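A small sketch of points 3 and 4 (the interval and limit values are examples of my own, not from the original post):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // point 3: checkpoint interval, unit is millisecond
// point 4: keep concurrent checkpoints below the default pool of 5 internal KafkaProducers
env.getCheckpointConfig().setMaxConcurrentCheckpoints(4);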
For Kafka's transaction mechanism and read_committed, see: Kafka Exactly-Once 之事務性實現
5. End-to-end exactly-once between Flink and Kafka
socket stream writes data -> Flink reads the socket stream -> transactional write to Kafka -> Flink consumes the Kafka data with isolation.level=read_committed -> print to console
Because checkpointing is used, offsets are committed only after Flink's periodic checkpoints succeed; if the Flink job crashes, messages belonging to uncommitted transactions remain invisible to the read_committed consumer. A minimal sketch of the write side of this pipeline follows.
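This sketch is not the original post's full program; it assumes the socket source runs on localhost:9999 and reuses KafkaProducerStringSerializationSchema from section 3.2 together with an xxxxx.conf-style properties file:
public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromPropertiesFile(ParameterTool.fromArgs(args).get("conf-file"));
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // exactly-once checkpoints; transactions are committed only when a checkpoint completes
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

    // read text lines from a socket (assumed host/port), e.g. fed by `nc -lk 9999`
    DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

    // transactional write to Kafka
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
            pt.getProperties().getProperty("output.topic"),
            new KafkaProducerStringSerializationSchema(pt.getProperties().getProperty("output.topic")),
            pt.getProperties(), // should contain transaction.timeout.ms as discussed in section 4
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    socketStream.addSink(producer);
    env.execute("Socket to Kafka exactly-once example");
}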
