SparkStreaming 整合kafka Demo


 

這里使用的是低級API,因為高級API非常不好用,需要繁瑣的配置,也不夠自動化,卻和低級API的效果一樣,所以這里以低級API做演示

你得有zookeeper和kafka

我這里是3台節點主機

架構圖

與高級API的區別,簡單並行(不需要創造多個輸入流,它會自動並行讀取kafka的數據),高效(不會像receiver數據被copy兩次),一次性語義(缺點:無法使用zookeeper的監控工具)

 

1.創建maven工程

首先添加pom依賴,其它運行依賴請參考 sparkStreaming整合WordCount

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

2.啟動zookeeper集群

我把zookeeper集群弄成了個腳本,直接執行腳本啟動所有zookeeper

 

啟動成功

3.啟動kafka集群

我這里是3台主機,三台都需要

進入目錄

cd /export/servers/kafka/bin/

啟動

kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties 

 

成功

4.測試kafka

創建topic

cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark

通過生產者發送消息

cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-console-producer.sh --broker-list node01:9092 --topic  kafka_spark

想發啥,發啥。此時通過創建AP接收生產者發送的數據

編寫代碼

package SparkStreaming

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreamingKafka {
  def main(args: Array[String]): Unit = {
    // 1.創建SparkConf對象
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_Direct")
      .setMaster("local[2]")

    // 2.創建SparkContext對象
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 3.創建StreamingContext對象
    /**
      * 參數說明:
      *   參數一:SparkContext對象
      *   參數二:每個批次的間隔時間
      */
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
    //設置checkpoint目錄

    ssc.checkpoint("./Kafka_Direct")

    // 4.通過KafkaUtils.createDirectStream對接kafka(采用是kafka低級api偏移量不受zk管理)
    // 4.1.配置kafka相關參數
    val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct")
    // 4.2.定義topic
    val topics=Set("kafka_spark")

    val dstream: InputDStream[(String, String)] = KafkaUtils
      .createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

    // 5.獲取topic中的數據
    val topicData: DStream[String] = dstream.map(_._2)

    // 6.切分每一行,每個單詞計為1
    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))

    // 7.相同單詞出現的次數累加
    val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)

    // 8.通過Output Operations操作打印數據
    resultDS.print()

    // 9.開啟流式計算
    ssc.start()

    // 阻塞一直運行
    ssc.awaitTermination()



  }
}

 

生產者生產數據

API接收控制台打印計算結果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM