RDD與DataFrame的轉換


RDD與DataFrame轉換
1. 通過反射的方式來推斷RDD元素中的元數據。因為RDD本身一條數據本身是沒有元數據的,例如Person,而Person有name,id等,而record是不知道這些的,但是變成DataFrame背后一定知道,通過反射的方式就可以了解到背后這些元數據,進而轉換成DataFrame。
如何反射?
Scala: 通過case class映射,在case class里面說我們這個RDD里面每個record的不同列的元數據是什么。(廢棄)
當樣本類不能提前確定時(例如,當記錄的結構由字符串或文本數據集編碼而成,它在解析時,字段將會對不同的用戶有不同的投影結果),SchemaRDD 可以由以下三個步驟創建:
當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:


 //   從原來的RDD創建一個Row格式的RDD
 //    創建與RDD 中Rows結構匹配的StructType,通過該StructType創建表示RDD 的Schema
 //   通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD 的Schema
val conf = new SparkConf().setMaster ("local").setAppName ("Test1") val sc = new SparkContext (conf) val sqlContext = new SQLContext(sc) // import sqlContext.implicits._ case class Person(name:String,age:Int) val people = sc.textFile ("d:/people.txt") val schemaString = "name age" val schema = StructType ( schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)) ) val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) val peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema) peopleSchemaRDD .registerTempTable("people" ) val results = sqlContext . sql ("SELECT name FROM people" ) results.printSchema() println(results.count()) results.map(t => "Name: " + t(0)).collect().foreach(println)

 


//1.利用反射來推斷包含特定類型對象的RDD的schema。這種方法會簡化代碼並且在你已經知道schema的時候非常適用。

//2.   先創建一個bean類,然后將Rdd轉換成DataFrame
 case class Person(name: String, age: Int)
  def main (args : Array[String]) : Unit =
  {
    val conf = new SparkConf().setMaster ("local").setAppName ("Test1")
    val sc = new SparkContext (conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val people = sc.textFile("d:/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM