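A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row, i.e. Dataset[Row].
A Dataset is "lazy": computation is triggered only when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient parallel and distributed execution.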
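The lazy behavior described above can be observed directly. The following is a minimal sketch, not a definitive test: it assumes a local master (`local[*]`) and uses a hypothetical accumulator named `calls` to count how often the mapping function runs. It can be pasted into spark-shell.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("lazy-sketch").getOrCreate()
import spark.implicits._

// Hypothetical counter that records how many times the map function is called.
val calls = spark.sparkContext.longAccumulator("calls")

// Transformation only: this builds a logical plan and runs no job.
val doubled = Seq(1, 2, 3).toDS().map { x => calls.add(1); x * 2 }
val callsBeforeAction = calls.sum // no job has run yet

// Action: collect() triggers optimization, physical planning and execution.
val result = doubled.collect()
val callsAfterAction = calls.sum // the map function has now been executed
```

Comparing `callsBeforeAction` with `callsAfterAction` shows that the function passed to `map` only runs once `collect()` is called.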
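Description of the fields in the sample data
// affairs: frequency of extramarital affairs over the past year
// gender: gender
// age: age
// yearsmarried: number of years married
// children: whether the couple has children
// religiousness: degree of religiousness (5-point scale; 1 = anti-religious, 5 = very religious)
// education: education level
// occupation: occupation (Gordon's 7-category classification, reverse numbering)
// rating: self-rating of the marriage (5-point scale; 1 = very unhappy, 5 = very happy)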
1. Import commonly used packages
import scala.math._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrameStatFunctions
2. Create a SparkSession and load the sample data
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
(0, "male", 37, 10, "no", 3, 18, 7, 4),
(0, "female", 27, 4, "no", 4, 14, 6, 4),
(0, "female", 32, 15, "yes", 1, 12, 1, 4),
(0, "male", 57, 15, "yes", 5, 18, 6, 5),
(0, "male", 22, 0.75, "no", 2, 17, 6, 3),
(0, "female", 32, 1.5, "no", 2, 17, 5, 5),
(0, "female", 22, 0.75, "no", 2, 12, 1, 3),
(0, "male", 57, 15, "yes", 2, 14, 4, 4),
(0, "female", 32, 15, "yes", 4, 16, 1, 2),
(0, "male", 22, 1.5, "no", 4, 14, 4, 5))
val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data.printSchema()
root
|-- affairs: double (nullable = false)
|-- gender: string (nullable = true)
|-- age: double (nullable = false)
|-- yearsmarried: double (nullable = false)
|-- children: string (nullable = true)
|-- religiousness: double (nullable = false)
|-- education: double (nullable = false)
|-- occupation: double (nullable = false)
|-- rating: double (nullable = false)
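The schema above belongs to a DataFrame, i.e. a Dataset[Row]. To get the strongly typed view, the rows can be mapped onto a case class with `as[T]`. The sketch below is an illustration under assumptions: the case class `Marriage` is hypothetical, it covers only three of the sample columns, and a local master is assumed.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type covering three of the sample columns.
// In a compiled application it must be defined at the top level, not inside a method.
case class Marriage(gender: String, age: Double, yearsmarried: Double)

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("typed-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 37.0, 10.0), ("female", 27.0, 4.0), ("male", 57.0, 15.0))
  .toDF("gender", "age", "yearsmarried")

// as[T] matches DataFrame columns to case-class fields by name.
val typed: Dataset[Marriage] = df.as[Marriage]

// Field access is checked by the compiler; a misspelled field does not compile.
val olderGenders = typed.filter(_.age > 50).map(_.gender).collect()
```

With the untyped DataFrame, a misspelled column name in `filter("agee > 50")` would only fail at run time, during analysis.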
3. Work with specific columns and rows
// Display the first n records in spark-shell
data.show(7)
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|37.0|        10.0|      no|          3.0|     18.0|       7.0|   4.0|
|    0.0|female|27.0|         4.0|      no|          4.0|     14.0|       6.0|   4.0|
|    0.0|female|32.0|        15.0|     yes|          1.0|     12.0|       1.0|   4.0|
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|22.0|        0.75|      no|          2.0|     17.0|       6.0|   3.0|
|    0.0|female|32.0|         1.5|      no|          2.0|     17.0|       5.0|   5.0|
|    0.0|female|22.0|        0.75|      no|          2.0|     12.0|       1.0|   3.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 7 rows
// Take the first n records (returns a new DataFrame)
val data3=data.limit(5)
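Note that `limit` is a transformation, while `take` and `head` are actions that return rows to the driver. The following sketch contrasts them on a small hypothetical two-column DataFrame; the local master is an assumption.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("limit-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 37.0), ("female", 27.0), ("female", 32.0)).toDF("gender", "age")

// limit(n) is a transformation: it returns a new DataFrame and runs no job.
val firstTwo: DataFrame = df.limit(2)

// take(n) and head() are actions: they run a job and return rows to the driver.
val taken: Array[Row] = df.take(2)
val headRow: Row = df.head()
```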
// Filter rows with a SQL expression string
data.filter("age>50 and gender=='male' ").show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
|    0.0|  male|57.0|        15.0|     yes|          5.0|     18.0|       6.0|   5.0|
|    0.0|  male|57.0|        15.0|     yes|          2.0|     14.0|       4.0|   4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
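The same filter can also be written with Column expressions instead of a SQL string. This is a sketch on a small hypothetical DataFrame, assuming a local master; `where` is an alias of `filter`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 57.0), ("female", 32.0), ("male", 22.0)).toDF("gender", "age")

// SQL expression string, parsed at run time.
val bySqlString = df.filter("age > 50 and gender = 'male'")

// Column expressions: note === for equality and && for logical AND.
val byColumns = df.filter(col("age") > 50 && col("gender") === "male")

// where is an alias of filter; $"name" is shorthand for a Column via spark.implicits._
val byWhere = df.where($"age" > 50 && $"gender" === "male")
```

All three variants describe the same predicate, so they should select the same rows.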
// All column names of the DataFrame
val columnArray=data.columns
columnArray: Array[String] = Array(affairs, gender, age, yearsmarried, children, religiousness, education, occupation, rating)
// Select specific columns
data.select("gender", "age", "yearsmarried", "children").show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows
val colArray=Array("gender", "age", "yearsmarried", "children")
colArray: Array[String] = Array(gender, age, yearsmarried, children)
data.selectExpr(colArray:_*).show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
|  male|37.0|        10.0|      no|
|female|27.0|         4.0|      no|
|female|32.0|        15.0|     yes|
+------+----+------------+--------+
only showing top 3 rows
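When the column names are held in an array, `select` can be used as well as `selectExpr`. The sketch below shows a few equivalent forms on a small hypothetical DataFrame; the local master is an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("select-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 37.0, 10.0, "no")).toDF("gender", "age", "yearsmarried", "children")
val colArray = Array("gender", "age")

// select(String, String*) needs the first name passed separately from the rest.
val byName = df.select(colArray.head, colArray.tail: _*)

// select(Column*) accepts the whole array once the names are turned into Columns.
val byColumn = df.select(colArray.map(col): _*)

// selectExpr parses every string as a SQL expression, so computed columns are allowed.
val byExpr = df.selectExpr("gender", "age + 1 as age1")

// The same projection with select and expr.
val byExprFn = df.select(col("gender"), expr("age + 1 as age1"))
```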
// Compute on specific columns and sort the result
// data.selectExpr("gender", "age+1","cast(age as bigint)").orderBy($"gender".desc, $"age".asc).show
data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc).show
+------+----+----+
|gender|age1|age2|
+------+----+----+
|  male|23.0|  22|
|  male|23.0|  22|
|  male|38.0|  37|
|  male|58.0|  57|
|  male|58.0|  57|
|female|23.0|  22|
|female|28.0|  27|
|female|33.0|  32|
|female|33.0|  32|
|female|33.0|  32|
+------+----+----+
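The projection and sort can also be expressed with the Column API rather than SQL strings. This is a sketch on a small hypothetical DataFrame, assuming a local master; unlike the statement above, it sorts by the derived column `age2`, which is part of the output.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{asc, col, desc}

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("sort-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 37.0), ("female", 27.0), ("male", 22.0)).toDF("gender", "age")

val sorted = df
  .select(
    col("gender"),
    (col("age") + 1).as("age1"),           // arithmetic on a Column
    col("age").cast("long").as("age2"))    // cast double to bigint
  .orderBy(desc("gender"), asc("age2"))    // orderBy is an alias of sort

val age2Values = sorted.collect().map(_.getLong(2))
```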
4. View the Spark SQL logical and physical execution plans
val data4=data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc)
data4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gender: string, age1: double ... 1 more field]
// View the physical execution plan
data4.explain()
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
// View the logical and physical execution plans
data4.explain(extended=true)
== Parsed Logical Plan ==
'Sort ['gender DESC, 'age ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L]
   +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
      +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
== Analyzed Logical Plan ==
gender: string, age1: double, age2: bigint
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L, age#21]
      +- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#26, _9#17 AS rating#27]
         +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
== Optimized Logical Plan ==
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
   +- LocalRelation [gender#20, age1#135, age2#136L, age#21]
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
   +- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
      +- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
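Besides printing them with `explain`, the individual plans can be read programmatically from `Dataset.queryExecution`. The sketch below assumes a local master and a small hypothetical DataFrame; `queryExecution` is a developer-facing API, so its details may differ between Spark versions. The 200 in `rangepartitioning(..., 200)` above corresponds to the default value of `spark.sql.shuffle.partitions`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local master, so the sketch runs without a cluster.
val spark = SparkSession.builder().master("local[*]").appName("plan-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("male", 37.0), ("female", 27.0)).toDF("gender", "age")
val sorted = df.selectExpr("gender", "age + 1 as age1").sort($"gender".desc)

// Each stage of the plan is available as a tree that can be printed or inspected.
val qe = sorted.queryExecution
val parsed = qe.logical.toString          // parsed (unresolved) logical plan
val analyzed = qe.analyzed.toString       // analyzed logical plan
val optimized = qe.optimizedPlan.toString // optimized logical plan
val physical = qe.executedPlan.toString   // physical plan

println(optimized)
```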
