Scala創建SparkStreaming獲取Kafka數據代碼過程


正文

  首先打開spark官網,找一個自己用版本我選的是1.6.3的,然后進入SparkStreaming   ,通過搜索這個位置找到Kafka,

  

    點擊過去會找到一段Scala的代碼    

     import org.apache.spark.streaming.kafka._

     val kafkaStream = KafkaUtils.createStream(streamingContext, 
       [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

 

     如果想看createStream方法,可以值通過SparkStreaming中的 Where to go from here 中看到,有Java,Scala,Python的documents選擇自己編碼的一種點擊進去。我這里用的Scala,點擊KafkaUtils進去后會看到這個類中有很多的方法,其中我們要找的是createStream方法,看看有哪些重載。我們把這個方法的解釋賦值過來。

    

  defcreateStream(jssc: JavaStreamingContextzkQuorum: String, groupId: String, topics: Map[String, Integer])JavaPairReceiverInputDStream[String, String]

       Create an input stream that pulls messages from Kafka Brokers. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.

       jssc

    JavaStreamingContext object

       zkQuorum

    Zookeeper quorum (hostname:port,hostname:port,..)

       groupId

    The group id for this consumer

       topics

    Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

       returns

    DStream of (Kafka message key, Kafka message value)

 

 

    最后我們在IDEA中寫Scala獲取Kafka代碼

    

  def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder()
    .appName(Constants.SPARK_APP_NAME_PRODUCT)
    .getOrCreate()
     val map = Map("topic" -> 1)
     val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
     val createStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "hadoop01:9092,hadoop02:9092,hadoop03:9092", "groupId", map, StorageLevel.MEMORY_AND_DISK_SER)
     val map1: DStream[String] = createStream.map(_._2)

  }

  

    

     簡答的代碼過程,因為還有一些后續的工作要做,所以只是簡單的寫了一些從Kafa獲取數據的代碼從官網查找的一個過程,也是懷着學習的態度與大家一起交流,希望大牛們多多指點。

 

            i want to take you to travel ,this is my current mood

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM