DSL的語法


DSL的語法

SQL中的執行順序

from --> join --> on --> where --> group by --> having --> select --> order by --> limit

在DSL中沒有having,where可以放在分組前,也可以放在分組后

select()----選擇
通過列名獲取列

df.select("id","name") ​ .show()

通過列表達式獲取列,可以對列進行加工處理

df.select($"id",$"age"+1 as "addAge")//可以直接在后面取個別名 ​ .show()

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo5DSL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo5DSL")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df: DataFrame = spark
      .read
      .format("json")
      .load("data/students.json")

    /**
     * select 選擇
     */
    //通過列名獲取列
    df.select("id","name")
      .show()
      
//執行結果
+----------+------+
|        id|  name|
+----------+------+
|1500100001|施笑槐|
|1500100002|呂金鵬|
|1500100003|單樂蕊|
|1500100004|葛德曜|
...
+----------+------+
only showing top 20 rows   
      
    //通過列表達式獲取列,可以對列進行加工處理
    df.select($"id",$"age"+1 as "addAge")//可以直接在后面取個別名
      .show()
      
//執行結果
+----------+------+
|        id|addAge|
+----------+------+
|1500100001|    23|
|1500100002|    25|
|1500100003|    23|
...
+----------+------+
only showing top 20 rows
      
  }
}
where、filter----過濾,兩者用法一樣
通過列表達式進行過濾
通過字符串表達式--寫SQL過濾
	/**
     * where、filter----過濾,兩者用法一樣
     * 等於 ===
     * 不等於 =!=
     */
    //通過列表達式,過濾出年齡大於22的男生
    df.where($"age" > 22 and $"gender" === "男")
      .show()
    //字符串表達式--寫SQL,過濾出年齡大於22的男生
    df.filter("age > 22 and gender ==  '男' ")
      .show()

//執行結果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 24|文科六班|    男|1500100002|呂金鵬|
| 24|理科三班|    男|1500100004|葛德曜|
| 23|理科六班|    男|1500100010|羿彥昌|
| 24|文科二班|    男|1500100013|逯君昊|
| 23|理科五班|    男|1500100014|羿旭炎|
...
+---+--------+------+----------+------+
only showing top 20 rows
groupBy()----分組
count()----聚合
agg(count())----聚合
	/**
     * groupBy----分組
     * count()----聚合
     */
      df.groupBy($"clazz")//按照指定的字段分組
        .count()	//聚合
        .show()

//執行結果
+--------+-----+
|   clazz|count|
+--------+-----+
|文科六班|  104|
|理科二班|   79|
|文科四班|   81|
|理科六班|   92|
|理科四班|   91|
|文科三班|   94|
|文科一班|   72|
|文科二班|   87|
|文科五班|   84|
|理科一班|   78|
|理科三班|   68|
|理科五班|   70|
+--------+-----+

   /**
     * 分組之后使用agg()進行聚合,agg()中可以使用count()、avg()...
     * 使用agg()可以起別名,分組后的列和聚合后的列都會保留
     * SQL中的聚合函數在這里都可以使用
     */
    df.groupBy($"clazz")
      .agg(count($"clazz") as "num",avg($"age") as "avgAge")
      .show()

//執行結果
+--------+---+------------------+
|   clazz|num|            avgAge|
+--------+---+------------------+
|文科六班|104| 22.60576923076923|
|理科二班| 79|22.556962025316455|
|文科四班| 81|22.506172839506174|
|理科六班| 92| 22.48913043478261|
|理科四班| 91| 22.63736263736264|
|文科三班| 94|22.680851063829788|
|文科一班| 72|22.416666666666668|
|文科二班| 87|22.379310344827587|
|文科五班| 84| 22.30952380952381|
|理科一班| 78|22.333333333333332|
|理科三班| 68|22.676470588235293|
|理科五班| 70|22.642857142857142|
+--------+---+------------------+
sort----排序
/**
 * sort----排序
 */
df.sort($"age")//按照年齡進行升序排序
  .show()
df.sort($"age".desc)//按照年齡進行降序排序
  .show()

//升序結果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 21|理科二班|    男|1500100056|古鴻信|
| 21|理科四班|    男|1500100135|庾振海|
| 21|文科四班|    女|1500100061|路紫萱|
...

//降序結果
+---+--------+------+----------+------+
|age|   clazz|gender|        id|  name|
+---+--------+------+----------+------+
| 24|文科五班|    女|1500100051|江寄容|
| 24|文科六班|    男|1500100140|郁運發|
| 24|文科一班|    男|1500100055|衛鴻熙|
...
join----關聯
/**
 * join
 */
//讀取分數文件,構建分數DF
val scoDF: DataFrame = spark
  .read
  .format("csv")
  .option("sep",",")
  .schema("id STRING,cId STRING,sco DOUBLE")
  .load("data/score.txt")
//關聯:當列名一樣時(默認關聯方式是inner join)
df.join(scoDF,"id").show()
//關聯:當列名不一樣(默認關聯方式是inner join)
df.join(scoDF,$"id"===$"sId").show()
//指定關聯方式(需要使用到集合)
df.join(scoDF,List("id"),"inner").show()
窗口函數
withColumn ----通過一個表達式給DF增加一個列
/**
     * 窗口函數
     * 需求:取出每個班級年齡最大的前10個學生
     */
      //通過寫SQL的方式
      //在spark中想要寫SQL,需要先創表
    df.createOrReplaceTempView("student")
    spark.sql(
      """
        |
        |select * from(
        |select
        |*,row_number() over(partition by clazz order by age desc) as t1
        |from
        |student
        |) as t2
        |where t2.t1<=10
        |""".stripMargin)
      .show(10)


    /**
     * 通過DSL的方式
     * withColumn :通過一個表達式給DF增加一個列
     */
      df.withColumn("t1",row_number().over((Window.partitionBy($"clazz").orderBy($"age".desc))))
        .where($"t1"<=10)
        .show(10)

//執行結果
+---+--------+------+----------+------+---+
|age|   clazz|gender|        id|  name| t1|
+---+--------+------+----------+------+---+
| 24|文科六班|    男|1500100002|呂金鵬|  1|
| 24|文科六班|    男|1500100031|麻智剛|  2|
| 24|文科六班|    男|1500100140|郁運發|  3|
| 24|文科六班|    女|1500100173|連采波|  4|
| 24|文科六班|    女|1500100279|蔚盼曼|  5|
| 24|文科六班|    男|1500100312|平鴻軒|  6|
| 24|文科六班|    男|1500100318|昝鴻振|  7|
| 24|文科六班|    女|1500100330|隆以旋|  8|
| 24|文科六班|    女|1500100341|閔惜萍|  9|
| 24|文科六班|    男|1500100345|廖昆綸| 10|
+---+--------+------+----------+------+---+
only showing top 10 rows


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM