kafka offset-check工具失效的問題


轉載請注明原創地址http://www.cnblogs.com/dongxiao-yang/p/5414077.html   

       由於平時業務預警等需求,針對現在公司的kafka系統部署了幾套監控系統,包括調用kafka-consumer-offset-checker.sh腳本寫的lag監控,kafkaoffsetmonitor開源監控以及kafka-manager管理系統。最近發現kafka-consumer-offset-checker.sh腳本在原本運行正常的情況下一直出現"Exiting due to:null"的錯誤,這個問題會導致腳本直接退出無法獲取完整的partition的lag信息導致報警失效。嘗試把監控程序部署到其他機器又發現腳本可以正常運行。

  為了搞明白問題,直接把kafka-consumer-offset-checker.sh腳本調用的kafka類ConsumerOffsetChecker拿出來進行研究,發現最后輸出lag結果的方法如下

  private def processPartition(zkUtils: ZkUtils,
                               group: String, topic: String, pid: Int) {
    val topicPartition = TopicAndPartition(topic, pid)
    val offsetOpt = offsetMap.get(topicPartition)
    val groupDirs = new ZKGroupTopicDirs(group, topic)
    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
    zkUtils.getLeaderForPartition(topic, pid) match {
      case Some(bid) =>
        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
        consumerOpt match {
          case Some(consumer) =>
            val topicAndPartition = TopicAndPartition(topic, pid)
            val request =
              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

            val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                                                                   owner match {case Some(ownerStr) => ownerStr case None => "none"}))
          case None => // ignore
        }
      case None =>
        println("No broker for partition %s - %s".format(topic, pid))
    }
  }

 

其中函數processPartition通過傳入的group,topic,pid三個參數唯一確定需要計算的lag。

val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head 獲取logSize

val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) 用logSize減去offsetOpt這個map里對應的partition的offset得到lag。

 

把kafka這個類的源碼搞到intellij idea在本地進行單步調試發現同樣出現了Exiting due to:null的問題,並且永遠是運行到某一特定分區后就問出題,調試到

val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head這個代碼報錯,嘗試加入try catch並打印對應bid

val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
println(" brokerid ",bid)
.............
............

try {

              //val

              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

              val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
              println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                owner match { case Some(ownerStr) => ownerStr case None => "none" }))
            }
            catch {
              case ex: Exception => //ignore
            }

  

 

研究發現對於不同的topic,出現問題的分區對應的broker id都是一樣的,至此懷疑是代碼環境與broker服務器之間的連通性出現問題,查了下本機以及監控環境的host配置的都是不全的,把host補全后問題解決。

 

后續發現kafkaoffsetmonitor以及kafka-manager出現的lag查詢頁面出現的分區顯示不全或者數據為空的情況都通過補全host解決了。

 

吐槽一下kafka對於host的強依賴。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM