import包:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
樣例類:
/**
 * One record of the person data set.
 *
 * @param id   numeric identifier of the person
 * @param name name of the person
 * @param age  age of the person
 */
case class Person(
  id: Int,
  name: String,
  age: Int
)
主函數:
/**
 * Entry point: reads a text file, splits every line on a single space and
 * runs both schema demos (`InferringSchema` and `SpecifyingSchema`) on the result.
 *
 * @param args optional command-line arguments; when present, `args(0)` is used
 *             as the input path instead of the built-in default
 */
def main(args: Array[String]): Unit = {
  // Fall back to the original hard-coded location when no path is supplied.
  val inputPath: String = args.headOption.getOrElse("C:\\Users\\dummy\\Desktop\\person.txt")

  val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  try {
    val sqlContext = new SQLContext(sparkContext)
    val rdd: RDD[String] = sparkContext.textFile(inputPath)
    val lineRdd: RDD[Array[String]] = rdd.map(_.split(" "))
    InferringSchema(lineRdd, sqlContext)
    SpecifyingSchema(lineRdd, sqlContext)
  } finally {
    // Always stop the context, even when one of the demos throws
    // (e.g. a field that cannot be parsed as Int), so it is not leaked.
    sparkContext.stop()
  }
}
第一種方法:(需要創建樣例類)
/**
 * Builds a DataFrame whose schema is inferred by reflection from the
 * `Person` case class, then prints it.
 *
 * @param lineRdd    lines already split into fields; fields 0 and 2 are parsed
 *                   as Int, field 1 is used as the name
 * @param sqlContext SQL context whose implicits provide the RDD-to-DataFrame conversion
 */
def InferringSchema(lineRdd: RDD[Array[String]], sqlContext: SQLContext): Unit = {
  // Without these implicits an RDD cannot be converted to a DataFrame.
  import sqlContext.implicits._

  // Associate each split line with the case class.
  val people: RDD[Person] = lineRdd.map { fields =>
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  // Convert the RDD into a DataFrame and print it.
  val peopleDF: DataFrame = people.toDF()
  peopleDF.show()

  // Disabled follow-up: register a temporary table, query it with SQL and
  // store the result as JSON at the given location.
  //peopleDF.registerTempTable("person")
  //val peopleDF2: DataFrame = sqlContext.sql("select * from person")
  //peopleDF2.write.json("C:\\Users\\dummy\\Desktop\\out")
  //peopleDF2.show()
}
第二種方法:
/**
 * Builds a DataFrame from an explicitly declared `StructType` schema,
 * registers it as the temporary table "person" and prints the result of
 * selecting every row from that table.
 *
 * @param lineRdd    lines already split into fields; fields 0 and 2 are parsed
 *                   as Int, field 1 is used as the name
 * @param sqlContext SQL context used to create the DataFrame and run the query
 */
def SpecifyingSchema(lineRdd: RDD[Array[String]], sqlContext: SQLContext): Unit = {
  // Declare the schema of every column directly. StructField takes
  // (name, dataType, nullable = true, metadata = Metadata.empty);
  // only the first two arguments need to be passed.
  val columns = List(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("age", IntegerType)
  )
  val schema = StructType(columns)

  // Turn every split line into a Row matching the schema above.
  val rows: RDD[Row] = lineRdd.map { fields =>
    Row(fields(0).toInt, fields(1), fields(2).toInt)
  }

  val peopleDF: DataFrame = sqlContext.createDataFrame(rows, schema)
  //peopleDF.show()
  peopleDF.registerTempTable("person")

  // Query the temporary table through SQL and print the outcome.
  //sqlContext.sql("select * from person").write.json("C:\\Users\\dummy\\Desktop\\out")
  sqlContext.sql("select * from person").show()
}
對比: