這里使用的是低級API,因為高級API非常不好用,需要繁瑣的配置,也不夠自動化,卻和低級API的效果一樣,所以這里以低級API做演示
你得有zookeeper和kafka
我這里是3台節點主機
架構圖
與高級API的區別,簡單並行(不需要創造多個輸入流,它會自動並行讀取kafka的數據),高效(不會像receiver數據被copy兩次),一次性語義(缺點:無法使用zookeeper的監控工具)
1.創建maven工程
首先添加pom依賴,其它運行依賴請參考 sparkStreaming整合WordCount
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.0.2</version> </dependency>
2.啟動zookeeper集群
我把zookeeper集群弄成了個腳本,直接執行腳本啟動所有zookeeper
啟動成功
3.啟動kafka集群
我這里是3台主機,三台都需要
進入目錄
cd /export/servers/kafka/bin/
啟動
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
成功
4.測試kafka
創建topic
cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic kafka_spark
通過生產者發送消息
cd /export/servers/kafka_2.11-0.10.2.1
bin/kafka-console-producer.sh --broker-list node01:9092 --topic kafka_spark
想發啥,發啥。此時通過創建AP接收生產者發送的數據
編寫代碼
package SparkStreaming import kafka.serializer.StringDecoder import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SparkStreamingKafka { def main(args: Array[String]): Unit = { // 1.創建SparkConf對象 val conf: SparkConf = new SparkConf() .setAppName("SparkStreamingKafka_Direct") .setMaster("local[2]") // 2.創建SparkContext對象 val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") // 3.創建StreamingContext對象 /** * 參數說明: * 參數一:SparkContext對象 * 參數二:每個批次的間隔時間 */ val ssc: StreamingContext = new StreamingContext(sc,Seconds(5)) //設置checkpoint目錄 ssc.checkpoint("./Kafka_Direct") // 4.通過KafkaUtils.createDirectStream對接kafka(采用是kafka低級api偏移量不受zk管理) // 4.1.配置kafka相關參數 val kafkaParams=Map("metadata.broker.list"->"192.168.52.110:9092,192.168.52.120:9092,192.168.52.130:9092","group.id"->"kafka_Direct") // 4.2.定義topic val topics=Set("kafka_spark") val dstream: InputDStream[(String, String)] = KafkaUtils .createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) // 5.獲取topic中的數據 val topicData: DStream[String] = dstream.map(_._2) // 6.切分每一行,每個單詞計為1 val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1)) // 7.相同單詞出現的次數累加 val resultDS: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) // 8.通過Output Operations操作打印數據 resultDS.print() // 9.開啟流式計算 ssc.start() // 阻塞一直運行 ssc.awaitTermination() } }
生產者生產數據
API接收控制台打印計算結果