Spark SQL: Basic Usage of DataFrames and Datasets


Data in the JSON file

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

DataFrames

Creating a DataFrame

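Everything below is typed into spark-shell, which creates a SparkSession named spark automatically. In a standalone application you would build the session yourself; a minimal sketch (the app name and local master are only placeholder choices):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkSQLExample")   // placeholder application name
  .master("local[*]")           // assumption: run locally for testing
  .getOrCreate()
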
scala> val df=spark.read.json("people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DSL style

First, import the implicit conversions; they provide the $"column" syntax used below as well as the toDF/toDS conversions on RDDs and local collections.

import spark.implicits._

printSchema
scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

select

scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

scala> df.select($"name",$"age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

scala> df.select($"name",$"age"+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
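The computed column keeps the generated name (age + 1); if you prefer a friendlier header, it can be renamed with alias. A small sketch, where age_next is just an illustrative name (the values are the same as above, only the header changes):

scala> df.select($"name", ($"age" + 1).alias("age_next")).show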

filter
scala> df.filter($"age">20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
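filter also accepts a SQL expression string; the following sketch is equivalent to the column-based form above and returns the same single row for Andy:

scala> df.filter("age > 20").show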

groupBy
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
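count is just a shorthand; other aggregations go through agg together with org.apache.spark.sql.functions. A minimal sketch (avg and max simply ignore the null age):

scala> import org.apache.spark.sql.functions._
scala> df.agg(avg($"age"), max($"age")).show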

SQL style

createOrReplaceTempView (visible only within the current SparkSession)

scala> df.createOrReplaceTempView("people")

createGlobalTempView (shared across sessions; queries must prefix the view name with global_temp)

scala> df.createGlobalTempView("people")

sql

# Query the session-scoped temporary view
scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

# Query the global temporary view
scala> spark.sql("select * from global_temp.people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
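This illustrates the difference between the two views: the temporary view exists only in the session that created it, while the global view is shared across sessions. One way to check from the same shell is to open a new session (a minimal sketch); querying plain people from that new session would instead fail with a table-or-view-not-found error:

scala> spark.newSession().sql("select * from global_temp.people").show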

Dataset

Creating a Dataset

First, define a case class:

scala> case class Person(name: String, age: Long)
defined class Person

scala> val ds=spark.read.json("people.json").as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

scala> ds.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
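Note that Michael's age is null. Deserializing that row into a non-nullable Long (for example in a typed map or collect) can fail at runtime, so declaring the field as Option[Long] is safer. A minimal sketch, with PersonOpt as an illustrative name:

scala> case class PersonOpt(name: String, age: Option[Long])   // illustrative variant of Person
scala> val dsOpt = spark.read.json("people.json").as[PersonOpt]
scala> dsOpt.filter(_.age.exists(_ > 20)).show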

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
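Unlike a DataFrame, a Dataset offers typed transformations, so lambdas work on Person objects instead of untyped Rows. A small sketch:

scala> caseClassDS.map(p => p.name.toUpperCase).show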

