Flink Kafka consumer的消费策略配置


val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps)
// 指定消费策略
helloStream.setStartFromEarliest() // - 从最早的记录开始;
helloStream.setStartFromLatest() //- 从最新记录开始;
helloStream.setStartFromTimestamp(null); // 从指定的epoch时间戳(毫秒)开始;
helloStream.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
val specificStartOffsets = new mutable.HashMap[KafkaTopicPartition,Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L) // 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) // 第三个分区从43L开始
helloStream.setStartFromSpecificOffsets(specificStartOffsets)

// Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM