DSL的语法


DSL的语法

SQL中的执行顺序

from --> join --> on --> where --> group by --> having --> select --> order by --> limit

在DSL中没有having,where可以放在分组前,也可以放在分组后

select()----选择
通过列名获取列

df.select("id","name") ​ .show()

通过列表达式获取列,可以对列进行加工处理

df.select($"id",$"age"+1 as "addAge")//可以直接在后面取个别名 ​ .show()

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo5DSL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo5DSL")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df: DataFrame = spark
      .read
      .format("json")
      .load("data/students.json")

    /**
     * select 选择
     */
    //通过列名获取列
    df.select("id","name")
      .show()
      
//执行结果
+----------+------+
|        id|  name|
+----------+------+
|1500100001|施笑槐|
|1500100002|吕金鹏|
|1500100003|单乐蕊|
|1500100004|葛德曜|
...
+----------+------+
only showing top 20 rows   
      
    //通过列表达式获取列,可以对列进行加工处理
    df.select($"id",$"age"+1 as "addAge")//可以直接在后面取个别名
      .show()
      
//执行结果
+----------+------+
|        id|addAge|
+----------+------+
|1500100001|    23|
|1500100002|    25|
|1500100003|    23|
...
+----------+------+
only showing top 20 rows
      
  }
}
where、filter----过滤,两者用法一样
通过列表达式进行过滤
通过字符串表达式--写SQL过滤
	/**
     * where、filter----过滤,两者用法一样
     * 等于 ===
     * 不等于 =!=
     */
    //通过列表达式,过滤出年龄大于22的男生
    df.where($"age" > 22 and $"gender" === "男")
      .show()
    //字符串表达式--写SQL,过滤出年龄大于22的男生
    df.filter("age > 22 and gender ==  '男' ")
      .show()

//执行结果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 24|文科六班|    男|1500100002|吕金鹏|
| 24|理科三班|    男|1500100004|葛德曜|
| 23|理科六班|    男|1500100010|羿彦昌|
| 24|文科二班|    男|1500100013|逯君昊|
| 23|理科五班|    男|1500100014|羿旭炎|
...
+---+--------+------+----------+------+
only showing top 20 rows
groupBy()----分组
count()----聚合
agg(count())----聚合
	/**
     * groupBy----分组
     * count()----聚合
     */
      df.groupBy($"clazz")//按照指定的字段分组
        .count()	//聚合
        .show()

//执行结果
+--------+-----+
|   clazz|count|
+--------+-----+
|文科六班|  104|
|理科二班|   79|
|文科四班|   81|
|理科六班|   92|
|理科四班|   91|
|文科三班|   94|
|文科一班|   72|
|文科二班|   87|
|文科五班|   84|
|理科一班|   78|
|理科三班|   68|
|理科五班|   70|
+--------+-----+

   /**
     * 分组之后使用agg()进行聚合,agg()中可以使用count()、avg()...
     * 使用agg()可以起别名,分组后的列和聚合后的列都会保留
     * SQL中的聚合函数在这里都可以使用
     */
    df.groupBy($"clazz")
      .agg(count($"clazz") as "num",avg($"age") as "avgAge")
      .show()

//执行结果
+--------+---+------------------+
|   clazz|num|            avgAge|
+--------+---+------------------+
|文科六班|104| 22.60576923076923|
|理科二班| 79|22.556962025316455|
|文科四班| 81|22.506172839506174|
|理科六班| 92| 22.48913043478261|
|理科四班| 91| 22.63736263736264|
|文科三班| 94|22.680851063829788|
|文科一班| 72|22.416666666666668|
|文科二班| 87|22.379310344827587|
|文科五班| 84| 22.30952380952381|
|理科一班| 78|22.333333333333332|
|理科三班| 68|22.676470588235293|
|理科五班| 70|22.642857142857142|
+--------+---+------------------+
sort----排序
/**
 * sort----排序
 */
df.sort($"age")//按照年龄进行升序排序
  .show()
df.sort($"age".desc)//按照年龄进行降序排序
  .show()

//升序结果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 21|理科二班|    男|1500100056|古鸿信|
| 21|理科四班|    男|1500100135|庾振海|
| 21|文科四班|    女|1500100061|路紫萱|
...

//降序结果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 24|文科五班|    女|1500100051|江寄容|
| 24|文科六班|    男|1500100140|郁运发|
| 24|文科一班|    男|1500100055|卫鸿熙|
...
join----关联
/**
 * join
 */
//读取分数文件,构建分数DF
val scoDF: DataFrame = spark
  .read
  .format("csv")
  .option("sep",",")
  .schema("id STRING,cId STRING,sco DOUBLE")
  .load("data/score.txt")
//关联:当列名一样时(默认关联方式是inner join)
df.join(scoDF,"id").show()
//关联:当列名不一样(默认关联方式是inner join)
df.join(scoDF,$"id"===$"sId").show()
//指定关联方式(需要使用到集合)
df.join(scoDF,List("id"),"inner").show()
窗口函数
withColumn ----通过一个表达式给DF增加一个列
/**
     * 窗口函数
     * 需求:取出每个班级年龄最大的前10个学生
     */
      //通过写SQL的方式
      //在spark中想要写SQL,需要先创表
    df.createOrReplaceTempView("student")
    spark.sql(
      """
        |
        |select * from(
        |select
        |*,row_number() over(partition by clazz order by age desc) as t1
        |from
        |student
        |) as t2
        |where t2.t1<=10
        |""".stripMargin)
      .show(10)


    /**
     * 通过DSL的方式
     * withColumn :通过一个表达式给DF增加一个列
     */
      df.withColumn("t1",row_number().over((Window.partitionBy($"clazz").orderBy($"age".desc))))
        .where($"t1"<=10)
        .show(10)

//执行结果
+---+--------+------+----------+------+---+
|age|   clazz|gender|        id|  name| t1|
+---+--------+------+----------+------+---+
| 24|文科六班|    男|1500100002|吕金鹏|  1|
| 24|文科六班|    男|1500100031|麻智刚|  2|
| 24|文科六班|    男|1500100140|郁运发|  3|
| 24|文科六班|    女|1500100173|连采波|  4|
| 24|文科六班|    女|1500100279|蔚盼曼|  5|
| 24|文科六班|    男|1500100312|平鸿轩|  6|
| 24|文科六班|    男|1500100318|昝鸿振|  7|
| 24|文科六班|    女|1500100330|隆以旋|  8|
| 24|文科六班|    女|1500100341|闵惜萍|  9|
| 24|文科六班|    男|1500100345|廖昆纶| 10|
+---+--------+------+----------+------+---+
only showing top 10 rows


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM