Data in the JSON file
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
DataFrames
Creating a DataFrame
scala> val df=spark.read.json("people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
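Outside spark-shell there is no predefined spark value, so the session has to be built by hand. The following is a minimal sketch of that setup, not the only way to do it. The application name and the local[*] master are assumptions for a local test run, and the three records are supplied inline (instead of being read from people.json) so that the example needs no file on disk.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Build a session, or reuse the existing one when pasted into spark-shell.
// App name and master are assumptions for a local run.
val spark = SparkSession.builder()
  .appName("people-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// The same three records as people.json, one JSON object per element.
val jsonLines = Seq(
  """{"name":"Michael"}""",
  """{"name":"Andy", "age":30}""",
  """{"name":"Justin", "age":19}"""
).toDS()

// DataFrameReader.json also accepts a Dataset[String] (Spark 2.2 and later).
val df: DataFrame = spark.read.json(jsonLines)

df.printSchema()
df.show()
```

Schema inference orders the JSON fields by name, which is why age appears before name in the output above.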
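DSL style
First import the implicit conversions. They enable the $"columnName" column syntax and the toDF/toDS conversions from local collections and RDDs. spark-shell already performs this import at startup; a standalone application has to do it explicitly.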
import spark.implicits._
printSchema
scala> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
select
scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
scala> df.select($"name",$"age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
scala> df.select($"name",$"age"+1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
filter
scala> df.filter($"age">20).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
groupBy
scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
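The DSL calls shown above can also be chained. The sketch below is one possible combination and is not taken from the original transcript: the column alias age_next_year and the aliases known_ages and avg_age are made-up names, and the data is built inline as a stand-in for people.json.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count}

val spark = SparkSession.builder()
  .appName("dsl-example")   // assumed name
  .master("local[*]")       // assumed local run
  .getOrCreate()
import spark.implicits._

// Inline stand-in for people.json; None plays the role of the missing age.
val df = Seq[(String, Option[Long])](
  ("Michael", None),
  ("Andy", Some(30L)),
  ("Justin", Some(19L))
).toDF("name", "age")

// Drop rows without an age, derive a new column, then filter on it.
val nextYear = df
  .filter($"age".isNotNull)
  .select($"name", ($"age" + 1).as("age_next_year"))
  .filter($"age_next_year" > 20)

nextYear.show()

// Aggregate over the whole DataFrame; count(column) and avg skip nulls.
val stats = df.agg(count($"age").as("known_ages"), avg($"age").as("avg_age"))
stats.show()
```

Because a comparison with null never evaluates to true, the explicit isNotNull filter is optional here; it is kept only to make the intent visible.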
SQL style
createOrReplaceTempView (visible only within the current session)
scala> df.createOrReplaceTempView("people")
createGlobalTempView (shared by all sessions of the same Spark application; queries must qualify the view name with global_temp)
scala> df.createGlobalTempView("people")
sql
// Query the session-scoped temporary view
scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
// Query the global temporary view
scala> spark.sql("select * from global_temp.people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
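The difference between the two kinds of view only shows up once a second session exists. The sketch below tries to make that visible under a few assumptions: the view names people_session and people_global are made up to avoid clashing with the people views above, the data is inline, and createOrReplaceGlobalTempView (Spark 2.2 and later) is used in place of createGlobalTempView because the latter throws an AnalysisException when the view already exists.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder()
  .appName("view-scope-example")  // assumed name
  .master("local[*]")             // assumed local run
  .getOrCreate()
import spark.implicits._

val df = Seq(("Andy", 30L), ("Justin", 19L)).toDF("name", "age")

// Hypothetical view names for this illustration.
df.createOrReplaceTempView("people_session")
df.createOrReplaceGlobalTempView("people_global")

// A new session shares the SparkContext but keeps its own temp views.
val other = spark.newSession()

// The global view is reachable from the other session via global_temp.
val globalCount = other.sql("select * from global_temp.people_global").count()

// The session-scoped view is not; the lookup fails during analysis.
val sessionLookup = Try(other.sql("select * from people_session").count())

println(s"rows via global view: $globalCount")
println(s"session view visible in other session: ${sessionLookup.isSuccess}")
```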
Dataset
Creating a Dataset
First define a case class
scala> case class Person(name: String, age: Long)
defined class Person
scala> val ds=spark.read.json("people.json").as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> ds.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
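ds.show works on the JSON data even though Michael has no age, but operations that build Person objects (collect, map, typed filter) fail on that row, because a null cannot be stored in a Long field. A common way around this is to declare the field as Option[Long]. The sketch below assumes a hypothetical PersonOpt class and inline data; it is written as a standalone object so that the case class is defined at the top level, which encoders require outside the shell.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object OptionalAgeExample {
  // Hypothetical variant of Person: a missing age becomes None.
  case class PersonOpt(name: String, age: Option[Long])

  def load(spark: SparkSession): Dataset[PersonOpt] = {
    import spark.implicits._
    // Same records as people.json, supplied inline so no file is needed.
    val jsonLines = Seq(
      """{"name":"Michael"}""",
      """{"name":"Andy", "age":30}""",
      """{"name":"Justin", "age":19}"""
    ).toDS()
    spark.read.json(jsonLines).as[PersonOpt]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("optional-age")  // assumed name
      .master("local[*]")       // assumed local run
      .getOrCreate()
    import spark.implicits._

    val people = load(spark)
    // Typed lambdas operate on PersonOpt objects, not on column names.
    val olderThan20 = people.filter(_.age.exists(_ > 20)).map(_.name).collect().toList
    val withoutAge = people.filter(_.age.isEmpty).map(_.name).collect().toList
    println(s"older than 20: $olderThan20, age missing: $withoutAge")

    spark.stop()
  }
}
```

In spark-shell the same code can be entered with :paste and started with OptionalAgeExample.main(Array.empty).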