val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps)
// 指定消費策略
helloStream.setStartFromEarliest() // - 從最早的記錄開始;
helloStream.setStartFromLatest() //- 從最新記錄開始;
helloStream.setStartFromTimestamp(null); // 從指定的epoch時間戳(毫秒)開始;
helloStream.setStartFromGroupOffsets(); // 默認行為,從上次消費的偏移量進行繼續消費。
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
val specificStartOffsets = new mutable.HashMap[KafkaTopicPartition,Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) // 第一個分區從23L開始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L) // 第二個分區從31L開始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) // 第三個分區從43L開始
helloStream.setStartFromSpecificOffsets(specificStartOffsets)
// Kafka支持Topic自動發現,也就是用正則的方式創建FlinkKafkaConsumer