隨着Spark1.4.x的更新,Spark提供更高階的對象DataFrame,提供了比RDD更豐富的API操作,同時也支持RDD轉DataFrame(下面簡稱“DF”),但是要注意,不是任意類型對象組成的RDD都可以轉換成DF,,只有當組成RDD[T]的每一個T對象內部具有鮮明的字段結構時,才能隱式或者顯示地創建DF所需要的Schema(結構信息),從而進行RDD->DF轉換。下面介紹兩種常用的RDD轉DF的方式(示例Spark版本為2.2.0)。
准備文件:person3.txt
20,Michael,173,150
30,Andy,168,140
19,Justin,145,145
18,Sam,175,122
17,Jean,173,134
22,Jerry,180,99
26,Fasttson,150,145
23,Mivak,177,118
16,Heaat,168,129
36,Franc,165,138
一、使用反射機制推理出schema
使用這種方式,往往是因為RDD[T]的T對象就已經是具有典型的一維表結構的字段結構對象,因此SparkSql可以自動推斷出schema。常用的操作是使用樣例對象(case class)。
package com.cjs import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object RDDToDF1 { case class Person(name:String, age:Int) def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf() .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse") .set("spark.some.config.option","some-value") val ss = SparkSession .builder() .config(conf) .appName("RDDToDF") .master("local[2]") .getOrCreate() val path = "E:\\IntelliJ Idea\\sparkSql_practice\\src\\main\\scala\\com\\cjs\\person3.txt"
import ss.implicits._ val sc = ss.sparkContext val RDDPerson1 = sc.textFile(path) val RDDPerson = RDDPerson1.map(_.split(",")).map(p => Person(p(1),p(0).trim.toInt)) val DFPerson = RDDPerson.toDF() DFPerson.printSchema() DFPerson.select($"name",$"age").show() } }
運行結果:
二、自定義schema
這種方式是通過編程接口(StructType),允許創建一個schema,然后將其應用到RDD[Row]中。相對來說,這種方式的步驟會比較繁瑣,但更自由,更靈活。因為有很多時候樣例對象(case class)不能提前定義,或者一個文本的數據集的字段對不同的用戶來說,需要被解釋為不同的字段名,那么通過這種方式可以靈活定義schema。
package com.cjs import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ object RDDToDF2 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf() .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse") .set("spqrk.some.config.option","some-value") val ss = SparkSession .builder() .config(conf) .appName("RDD_To_DF2") .master("local[2]") .getOrCreate() import ss.implicits._ val schemaString = "name,age" //1、創建schema所需要的字段名的字符串,用特殊符號連接,如“,” //2、遍歷schema的fileName,將每個fileName封裝成StructField對象,實質是定義每一個file的數據類型、屬性。 Array[StructField]
val fields = schemaString.split(",").map(fileName => StructField(fileName, StringType, nullable = true)) //3、將 Array[StructField]轉化成schema
val schema = StructType(fields) val sc = ss.sparkContext val path = "E:\\IntelliJ Idea\\sparkSql_practice\\src\\main\\scala\\com\\cjs\\person3.txt" val peopleRDD = sc.textFile(path) //4、構建RDD[Row]
val rowRDD = peopleRDD.map(_.split(",")).map(att=>Row(att(1),att(0).trim)) val peopleDF = ss.createDataFrame(rowRDD, schema) peopleDF.select($"name",$"age").show() } }
運行結果: