對應出異常的代碼是:val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
做foreachRDD的DStream必須是直接從KafkaUtils.createDirectStream拿到的,才能轉換為kafkaRDD. 后面做其他操作的時候會把kafkaRDD轉換為非kafkaRDD。也就是說這個HasOffsetRanges接口,只有kafkaRDD這個實現類。所以如果從kafka拿到DStream,后面需要使用foreachRDD,那么這個DStream必須是直接從KafkaUtils.createDirectStream拿到,中間不能再做其他的操作。