flink 根據時間消費kafka


經常遇到這樣的場景,13點-14點的時候flink程序發生了故障,或者集群崩潰,導致實時程序掛掉1小時,程序恢復的時候想把程序倒回13點或者更前,重新消費kafka中的數據.

下面的代碼就是根據指定時間戳(也可以換算成時間)開始消費數據,支持到這樣就靈活了,可以在啟動命令中加個參數,然后再配個守護程序來控制程序.


flink代碼

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.cdp.kafka.KafkaOffsetFind

object flinkkafka1 {
  def main(args: Array[String]): Unit = {

    /** ***************************************************************************************************************
      * kafka info
      */
    val zkCluster = "localhost:2181"
    val kafkaCluster = "localhost:9092"
    val topic = "cdp20"
    val timestamp = 1519804800000L

    /** ***************************************************************************************************************
      * flink env
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    /** ***************************************************************************************************************
      * create kafka stream
      */
    val props = new Properties()
    props.setProperty("bootstrap.servers", kafkaCluster)
    props.setProperty("zookeeper.connect", zkCluster)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("group.id", "cdp20-c1")


    /* ***********************************************************************************************************
     * stream
     */
    //找到時間戳對應偏移量
    val offsetFinder = new KafkaOffsetFind[String]
    val offset = offsetFinder.useTimestamp(timestamp,topic,props)
    print(offset)
    
    val kafkaOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]
    for (o <- offset) {
      kafkaOffsets.put(new KafkaTopicPartition(topic, o._1), o._2)
    }

    //創建根據時間消費kafka的數據流
    val kafkaTime = env
      .addSource {
        new FlinkKafkaConsumer010[String](topic,
          new KeyedDeserializationSchemaWithKey(new DefaultStringDeserializer),
          props)
          .setStartFromSpecificOffsets(kafkaOffsets)
      }

    /** ***************************************************************************************************************
      * exec
      */

    kafkaTime.print()


    /** ***************************************************************************************************************
      * flink execute
      */
    env.execute("flink-kafka")

  }
}

kafka根據時間找偏移量代碼

import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

/* ***********************************************************************************************************
 * 作者:陳大炮
 * 時間:2018-02-28
 * 內容:根據時間消費kafka
 *      使用unix時間戳,查找kafka分區對應的偏移量
 */

class KafkaOffsetFind[T] {

  //超時時間
  val POLL_TIMEOUT = 2000

  //使用時間查詢
  def useTimestamp(timestamp: Long, topic: String, kafkaProps: Properties): List[(Int, Long)] = {

    //創建消費者,獲得消費者分區
    val consumer = createConsumer(kafkaProps)
    consumer.subscribe(util.Arrays.asList(topic))
    consumer.poll(POLL_TIMEOUT)
    val partitions = consumer.assignment().asScala.toList

    //拼出一個查詢map
    val findMap = new util.HashMap[TopicPartition, java.lang.Long]
    partitions
      .foreach {
        c =>
          findMap.put(new TopicPartition(topic, c.partition()), timestamp)
      }

    //使用查詢map去獲得偏移量
    val offsetMap = consumer.offsetsForTimes(findMap)

    //返回前關閉下消費者
    consumer.close()

    //返回分區號和對應的偏移量
    partitions.map {
      p =>
        (p.partition(), offsetMap.get(new TopicPartition(topic, 0)).offset())
    }

  }

  //創建消費者
  protected def createConsumer(kafkaProps: Properties): KafkaConsumer[String, T] = {
    val props = kafkaProps.clone().asInstanceOf[Properties]
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    new KafkaConsumer[String, T](props)
  }
}

注意事項(由漂泊的美好提供)
1.使用KafkaConsumer.offsetsForTimes要確認集群已開啟log.message.timestamp.type參數
2.client端要使用0.10.*的客戶端發送數據,使用低版本會造成數據格式不同問題


參考內容
http://blog.csdn.net/forrest_ou/article/details/78978575
https://github.com/noris-network/KafkaOffsetFinder


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM