flink系列-12、Flink 的 Connectors & Flink Kafka Connectors


  • Flink 里面预定义了一些 source 和 sink。
  • Flink 内部也提供了一些 Boundled connectors。
  • 通过异步 IO 方式
  • 自定义 Source & Sink。

一、Flink 预定义 & 自定义 Source 和 Sink

https://www.cnblogs.com/xiexiandong/p/12770187.html

二、Flink 绑定的connectors

需要专门引入对应的依赖,主要是实现外部数据进出Flink。

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Apache Bahir
  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

三、Flink kafka Connector

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
Flink-Kafka-Consumer
def main(args: Array[String]): Unit = {
    // kafka 配置
    val kafkaConsumerProps: Properties = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
    // 获取 kafkaConsumer
    val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)

    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 添加数据源
    val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)

    sourceDataStream.print()

    // 提交运行
    env.execute("FlinkKafkaExample")
  }

创建 FlinkKafkaConsumer:
FlinkKafkaConsumer011

三个构造参数:

  • 要消费的topic(topic name / topic list/正表达式)
  • DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的数据))
  • Kafka consumer的属性,其中三个属性必须提供:
    • bootstrap.servers(逗号分隔的Kafka broker列表)
    • group.id(consumer group id)

1、反序列化数据

  • 因为 kafka 中数据都是以二进制 byte 形式存储的。读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象。具体需要实现一个 schema 类,定义如何序列化和反序列数据。
  • 反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数,如果是反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口。
    •  // kafka 数据类型样例类
      case class KafkaEvent(message: String, eventTime: Long)
      
      //实现 DeserializationSchema 用来反序列化kafka中的数据,kafka 中的数据按照二进制存储,系列化为 java/scala对象
      //重写 deserialize(message: Array[Byte]) 方法
      //如果 kafka 中的数据是按照 KV 格式存储的,需要实现 KeyedDeserializationSchema
      class KafkaEventDeserializationSchema extends DeserializationSchema[KafkaEvent] {
        // 标记为无界流
        override def isEndOfStream(nextElement: KafkaEvent): Boolean = false
        
        override def deserialize(message: Array[Byte]): KafkaEvent = {
          KafkaEvent(new String(message), System.currentTimeMillis())
        }
      
        // 得到自定义序列化类型
        override def getProducedType: TypeInformation[KafkaEvent] = TypeInformation.of(new TypeHint[KafkaEvent] {})
      }
      
  • 另外 Flink 中也提供了一些常用的序列化反序列化的 schema 类。
    • 例如,SimpleStringSchema,按字符串方式进行序列化、反序列化。
    • TypeInformationSerializationSchema,它可根据 Flink 的 TypeInformation 信息来推断出需要选择的 schema。
    • JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段。

2、消费起始位置设置

在构造好的 FlinkKafkaConsumer 类后面调用如下相应函数,设置合适的起始位置。

  • setStartFromGroupOffsets,也是默认的策略,从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置。但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数”auto.offset.reset”的设置来决定从哪个位置开始消费。
  • setStartFromEarliest,从 kafka 最早的位置开始读取。
  • setStartFromLatest,从 kafka 最新的位置开始读取。
  • setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取。Kafka 时戳,是指 kafka 为每条消息增加另一个时戳。该时戳可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间。
  • setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。
  • 需要注意的是,因为 Flink 框架有容错机制,如果作业故障,如果作业开启 checkpoint,会从上一次 checkpoint 状态开始恢复。或者在停止作业的时候主动做 savepoint,启动作业时从 savepoint 开始恢复。这两种情况下恢复作业时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟 kafka 这些单独的配置无关
    kafka 设置消费位置
情形 谁决定起始位置
第一次启动, 无savepoint(常规情况) 由消费模式决定
通过savepoint启动(应用升级,比如加 大并行度) 由savepoint记录的offset决定
有checkpoint,失败后,job恢复的情况 由checkpoint的snapshoot中记录的offset决定
无checkpoint,失败后,job恢复的情况 由消费模式决定

3、topic 和 partition 动态发现

实际的生产环境中可能有这样一些需求,比如:

  • 场景一,有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。
  • 场景二,作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition。

针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。

kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000")
val topicPattern: Pattern = java.util.regex.Pattern.compile("test-topic-[0-9]")
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topicPattern, new SimpleStringSchema(), kafkaConsumerProps)
  • 针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。
  • 针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

4、commit offset 方式

Flink kafka consumer commit offset 方式需要区分是否开启了 checkpoint。

  • 如果 checkpoint 关闭,commit offset 要依赖于 kafka 客户端的 auto commit。需设置 enable.auto.commit,auto.commit.interval.ms 参数到 consumer properties,就会按固定的时间间隔定期 auto commit offset 到 kafka。
enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。
  • 如果开启 checkpoint,这个时候作业消费的 offset 是 Flink 在 state 中自己管理和容错。此时提交 offset 到 kafka,一般都是作为外部进度的监控,想实时知道作业消费的位置和 lag 情况。此时需要 setCommitOffsetsOnCheckpoints 为 true 来设置当 checkpoint 成功时提交 offset 到 kafka。此时 commit offset 的间隔就取决于 checkpoint 的间隔,所以此时从 kafka 一侧看到的 lag 可能并非完全实时,如果 checkpoint 间隔比较长 lag 曲线可能会是一个锯齿状。

KakfaProducer 的配置
Flink-Kafka-Producer

def main(args: Array[String]): Unit = {
    // 获取 kafkaConsumer
    val kafkaConsumerProps: Properties = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
    val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)

    // 获取 KafkaProducer
    var kafkaProducerProps: Properties = new Properties()
    kafkaProducerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    val flinkKafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](KafkaConfig.PRODUCER_TOPIC, new SimpleStringSchema(), kafkaProducerProps)

    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // 开启spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否则打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 获取本地运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)

    val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
    // 结果输出
    sourceDataStream.print()
    // 结果存入 kafka
    sourceDataStream.addSink(flinkKafkaProducer)
    // 开启任务
    env.execute("FlinkKafkaProducerExample")

  }
FlinkKafkaProducer011

1、Producer 分区

  • 使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,会默认使用 FlinkFixedPartitioner(此时无论是带 key 的数据,还是不带 key 都采用),该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length。
    • 此时如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据。
    • 但当 sink task < partition 个数时会有部分 partition 没有数据写入,例如 sink task 为2,partition 总数为 4,则后面两个 partition 将没有数据写入。
    • FlinkKafkaSink
  • 如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式
    • 非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接。
    • 带 key 的数据会根据 key,相同 key 数据分区的相同的 partition,如果 key 为 null,再轮询写。
    • FlinkKafkaSink

2、容错

Flink kafka 09、010 版本下:

  • 能达到 at-least-once 语义(数据有可能重复),通过 setLogFailuresOnly 为 false,setFlushOnCheckpoint 为 true 来控制。
    • setLogFailuresOnly,默认为 false,是控制写 kafka 失败时,是否只打印失败的 log 不抛异常让作业停止。
    • setFlushOnCheckpoint,默认为 true,是控制是否在 checkpoint 时 fluse 数据到 kafka,保证数据已经写到 kafka。否则数据有可能还缓存在 kafka 客户端的 buffer 中,并没有真正写出到 kafka,此时作业挂掉数据即丢失,不能做到至少一次的语义。

Flink kafka 011 版本下:


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM