DSL的语法
SQL中的执行顺序
from --> join --> on --> where --> group by --> having --> select --> order by --> limit
在DSL中没有having,where可以放在分组前,也可以放在分组后
目录
select()----选择
通过列名获取列
df.select("id","name") .show()
通过列表达式获取列,可以对列进行加工处理
df.select($"id",$"age"+1 as "addAge")//可以直接在后面取个别名 .show()
package com.shujia.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo5DSL {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local")
.appName("Demo5DSL")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val df: DataFrame = spark
.read
.format("json")
.load("data/students.json")
/**
* select 选择
*/
//通过列名获取列
df.select("id","name")
.show()
//执行结果
+----------+------+
| id| name|
+----------+------+
|1500100001|施笑槐|
|1500100002|吕金鹏|
|1500100003|单乐蕊|
|1500100004|葛德曜|
...
+----------+------+
only showing top 20 rows
//通过列表达式获取列,可以对列进行加工处理
df.select($"id",$"age"+1 as "addAge")//可以直接在后面取个别名
.show()
//执行结果
+----------+------+
| id|addAge|
+----------+------+
|1500100001| 23|
|1500100002| 25|
|1500100003| 23|
...
+----------+------+
only showing top 20 rows
}
}
where、filter----过滤,两者用法一样
通过列表达式进行过滤
通过字符串表达式--写SQL过滤
/**
* where、filter----过滤,两者用法一样
* 等于 ===
* 不等于 =!=
*/
//通过列表达式,过滤出年龄大于22的男生
df.where($"age" > 22 and $"gender" === "男")
.show()
//字符串表达式--写SQL,过滤出年龄大于22的男生
df.filter("age > 22 and gender == '男' ")
.show()
//执行结果
+---+--------+------+----------+------+
|age| clazz|gender| id| name|
+---+--------+------+----------+------+
| 24|文科六班| 男|1500100002|吕金鹏|
| 24|理科三班| 男|1500100004|葛德曜|
| 23|理科六班| 男|1500100010|羿彦昌|
| 24|文科二班| 男|1500100013|逯君昊|
| 23|理科五班| 男|1500100014|羿旭炎|
...
+---+--------+------+----------+------+
only showing top 20 rows
groupBy()----分组
count()----聚合
agg(count())----聚合
/**
* groupBy----分组
* count()----聚合
*/
df.groupBy($"clazz")//按照指定的字段分组
.count() //聚合
.show()
//执行结果
+--------+-----+
| clazz|count|
+--------+-----+
|文科六班| 104|
|理科二班| 79|
|文科四班| 81|
|理科六班| 92|
|理科四班| 91|
|文科三班| 94|
|文科一班| 72|
|文科二班| 87|
|文科五班| 84|
|理科一班| 78|
|理科三班| 68|
|理科五班| 70|
+--------+-----+
/**
* 分组之后使用agg()进行聚合,agg()中可以使用count()、avg()...
* 使用agg()可以起别名,分组后的列和聚合后的列都会保留
* SQL中的聚合函数在这里都可以使用
*/
df.groupBy($"clazz")
.agg(count($"clazz") as "num",avg($"age") as "avgAge")
.show()
//执行结果
+--------+---+------------------+
| clazz|num| avgAge|
+--------+---+------------------+
|文科六班|104| 22.60576923076923|
|理科二班| 79|22.556962025316455|
|文科四班| 81|22.506172839506174|
|理科六班| 92| 22.48913043478261|
|理科四班| 91| 22.63736263736264|
|文科三班| 94|22.680851063829788|
|文科一班| 72|22.416666666666668|
|文科二班| 87|22.379310344827587|
|文科五班| 84| 22.30952380952381|
|理科一班| 78|22.333333333333332|
|理科三班| 68|22.676470588235293|
|理科五班| 70|22.642857142857142|
+--------+---+------------------+
sort----排序
/**
* sort----排序
*/
df.sort($"age")//按照年龄进行升序排序
.show()
df.sort($"age".desc)//按照年龄进行降序排序
.show()
//升序结果
+---+--------+------+----------+------+
|age| clazz|gender| id| name|
+---+--------+------+----------+------+
| 21|理科二班| 男|1500100056|古鸿信|
| 21|理科四班| 男|1500100135|庾振海|
| 21|文科四班| 女|1500100061|路紫萱|
...
//降序结果
+---+--------+------+----------+------+
|age| clazz|gender| id| name|
+---+--------+------+----------+------+
| 24|文科五班| 女|1500100051|江寄容|
| 24|文科六班| 男|1500100140|郁运发|
| 24|文科一班| 男|1500100055|卫鸿熙|
...
join----关联
/**
* join
*/
//读取分数文件,构建分数DF
val scoDF: DataFrame = spark
.read
.format("csv")
.option("sep",",")
.schema("id STRING,cId STRING,sco DOUBLE")
.load("data/score.txt")
//关联:当列名一样时(默认关联方式是inner join)
df.join(scoDF,"id").show()
//关联:当列名不一样(默认关联方式是inner join)
df.join(scoDF,$"id"===$"sId").show()
//指定关联方式(需要使用到集合)
df.join(scoDF,List("id"),"inner").show()
窗口函数
withColumn ----通过一个表达式给DF增加一个列
/**
* 窗口函数
* 需求:取出每个班级年龄最大的前10个学生
*/
//通过写SQL的方式
//在spark中想要写SQL,需要先创表
df.createOrReplaceTempView("student")
spark.sql(
"""
|
|select * from(
|select
|*,row_number() over(partition by clazz order by age desc) as t1
|from
|student
|) as t2
|where t2.t1<=10
|""".stripMargin)
.show(10)
/**
* 通过DSL的方式
* withColumn :通过一个表达式给DF增加一个列
*/
df.withColumn("t1",row_number().over((Window.partitionBy($"clazz").orderBy($"age".desc))))
.where($"t1"<=10)
.show(10)
//执行结果
+---+--------+------+----------+------+---+
|age| clazz|gender| id| name| t1|
+---+--------+------+----------+------+---+
| 24|文科六班| 男|1500100002|吕金鹏| 1|
| 24|文科六班| 男|1500100031|麻智刚| 2|
| 24|文科六班| 男|1500100140|郁运发| 3|
| 24|文科六班| 女|1500100173|连采波| 4|
| 24|文科六班| 女|1500100279|蔚盼曼| 5|
| 24|文科六班| 男|1500100312|平鸿轩| 6|
| 24|文科六班| 男|1500100318|昝鸿振| 7|
| 24|文科六班| 女|1500100330|隆以旋| 8|
| 24|文科六班| 女|1500100341|闵惜萍| 9|
| 24|文科六班| 男|1500100345|廖昆纶| 10|
+---+--------+------+----------+------+---+
only showing top 10 rows