1.說明
雖然DStream可以轉換成RDD來處理,但是如果處理邏輯比較複雜(例如需要分組、聚合統計),可以考慮轉成DataFrame後使用SparkSQL。
2.集成方式
Streaming和Core整合:
transform或者foreachRDD方法
Core和SQL整合:
RDD <==> DataFrame 互換
3.程序
package com.sql.it

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Streaming word count that reads lines from Kafka and aggregates every
 * micro-batch with Spark SQL.
 *
 * Each 5-second batch is handled inside `DStream.transform`: the batch RDD is
 * split into words, registered as the temporary table `tb_word`, grouped with
 * a SQL query, and the resulting (word, count) pairs are printed.
 */
object StreamingSQL {

  /**
   * Builds the streaming job, starts it, and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingWindowOfKafka22")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Checkpoint directory. The original author's note: a checkpoint dir is
    // mandatory when calling updateStateByKey, and the target folder must not
    // already exist. This job itself only uses transform.
    ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")

    val kafkaParams = Map(
      "group.id" -> "streaming-kafka-78912151",
      "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
      "auto.offset.reset" -> "smallest"
    )
    // The map value is the number of threads used to read the topic, so it must be >= 1.
    val topics = Map("beifeng" -> 4)

    val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
      ssc,                            // Spark Streaming context
      kafkaParams,                    // connection parameters (Kafka high-level consumer API)
      topics,                         // topic name -> number of reader threads
      StorageLevel.MEMORY_AND_DISK_2  // storage level for the data held by the receiver
    ).map(_._2)                       // keep only the message value, drop the key

    // transform: expresses the DStream operation as an RDD operation; the
    // function only has to return a new RDD for each batch.
    dstream.transform(rdd => {
      // Word count computed with SQL.
      val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
      import sqlContext.implicits._

      // Drop empty tokens AFTER splitting: split(" ") produces "" for
      // consecutive or leading spaces, and filtering only whole lines would
      // let those empty strings be counted as a word. Empty lines split into
      // a single "" token, so this filter covers them as well.
      val wordPairs = rdd
        .flatMap(_.split(" "))
        .filter(_.nonEmpty)
        .map(word => (word, 1))
      wordPairs.toDF("word", "c").registerTempTable("tb_word")

      val resultRDD = sqlContext
        .sql("select word, count(c) as vc from tb_word group by word")
        .map(row => (row.getAs[String]("word"), row.getAs[Long]("vc")))

      resultRDD
    }).print()

    // Start processing and block until the streaming context terminates.
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * Lazily creates one SQLContext and hands the same instance back on every
 * later call, so that each streaming batch does not build a new context.
 */
object SQLContextSingelton {
  // @volatile makes the unsynchronized null check and the final read below
  // safe: without it another thread may observe a stale or not yet fully
  // published reference. @transient is kept from the original declaration.
  @transient @volatile private var instance: SQLContext = _

  /**
   * Returns the cached SQLContext, creating it from `sc` on first use.
   *
   * @param sc SparkContext used to construct the SQLContext the first time;
   *           ignored once an instance has been cached
   * @return the shared SQLContext instance
   */
  def getSQLContext(sc: SparkContext): SQLContext = {
    if (instance == null) {
      synchronized {
        // Re-check under the lock: another thread may have created the
        // instance between the first check and acquiring the monitor.
        if (instance == null) {
          instance = new SQLContext(sc)
        }
      }
    }
    instance
  }
}
4.效果