spark streaming向RDD和DataFrame轉換


Data streaming轉為DataFrame,不能直接一步轉到DF,需要先轉為RDD,然后再轉到DF,我們用流式處理數據后,再通過spark sql實時獲取我們想要的結果。

1.首先老規矩,創建spark上下文對象,spark SQL和spark Streaming,再創建個socket在Linux端打入數據。

1 val conf = new SparkConf().setAppName("Demo04DSToRDDToDF").setMaster("local[2]")
2 
3     conf.set("spark.sql.shuffle.partitions", "1")
4     val sc = new SparkContext(conf)
5     val sqlContext = new SQLContext(sc)
6 
7     val ssc = new StreamingContext(sc,Durations.seconds(5))
8 
9     val lines = ssc.socketTextStream("master",8888)

2.首先用foreachRDD方法把spark streaming轉為RDD

 1     /**
 2       *
 3       * DS  -> RDD -> DF
 4       *
 5       */
 6     lines.foreachRDD(rdd => {
 7      val stuRDD = rdd.map(line => {
 8         val split = line.split(",")
 9        (split(0),split(1),split(2).toInt,split(3),split(4))
10       })

3.導入sqlContext隱式轉換,將RDD To成DF,同時傳入column對應RDD返回值

 1       //導入隱式轉換,將RDD轉換為DF
 2       import sqlContext.implicits._
 3 
 4       val stuDF = stuRDD.toDF("id","name","age","gender","clazz")
 5       //注冊成表
 6       stuDF.registerTempTable("student")
 7 
 8       val result = sqlContext.sql("select clazz,count(1) from student group by clazz")
 9 
10       result.show()
11       result.write.mode(SaveMode.Append).json("Spark/data/dsondf")

總結,這里的sqlContext必須自己創建好,原來我還以為是導包的時候,類都已經封裝好了的,直接import就行了,報的數組下標越界,懵了半天。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM