Data streaming轉為DataFrame,不能直接一步轉到DF,需要先轉為RDD,然后再轉到DF,我們用流式處理數據后,再通過spark sql實時獲取我們想要的結果。
1.首先老規矩,創建spark上下文對象,spark SQL和spark Streaming,再創建個socket在Linux端打入數據。
1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]") 2 3 conf.set("spark.sql.shuffle.partitions", "1") 4 val sc = new SparkContext(conf) 5 val sqlContext = new SQLContext(sc) 6 7 val ssc = new StreamingContext(sc,Durations.seconds(5)) 8 9 val lines = ssc.socketTextStream("master",8888)
2.首先用foreachRDD方法把spark streaming轉為RDD
1 /** 2 * 3 * DS -> RDD -> DF 4 * 5 */ 6 lines.foreachRDD(rdd => { 7 val stuRDD = rdd.map(line => { 8 val split = line.split(",") 9 (split(0),split(1),split(2).toInt,split(3),split(4)) 10 })
3.導入sqlContext隱式轉換,將RDD To成DF,同時傳入column對應RDD返回值
1 //導入隱式轉換,將RDD轉換為DF 2 import sqlContext.implicits._ 3 4 val stuDF = stuRDD.toDF("id","name","age","gender","clazz") 5 //注冊成表 6 stuDF.registerTempTable("student") 7 8 val result = sqlContext.sql("select clazz,count(1) from student group by clazz") 9 10 result.show() 11 result.write.mode(SaveMode.Append).json("Spark/data/dsondf")
總結,這里的sqlContext必須自己創建好,原來我還以為是導包的時候,類都已經封裝好了的,直接import就行了,報的數組下標越界,懵了半天。
