Error: overloaded method value createDirectStream — Spark Streaming build error


Straight to the code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    StreamingExamples.setStreamingLogLevels()
    val Array(brokers, topics) = args

    // Create the conf. Spark Streaming needs at least two threads:
    // one to receive data and one to process it.
    val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")

    // Create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = topics.split(",").toSet

    // Configure the Kafka parameters
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Read from Kafka with the direct approach; offsets are tracked in Kafka
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      // Location strategy (if Kafka and the Spark program run on the same machine,
      // data is read from the optimal location, i.e. locally)
      LocationStrategies.PreferConsistent,
      // Subscription strategy (topics can also be subscribed with a pattern, e.g. topic-*)
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    //==================== business logic goes below ====================
    val lines = messages.map(_.value())
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    //==================== business logic goes above ====================

    ssc.start()
    ssc.awaitTermination()
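As an aside, the code above still passes 0.8-style parameters (`metadata.broker.list`), while the `kafka010` integration that `createDirectStream` belongs to expects regular Kafka consumer properties in a `Map[String, Object]`. A minimal sketch of parameters the 0.10 integration accepts — the broker address and group id here are placeholders, not values from the original post:

    import org.apache.kafka.common.serialization.StringDeserializer

    // Hypothetical broker address and group id -- substitute your own.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "word-count-group",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

A `Map[String, String]` also compiles (Scala maps are covariant in the value type), but the deserializer entries are required for the consumer to start.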

  Building the project fails with:

Error:(44, 49) overloaded method value createDirectStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.api.java.JavaInputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String],perPartitionConfig: org.apache.spark.streaming.kafka010.PerPartitionConfig)org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,locationStrategy: org.apache.spark.streaming.kafka010.LocationStrategy,consumerStrategy: org.apache.spark.streaming.kafka010.ConsumerStrategy[String,String])org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, org.apache.spark.streaming.kafka010.LocationStrategy, org.apache.spark.streaming.kafka010.ConsumerStrategy[Nothing,Nothing])
    val messages = KafkaUtils.createDirectStream[String, String](

It's a long message, but it is saying that the topics argument needs to be a Set[String], not a Set[Char].

The best fix I can see for this is:

val topicsSet = topics.toString.split(",").toSet

However, if you really do have just one topic, simply use Set(topics) rather than splitting the string, which would otherwise be turned into a set of individual characters.
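To see why a Set[Char] appears in the first place, here is a small illustration (topic names are made up): calling .toSet directly on a String treats it as a sequence of characters, while splitting first yields the Set[String] that ConsumerStrategies.Subscribe expects.

    val topics = "topicA,topicB"

    // Wrong: a String is a Seq[Char], so .toSet produces a Set[Char]
    val chars: Set[Char] = topics.toSet

    // Right: split into topic names first, giving a Set[String]
    val names: Set[String] = topics.split(",").toSet   // Set("topicA", "topicB")

Passing the Set[Char] is what drives the compiler's type inference to ConsumerStrategy[Nothing,Nothing] and the "cannot be applied" overload error quoted above.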

