Fixing kafka.common.OffsetOutOfRangeException when consuming again after Kafka's periodic log cleanup


 

 

Environment:

kafka 0.10
spark 2.1.0
zookeeper 3.4.5-cdh5.14.0

The Spark Streaming job on our company's Aliyun test machine sat idle over the October National Day holiday. When we came back and resumed consuming a topic under the existing consumer group, the job failed with kafka.common.OffsetOutOfRangeException.

The symptom matches this description from an English write-up of the same error (there with the Python client):

"As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy, not entirely sure why but I got the error:

kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=)

To fix it I added the 'seek' setting:

consumer.seek(0,2)"
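
(In the Scala/Java client used in this post, the analogous blunt fix would be consumer.seekToBeginning(...) or consumer.seekToEnd(...): it makes the error go away, but silently skips everything in between. The rest of this post corrects the stored offset instead of blindly jumping to one end of the log.)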

 

Cause:

Kafka periodically deletes old log segments.

When a job starts, if the topic has been consumed before, the consumer group's offsets are stored in ZooKeeper, and we normally read them so consumption resumes where it last stopped. But with Kafka's scheduled log cleanup (say, a one-day retention), if the stored offset was committed two days ago, the segments it points into may already have been deleted. The offset is then no longer within the valid range, and the fetch fails with OffsetOutOfRangeException.
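
For reference, this cleanup is driven by the broker's retention settings; the values below are illustrative (set in server.properties or as topic-level overrides), not the actual config of our cluster:

log.cleanup.policy=delete               # delete old segments instead of compacting them
log.retention.hours=24                  # keep data for roughly one day
log.retention.check.interval.ms=300000  # how often the log cleaner checks (5 min)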

Solution:

When we detect that zk_offset < earliest_offset (or zk_offset > latest_offset), correct zk_offset to a valid value before starting the stream.
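
A minimal sketch of the correction rule, assuming an already-assigned consumer and a zkOffset value read from ZooKeeper (the helper name clampOffset is made up for illustration):

import java.util.Arrays
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def clampOffset(consumer: KafkaConsumer[String, Array[Byte]],
                tp: TopicPartition, zkOffset: Long): Long = {
  val parts = Arrays.asList(tp)
  val earliest: Long = consumer.beginningOffsets(parts).get(tp) // oldest offset still on disk
  val latest: Long = consumer.endOffsets(parts).get(tp)         // offset right after the newest record
  if (zkOffset < earliest || zkOffset > latest) earliest        // out of range -> reset to earliest
  else zkOffset                                                  // still valid -> keep it
}

The full implementation below does the same thing, but uses a throwaway consumer per call so it also handles partitions that have never been consumed.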

 

Full code from the earlier post:

https://www.cnblogs.com/niutao/p/10547831.html

 

Key code after the fix:

/**
  * Get the earliest (lowest) available offsets, taking new partitions into account.
  *
  * @param kafkaParams Kafka client configuration
  * @param topics      topics to fetch offsets for
  */
def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
  val newKafkaParams = mutable.Map[String, Object]()
  newKafkaParams ++= kafkaParams
  newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
  consumer.subscribe(topics)
  val notOffsetTopicPartition = mutable.Set[TopicPartition]()
  try {
    // poll once so the consumer receives its partition assignment
    consumer.poll(0)
  } catch {
    case ex: NoOffsetForPartitionException =>
      log.warn(s"consumer topic partition offset not found: ${ex.partition()}")
      notOffsetTopicPartition.add(ex.partition())
  }
  val parts = consumer.assignment().toSet
  consumer.pause(parts)           // stop fetching while we reposition
  consumer.seekToBeginning(parts) // position() now reports the earliest available offset
  val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
  consumer.unsubscribe()
  consumer.close()
  offsets
}

 

/**
  * Get the latest (highest) available offsets, taking new partitions into account.
  *
  * @param kafkaParams Kafka client configuration
  * @param topics      topics to fetch offsets for
  */
def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
  val newKafkaParams = mutable.Map[String, Object]()
  newKafkaParams ++= kafkaParams
  newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
  consumer.subscribe(topics)
  val notOffsetTopicPartition = mutable.Set[TopicPartition]()
  try {
    // poll once so the consumer receives its partition assignment
    consumer.poll(0)
  } catch {
    case ex: NoOffsetForPartitionException =>
      log.warn(s"consumer topic partition offset not found: ${ex.partition()}")
      notOffsetTopicPartition.add(ex.partition())
  }
  val parts = consumer.assignment().toSet
  consumer.pause(parts)     // stop fetching while we reposition
  consumer.seekToEnd(parts) // position() now reports the latest offset
  val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
  consumer.unsubscribe()
  consumer.close()
  offsets
}

 

Then, after reading the stored offsets into topicPartOffsetMap, clamp any value that has fallen out of range:

val earliestOffsets = getEarliestOffsets(kafkaParams, topics)
val latestOffsets = getLatestOffsets(kafkaParams, topics)
for ((k, v) <- topicPartOffsetMap.toMap) {
  val current = v
  val earliest = earliestOffsets(k)
  val latest = latestOffsets(k)
  if (current > latest || current < earliest) {
    log.warn("correcting offset: " + current + " -> " + earliest)
    topicPartOffsetMap.put(k, earliest)
  }
}
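
Note the design choice here: whenever the stored offset is out of range, in either direction, it is reset to earliest, so any data Kafka still retains gets (re)processed rather than skipped. Resetting to latest would instead drop the retained backlog; pick whichever matches your delivery requirements.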

 

Full code; you can use it as-is:

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Try

/**
  * Kafka connection and offset management helper.
  *
  * @param zkHosts     ZooKeeper address
  * @param kafkaParams Kafka client parameters
  */
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
  // slf4j logger (Logback backend); @transient + lazy so it survives serialization
  @transient private lazy val log = LoggerFactory.getLogger(getClass)
  // ZooKeeper client and connection needed to build the ZkUtils instance
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
  // ZkUtils instance used to read/write offsets in ZooKeeper (not a secure cluster)
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)


  /**
    * Get the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      // poll once so the consumer receives its partition assignment
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        log.warn(s"consumer topic partition offset not found: ${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)           // stop fetching while we reposition
    consumer.seekToBeginning(parts) // position() now reports the earliest available offset
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Get the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      // poll once so the consumer receives its partition assignment
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        log.warn(s"consumer topic partition offset not found: ${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)     // stop fetching while we reposition
    consumer.seekToEnd(parts) // position() now reports the latest offset
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Get the consumer's current offsets.
    *
    * @param consumer   the consumer
    * @param partitions topic partitions
    * @return current position per partition
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Read Kafka offsets for a consumer group from ZooKeeper.
    *
    * @param topics  Kafka topics
    * @param groupId Kafka group ID
    * @return a Map[TopicPartition, Long] with the offset of each partition of each topic;
    *         partitions never consumed before get the earliest available offset
    */
  def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    // ZK path: /consumers/<groupId>/offsets/<topic>/<partition>
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        val tryGetKafkaOffset = Try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            log.info("Read Kafka offset: topic:{}, partition:{}, offset:{}, ZK path:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        }
        if (tryGetKafkaOffset.isFailure) {
          // No offset stored in ZK yet; fall back to the broker's earliest available offset
          // http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
          val consumer = new KafkaConsumer[String, Object](kafkaParams)
          val partitionList = List(new TopicPartition(topicPartitions._1, partition))
          consumer.assign(partitionList)
          val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
          consumer.close()
          log.warn("No previous ZK offset node ({}): topic:{}, partition:{}, ZK path:{}, using earliest available offset:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
        }
      })
    })
    // Handle data that expired or was deleted before it could be consumed
    // ("Offsets out of range with no configured reset policy for partition"):
    // clamp every stored offset into [earliest, latest]
    val earliestOffsets = getEarliestOffsets(kafkaParams, topics)
    val latestOffsets = getLatestOffsets(kafkaParams, topics)
    for ((k, v) <- topicPartOffsetMap.toMap) {
      val current = v
      val earliest = earliestOffsets(k)
      val latest = latestOffsets(k)
      if (current > latest || current < earliest) {
        log.warn("correcting offset: " + current + " -> " + earliest)
        topicPartOffsetMap.put(k, earliest)
      }
    }

    topicPartOffsetMap.toMap
  }

  /**
    * Wraps createDirectStream with ZK-backed offset handling to create the Kafka stream.
    *
    * @param ssc    Spark Streaming context
    * @param topics Kafka topics
    * @tparam K Kafka message key type
    * @tparam V Kafka message value type
    * @return Kafka stream
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    val storedOffsets: Map[TopicPartition, Long] = readOffsets(topics, groupId)
    log.info("Kafka offset summary (topic, partition, offset): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
    kafkaStream
  }

  /**
    * Persist the offsets consumed from Kafka back to ZooKeeper.
    *
    * @param rdd            the Kafka RDD from Spark Streaming, RDD[ConsumerRecord[K, V]]
    * @param storeEndOffset true = store the end offset, false = store the start offset
    */
  def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsList.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
      log.debug("Persisted Kafka offset: topic:{}, partition:{}, offset:{}, ZK path:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    })
  }


}
Kafka offset management code
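
For completeness, a hypothetical driver showing how the class is wired into a job; the broker/ZK addresses, topic name, group id and batch interval below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaManagerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-offset-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "demo-group",
      "enable.auto.commit" -> (false: java.lang.Boolean)) // offsets are managed in ZK, not by Kafka
    val manager = new KafkaManager("zk1:2181", kafkaParams)
    val stream = manager.createDirectStream[String, String](ssc, Seq("my_topic"))
    stream.foreachRDD { rdd =>
      rdd.foreach(record => println(record.value())) // your processing here
      manager.persistOffsets(rdd)                    // then commit end offsets to ZK
    }
    ssc.start()
    ssc.awaitTermination()
  }
}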

 

