Spark Streaming + Kafka java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative


20/04/28 19:40:00 ERROR JobScheduler: Error generating jobs for time 1588074000000 ms
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:233)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
	at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)

Spark Streaming 2.2.0 + kafka_2.11-0.10.0.1

With enable.auto.commit set to false, offsets were maintained manually through ZK, and the job ran normally. Checking the offsets with zkCli and the Kafka console scripts confirmed that the offsets were indeed never committed to Kafka, while ZK was updated correctly on every batch. After stopping the job and starting it again, it failed with the error shown above.
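For context, the setup described above looks roughly like the following. This is a minimal sketch, not the original code: ssc, topics, the broker address, the group id, and the readOffsetsFromZk helper are all assumed or hypothetical.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val topics = Seq("my-topic")                           // assumed topic name
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",               // assumed broker address
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-consumer-group",                   // assumed group id
      "enable.auto.commit" -> (false: java.lang.Boolean)   // as in the post: no auto-commit
    )

    // Hypothetical helper: restores the per-partition offsets this job last wrote to ZK.
    val fromOffsets: Map[TopicPartition, Long] = readOffsetsFromZk()

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,                                                 // an existing StreamingContext
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )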

Cause of the exception:

Locating the code:
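The check lives in the constructor of StreamInputInfo (abridged from the Spark 2.2.0 source, org.apache.spark.streaming.scheduler.InputInfoTracker; line 38 is the require seen in the stack trace):

    // StreamInputInfo (Spark 2.2.0, abridged)
    case class StreamInputInfo(
        inputStreamId: Int,
        numRecords: Long,
        metadata: Map[String, Any] = Map.empty) {
      require(numRecords >= 0, "numRecords must not be negative")
    }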


This is where numRecords >= 0 is enforced; any negative batch size throws the exception above.

The logic of rdd.count:
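For the kafka010 KafkaRDD, count never scans the records; it is derived purely from the offset ranges (abridged from the Spark 2.2.0 source):

    // KafkaRDD (Spark 2.2.0, abridged)
    override def count(): Long = offsetRanges.map(_.count).sum

    // OffsetRange (abridged): a range's size is just the offset difference,
    // so untilOffset < fromOffset yields a negative count
    def count(): Long = untilOffset - fromOffset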


fromOffset comes from the offsets saved in ZK; untilOffset is produced at line 211 of DirectKafkaInputDStream, which computes the latest offsets and then applies spark.streaming.kafka.maxRatePerPartition as a clamp to obtain the maximum allowed untilOffsets. At that point, however, the offsets in Kafka had never been committed and were smaller than the offsets in ZK, so the computed numRecords came out negative.
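Abridged from the Spark 2.2.0 source of DirectKafkaInputDStream.compute, this is where the two sides meet:

    // DirectKafkaInputDStream.compute (Spark 2.2.0, abridged)
    override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
      // line 211: latest offsets from Kafka, rate-limited per partition
      val untilOffsets = clamp(latestOffsets())
      val offsetRanges = untilOffsets.map { case (tp, uo) =>
        val fo = currentOffsets(tp)  // fromOffset; in this setup, seeded from ZK at startup
        OffsetRange(tp.topic, tp.partition, fo, uo)
      }
      // ... build the KafkaRDD and its metadata ...
      // line 233: rdd.count is sum(untilOffset - fromOffset); a negative range
      // makes the require in StreamInputInfo fail with the error above
      val inputInfo = StreamInputInfo(id, rdd.count, metadata)
      // ...
    }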

Solution:

Manually set the offsets in ZK to match the ones in Kafka, and in addition commit the offsets to Kafka asynchronously so the two stores stay in step.
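A minimal sketch of that fix (saveOffsetsToZk is a hypothetical helper standing in for the job's existing ZK-writing code; commitAsync is the standard CanCommitOffsets API of the kafka010 integration):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // Grab the exact ranges this batch read, before any shuffle or transform.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch ...

      // Write the ranges to ZK as before (hypothetical helper) ...
      saveOffsetsToZk(offsetRanges)
      // ... and also commit them back to Kafka asynchronously, so the two
      // offset stores can no longer drift apart across a restart.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }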

