Method 1: Creating DataFrames from a case class (reflection)
TestDataFrame1.scala
package com.bky

// SparkSession is the entry point for the DataFrame API
import org.apache.spark.sql.SparkSession

// case class that describes the table schema (one field per column)
case class Dept(var id: Int, var position: String, var location: String)
/**
* Method 1: create DataFrames from a case class (reflection)
*/
object TestDataFrame1 {
def main(args: Array[String]): Unit = {
/**
* Use SparkSession directly as the entry point for reading the file.
* It wraps SparkContext, SparkConf and SQLContext;
* for backward compatibility, SQLContext and HiveContext are also still available.
*/
val spark = SparkSession
  .builder()                 // session builder
  .appName("TestDataFrame1") // set the application name
  .master("local[2]")        // run locally with 2 threads
  .getOrCreate()             // reuse an existing session or create a new one
import spark.implicits._ // implicit conversions (toDF, encoders)
// Read the local file and map each line onto the Dept case class
val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
.map(line => Dept(line.split("\t")(0).toInt,
line.split("\t")(1),
line.split("\t")(2).trim))
// Convert to a DataFrame (schema inferred from the case class by reflection)
val df = deptRDD.toDF()
// Register the DataFrame as a temporary view
df.createOrReplaceTempView("dept")
// Query the view with SQL
spark.sql("select * from dept").show()
}
}
Condensed version: TestDataFrame1.scala
package com.bky
import org.apache.spark.sql.SparkSession
object TestDataFrame1 extends App {
val spark = SparkSession
.builder()
.appName("TestDataFrame1")
.master("local[2]")
.getOrCreate()
import spark.implicits._
val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
.map(line => Dept(line.split("\t")(0).toInt,
line.split("\t")(1),
line.split("\t")(2).trim))
val df = deptRDD.toDF()
df.createOrReplaceTempView("dept")
spark.sql("select * from dept").show()
}
case class Dept(var id:Int, var position:String, var location:String)
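For comparison, the same query can also be written with the untyped DataFrame API instead of SQL. The following is a minimal sketch that assumes the df built above is in scope; the filter predicate is purely illustrative and not part of the original program.

// DataFrame-operator equivalent of "select * from dept" (sketch)
df.select("id", "position", "location")
  .where($"id" > 0) // hypothetical predicate, only to show the syntax
  .show()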
Method 2: Creating DataFrames with a StructType (programmatic interface)
TestDataFrame2.scala
package com.bky
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
/**
* Method 2: create DataFrames with a StructType (programmatic interface)
*/
object TestDataFrame2 extends App {
val spark = SparkSession
.builder()
.appName("TestDataFrame2")
.master("local[2]")
.getOrCreate()
/**
 * Map the raw lines to Row objects (requires import org.apache.spark.sql.Row).
 * createDataFrame(rowRDD, schema) expects an RDD[Row], so read the file
 * through sparkContext.textFile rather than spark.read.textFile.
 */
val path = "/Users/hadoop/data/dept.txt"
val fileRDD = spark.sparkContext.textFile(path)
val rowRDD = fileRDD.map(line => {
  val fields = line.split("\t")
  Row(fields(0).toInt, fields(1), fields(2).trim)
})
// Define the schema with a StructType
val innerStruct = StructType(
  // field name, field type, nullable
  StructField("id", IntegerType, true) ::
  StructField("position", StringType, true) ::
  StructField("location", StringType, true) :: Nil
)
// Combine the RDD[Row] with the schema to build the DataFrame
val df = spark.createDataFrame(rowRDD, innerStruct)
df.createOrReplaceTempView("dept")
spark.sql("select * from dept").show()
}
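The explicit schema can also be handed straight to the DataFrameReader. The following is a minimal sketch that reuses innerStruct and path from the code above and assumes dept.txt is tab-separated, as in method 1.

// Sketch: apply the explicit schema through the built-in CSV reader
val deptDF = spark.read
  .schema(innerStruct)  // StructType defined above
  .option("sep", "\t")  // tab-separated columns (assumption)
  .csv(path)
deptDF.printSchema()
deptDF.show()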
Method 3: Creating DataFrames from a JSON file
TestDataFrame3.scala
package com.bky
import org.apache.spark.sql.SparkSession
/**
* Method 3: create DataFrames from a JSON file
*/
object TestDataFrame3 extends App {
val spark = SparkSession
.builder()
.master("local[2]")
.appName("TestDataFrame3")
.getOrCreate()
val path = "/Users/hadoop/data/test.json"
// spark.read.json infers the schema and returns a DataFrame directly
val df = spark.read.json(path)
df.createOrReplaceTempView("test")
spark.sql("select * from test").show()
}
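Note that spark.read.json expects JSON Lines input by default, i.e. one complete JSON object per line. If test.json is a single pretty-printed object or a JSON array, the multiLine option (available since Spark 2.2) can be enabled; a minimal sketch, reusing path from above:

// Sketch: read a standard multi-line JSON document instead of JSON Lines
val multiLineDF = spark.read
  .option("multiLine", true)
  .json(path)
multiLineDF.printSchema()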