Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/5414077.html
To support business alerting and related needs, I deployed several monitoring setups against our Kafka clusters: a lag monitor built on the kafka-consumer-offset-checker.sh script, the open-source KafkaOffsetMonitor, and the kafka-manager administration tool. Recently the kafka-consumer-offset-checker.sh script, which had been running fine, started failing consistently with "Exiting due to: null". This error makes the script exit before it can gather lag information for all partitions, silently breaking the alerts. Strangely, deploying the same monitoring program on another machine made the script run fine again.
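For context, the checker is typically invoked along these lines (the ZooKeeper address, group, and topic below are placeholders):

bin/kafka-consumer-offset-checker.sh --zookeeper zk1:2181 --group my-group --topic my-topic

For each partition it prints the group, topic, partition id, committed offset, logSize (log-end offset), lag, and owner.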
To get to the bottom of the problem, I pulled out the Kafka class ConsumerOffsetChecker that kafka-consumer-offset-checker.sh invokes and studied it. The method that ultimately prints the lag results is as follows:
private def processPartition(zkUtils: ZkUtils, group: String, topic: String, pid: Int) {
  val topicPartition = TopicAndPartition(topic, pid)
  val offsetOpt = offsetMap.get(topicPartition)
  val groupDirs = new ZKGroupTopicDirs(group, topic)
  val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
  zkUtils.getLeaderForPartition(topic, pid) match {
    case Some(bid) =>
      val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
      consumerOpt match {
        case Some(consumer) =>
          val topicAndPartition = TopicAndPartition(topic, pid)
          val request =
            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
          val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

          val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
          println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid,
            offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
            owner match { case Some(ownerStr) => ownerStr case None => "none" }))
        case None => // ignore
      }
    case None =>
      println("No broker for partition %s - %s".format(topic, pid))
  }
}
The processPartition function uniquely identifies the lag to compute from its three arguments: group, topic, and pid.
The line val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head fetches logSize, the latest (log-end) offset of the partition, from its leader broker.
val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) then derives the lag by subtracting the partition's committed offset, looked up in offsetMap, from logSize.
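To make this concrete, here is a minimal standalone sketch of the same logSize fetch against the old Scala SimpleConsumer API (0.8/0.9 era). The broker host/port, topic, partition, and committed offset are placeholder values; the real checker resolves the leader broker via ZooKeeper and reads committed offsets into offsetMap.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object LagSketch extends App {
  // connect to the (assumed) leader broker of the partition
  val consumer = new SimpleConsumer("kafka-broker-1", 9092, 30000, 64 * 1024, "lag-sketch")
  val tp = TopicAndPartition("my-topic", 0)
  // ask for the single latest offset, i.e. the log-end offset ("logSize")
  val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
  val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
  val committedOffset = 12345L // placeholder; the checker reads this from ZooKeeper
  println(s"logSize=$logSize committed=$committedOffset lag=${logSize - committedOffset}")
  consumer.close()
}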
I pulled the source of this class into IntelliJ IDEA and single-stepped it locally. The same "Exiting due to: null" error showed up, and it always appeared once a particular partition was reached. Debugging pinpointed the failure to the line val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head, so I wrapped it in a try/catch and printed the corresponding broker id (bid):
val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
println(" brokerid ", bid)
.............
............
try {
  //val
  val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
  val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
  println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid,
    offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
    owner match {
      case Some(ownerStr) => ownerStr
      case None => "none"
    }))
} catch {
  case ex: Exception => //ignore
}
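In hindsight, a catch clause that reports the failing partition and broker, instead of discarding the exception, would have exposed the pattern immediately. A sketch of such a drop-in replacement for the catch block above:

} catch {
  case ex: Exception =>
    // report which partition/broker the offset fetch failed against
    println("offset fetch failed for %s-%d via broker %d: %s".format(topic, pid, bid, ex))
}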
Digging further, across different topics the problematic partitions all mapped to the same broker id. This pointed to a connectivity problem between the environment running the code and that broker. A check showed that the hosts configuration on both my local machine and the monitoring machine was incomplete; after filling in the missing host entries the problem was solved.
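The underlying reason is that the checker reads each broker's registered hostname from ZooKeeper and connects by that name, so every broker hostname must be resolvable on the machine running the check. A hypothetical /etc/hosts fix (addresses and names invented for illustration):

10.0.0.11  kafka-broker-1
10.0.0.12  kafka-broker-2
10.0.0.13  kafka-broker-3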
Later I found that the incomplete partition listings and empty data on the lag pages of KafkaOffsetMonitor and kafka-manager were likewise fixed by completing the host entries.
A final gripe: Kafka's hard dependency on hostname resolution.