- Flink 里面预定义了一些 source 和 sink。
- Flink 内部也提供了一些 Boundled connectors。
- 通过异步 IO 方式。
- 自定义 Source & Sink。
一、Flink 预定义 & 自定义 Source 和 Sink
https://www.cnblogs.com/xiexiandong/p/12770187.html
二、Flink 绑定的connectors
需要专门引入对应的依赖,主要是实现外部数据进出Flink。
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Apache Bahir
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
三、Flink kafka Connector
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
3.1、Flink-Kafka-Consumer
def main(args: Array[String]): Unit = {
// kafka 配置
val kafkaConsumerProps: Properties = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
// 获取 kafkaConsumer
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)
val logPath: String = "/tmp/logs/flink_log"
val conf: Configuration = new Configuration()
// 开启spark-webui
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//配置webui的日志文件
conf.setString("web.log.path", logPath)
// 配置 taskManager 的日志文件,否则打印日志到控制台
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
// 获取本地运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// 添加数据源
val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
sourceDataStream.print()
// 提交运行
env.execute("FlinkKafkaExample")
}
创建 FlinkKafkaConsumer:

三个构造参数:
- 要消费的topic(topic name / topic list/正表达式)
- DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的数据))
- Kafka consumer的属性,其中三个属性必须提供:
- bootstrap.servers(逗号分隔的Kafka broker列表)
- group.id(consumer group id)
1、反序列化数据
- 因为 kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。具体需要实现一个 schema 类,定义如何序列化和反序列数据。
- 反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数,如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口。
-
// kafka 数据类型样例类 case class KafkaEvent(message: String, eventTime: Long) //实现 DeserializationSchema 用来反序列化kafka中的数据,kafka 中的数据按照二进制存储,系列化为 java/scala对象 //重写 deserialize(message: Array[Byte]) 方法 //如果 kafka 中的数据是按照 KV 格式存储的,需要实现 KeyedDeserializationSchema class KafkaEventDeserializationSchema extends DeserializationSchema[KafkaEvent] { // 标记为无界流 override def isEndOfStream(nextElement: KafkaEvent): Boolean = false override def deserialize(message: Array[Byte]): KafkaEvent = { KafkaEvent(new String(message), System.currentTimeMillis()) } // 得到自定义序列化类型 override def getProducedType: TypeInformation[KafkaEvent] = TypeInformation.of(new TypeHint[KafkaEvent] {}) }
-
- 另外 Flink 中也提供了一些常用的序列化反序列化的 schema 类。
- 例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。
- TypeInformationSerializationSchema,它可根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema。
- JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段。
2、消费起始位置设置
在构造好的 FlinkKafkaConsumer 类后面调用如下相应函数,设置合适的起始位置。
- setStartFromGroupOffsets,也是默认的策略,从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置。但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数”auto.offset.reset”的设置来决定从哪个位置开始消费。
- setStartFromEarliest,从 kafka 最早的位置开始读取。
- setStartFromLatest,从 kafka 最新的位置开始读取。
- setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka 时戳,是指 kafka 为每条消息增加另一个时戳。该时戳可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间。
- setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。
- 需要注意的是,因为 Flink 框架有容错机制,如果作业故障,如果作业开启 checkpoint,会从上一次 checkpoint 状态开始恢复。或者在停止作业的时候主动做 savepoint,启动作业时从 savepoint 开始恢复。这两种情况下恢复作业时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟 kafka 这些单独的配置无关。

| 情形 | 谁决定起始位置 |
|---|---|
| 第一次启动, 无savepoint(常规情况) | 由消费模式决定 |
| 通过savepoint启动(应用升级,比如加 大并行度) | 由savepoint记录的offset决定 |
| 有checkpoint,失败后,job恢复的情况 | 由checkpoint的snapshoot中记录的offset决定 |
| 无checkpoint,失败后,job恢复的情况 | 由消费模式决定 |
3、topic 和 partition 动态发现
实际的生产环境中可能有这样一些需求,比如:
- 场景一,有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。
- 场景二,作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition。
针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000")
val topicPattern: Pattern = java.util.regex.Pattern.compile("test-topic-[0-9]")
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topicPattern, new SimpleStringSchema(), kafkaConsumerProps)
- 针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。
- 针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。
4、commit offset 方式
Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。
- 如果 checkpoint 关闭,commit offset 要依赖于 kafka 客户端的 auto commit。需设置 enable.auto.commit,auto.commit.interval.ms 参数到 consumer properties,就会按固定的时间间隔定期 auto commit offset 到 kafka。
enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
- 如果开启 checkpoint,这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错。此时提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况。此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka。此时 commit offset 的间隔就取决于 checkpoint 的间隔,所以此时从 kafka 一侧看到的 lag 可能并非完全实时,如果 checkpoint 间隔比较长 lag 曲线可能会是一个锯齿状。
3.2、Flink-Kafka-Producer
def main(args: Array[String]): Unit = {
// 获取 kafkaConsumer
val kafkaConsumerProps: Properties = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)
// 获取 KafkaProducer
var kafkaProducerProps: Properties = new Properties()
kafkaProducerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
val flinkKafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](KafkaConfig.PRODUCER_TOPIC, new SimpleStringSchema(), kafkaProducerProps)
val logPath: String = "/tmp/logs/flink_log"
val conf: Configuration = new Configuration()
// 开启spark-webui
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//配置webui的日志文件
conf.setString("web.log.path", logPath)
// 配置 taskManager 的日志文件,否则打印日志到控制台
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
// 获取本地运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
// 结果输出
sourceDataStream.print()
// 结果存入 kafka
sourceDataStream.addSink(flinkKafkaProducer)
// 开启任务
env.execute("FlinkKafkaProducerExample")
}
1、Producer 分区
- 使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,会默认使用 FlinkFixedPartitioner(此时无论是带 key 的数据,还是不带 key 都采用),该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length。
- 此时如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据。
- 但当 sink task < partition 个数时会有部分 partition 没有数据写入,例如 sink task 为2,partition 总数为 4,则后面两个 partition 将没有数据写入。
-
- 如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式
- 非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接。
- 带 key 的数据会根据 key,相同 key 数据分区的相同的 partition,如果 key 为 null,再轮询写。
-
2、容错
Flink kafka 09、010 版本下:
- 能达到 at-least-once 语义(数据有可能重复),通过 setLogFailuresOnly 为 false,setFlushOnCheckpoint 为 true 来控制。
- setLogFailuresOnly,默认为 false,是控制写 kafka 失败时,是否只打印失败的 log 不抛异常让作业停止。
- setFlushOnCheckpoint,默认为 true,是控制是否在 checkpoint 时 fluse 数据到 kafka,保证数据已经写到 kafka。否则数据有可能还缓存在 kafka 客户端的 buffer 中,并没有真正写出到 kafka,此时作业挂掉数据即丢失,不能做到至少一次的语义。
Flink kafka 011 版本下:
- 通过两阶段提交的 sink 结合 kafka 事务的功能,可以保证端到端精准一次。
- 详细原理参考:https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

