spark-sql中的分析函數的使用


 分析函數的應用場景:

  (1)用於分組后組內排序

  (2)指定計算范圍

  (3)Top N

  (4)累加計算

  (5)層次計算

分析函數的一般語法:

  分析函數的語法結構一般是:

  分析函數名(參數)  over  (子partition by 句 order by 字句 rows/range 字句)

  1、分析函數名:sum、max、min、count、avg等聚合函數

           lead、lag等比較函數

           rank 等排名函數

  2、over:關鍵字,表示前面的函數是分析函數,不是普通的聚合函數

  3、分析字句:over關鍵字后面括號內的內容為分析子句,包含以下三部分內容

    • partition by :分組子句,表示分析函數的計算范圍,各組之間互不相干
    • order by:排序子句,表示分組后,組內的排序方式
    • rows/range:窗口子句,是在分組(partition by)后,表示組內的子分組(也即窗口),是分析函數的計算范圍窗口      

數據准備:

cookieid,createtime,pv
cookie1,2015-04-10,1
cookie1,2015-04-11,5
cookie1,2015-04-12,7
cookie1,2015-04-13,3
cookie1,2015-04-14,2
cookie1,2015-04-15,4
cookie1,2015-04-16,4
cookie2,2015-04-10,2
cookie2,2015-04-11,3
cookie2,2015-04-12,5
cookie2,2015-04-13,6
cookie2,2015-04-14,3
cookie2,2015-04-15,9
cookie2,2015-04-16,7
    val conf = new SparkConf()
    val ssc = new SparkSession.Builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .config(conf)
      .getOrCreate()

    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val df = ssc.read
      .option("header", "true")
      .option("inferschema", "true")
      .csv("file:///E:/TestFile/analyfuncdata.txt")

    df.show(false)
    df.printSchema()
    df.createOrReplaceTempView("table")
    val sql = "select * from table"
    ssc.sql(sql).show(false)

測試需求:

  1、按照cookid進行分組,createtime排序,並前后求和   

ssc.sql(
"""
|select cookieid,createtime,pv,
| sum(pv) over(partition by cookieid order by createtime) as pv1,
|from table
""".stripMargin).show

  運行結果:

   

  2、與方式1 等價的寫法

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over(partition by cookieid order by createtime) as pv1,
        |       sum(pv) over(partition by cookieid order by createtime
        |       rows between unbounded preceding and current row) as pv2
        |from table
      """.stripMargin).show

  注:這里涉及到窗口子句,后面詳細敘述。

  運行結果:

  

  可以看到方式1的寫法其實是方式2的一種默認形式

  3、按照cookieid分組,不進行排序,求和

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over(partition by cookieid) as pv1
        |from table
      """.stripMargin).show

   運行結果:

  

  可以看出,在不進行排序的情況下,最終的求和列是每個分組的所有值得和,並非前后值相加

  4、不進行分組,直接進行排序,求和(有問題)

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over(order by createtime) as pv1
        |from table
      """.stripMargin).show

  運行結果:  

  

  由結果可以看出,如果只是按照排序,不進行分區求和,得出來的結果好像亂七八糟的,有問題,所以我一般不這么做

  5、over子句為空的情況下

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over() as pv1
        |from table
      """.stripMargin).show

  運行結果:

  

  由結果看出,該種方式,其實就是對所有的行進行了求和

window子句

  前面一開始執行了一個關於窗口子句:

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over(partition by cookieid order by createtime) as pv1,
        |       sum(pv) over(partition by cookieid order by createtime
        |       rows between unbounded preceding and current row) as pv2
        |from table
      """.stripMargin).show

  同一個select查詢中存在多個窗口函數時,他們相互之間是沒有影響的,每個窗口函數應用自己的規則

  rows between unbounded preceding and current row:

    • rows between ... and ...(開始到結束,位置不能交換)
    • unbounded preceding  :從第一行開始
    • current row:到當前行  

    當然,上述的從第幾行開始到第幾行是可以自定義的:

    • 首行:unbounded preceding
    • 末行:unbounded following
    • 前 n 行:n preceding
    • 后 n 行:n following   

  示例需求:

      pv:原始值

      pv1:起始行到當前行的累計值

      pv2:等同於pv1,語法不同

      pv3:僅有一個合計值

      pv4:前三行到當前行的累計值

      pv5:前三行到后一行的累計值

      pv6:當前行到最后一行的累計值

    注:這里所指的前三行,並不包含當前行本身

  運行結果:

  

row & range

  range:是邏輯窗口,是指定當前行對應值的范圍取值,列數不固定,只要行值在范圍內,對應列都包含在內

  rows:是物理窗口,根據order by子句排序后,取前n行的數據以及后n行的數據進行計算(與當前行的值無關,至於排序由的行號有關)

  需求案例:

    1、對pv進行排名,求前一名到后兩名的和

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       sum(pv) over(partition by cookieid order by pv
        |       range between 1 preceding and 2 following) as pv1
        |from table
      """.stripMargin).show

  運行結果:

  

   解釋:

 

  其他的聚合函數,用法與sum類似,比如:avg,min,max,count等

排名函數

  排序方式:

    • row_number() :順序排,忽略 並列排名
    • dense_rank()     :有並列,后面的元素接着排名
    • rank()                :有並列,后面的元素跳着排名
    • ntile(n)         :用於將分組數據按照順序切分成n片

  例:

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |       row_number() over(partition by cookieid order by pv desc) rank1,
        |       rank()       over(partition by cookieid order by pv desc) rank2,
        |       dense_rank() over(partition by cookieid order by pv desc) rank3,
        |       ntile(3)     over(partition by cookieid order by pv desc) rank4
        |from table
      """.stripMargin).show

  運行結果:

  

lag & lead

  lag(field,n):取前 n 行的值

  lead(field n):取后 n 行的值

例:

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |lag(pv) over(partition by cookieid order by pv) as col1,
        |lag(pv,1) over(partition by cookieid order by pv) as col2,
        |lag(pv,2) over(partition by cookieid order by pv) as col3
        |from table
      """.stripMargin).show

  運行結果:

  

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |lead(pv) over(partition by cookieid order by pv) as col1,
        |lead(pv,1) over(partition by cookieid order by pv) as col2,
        |lead(pv,2) over(partition by cookieid order by pv) as col3
        |from table
      """.stripMargin).show

  運行結果:

   

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |lead(pv,-2) over(partition by cookieid order by pv) as col1,
        |lag(pv,2) over(partition by cookieid order by pv) as col2
        |from table
      """.stripMargin).show

  運行結果:

  

first_value & last_value

  first_value(field) :取分組內排序后,截止到當前行的第一個值

  last_value(field) :取分組內排序后,截止到當前行的最后一個值  

例:

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |row_number() over(partition by cookieid order by pv desc) as rank1,
        |first_value(createtime) over(partition by cookieid order by pv desc) as rank2,
        |first_value(pv) over(partition by cookieid order by pv desc) as rank3
        |from table
      """.stripMargin).show

  運行結果:

  

ssc.sql(
      """
        |select cookieid,createtime,pv,
        |row_number() over(partition by cookieid order by pv desc) as rank1,
        |last_value(createtime) over(partition by cookieid order by pv desc) as rank2,
        |last_value(pv) over(partition by cookieid order by pv desc) as rank3
        |from table
      """.stripMargin).show

  運行結果:

  

cube & rollup

  cube:根據group by維度的所有組合進行聚合

  rollup:是cube的自己,以左側的維度為主,進行層級聚合

例:

ssc.sql(
      """
        |select cookieid,createtime,sum(pv)
        |from table
        |group by cube(cookieid,createtime)
        |order by 1,2
      """.stripMargin).show(100,false)

  運行結果:

  

ssc.sql(
      """
        |select cookieid,createtime,sum(pv)
        |from table
        |group by rollup(cookieid,createtime)
        |order by 1,2
      """.stripMargin).show(100,false)

  運行結果:

  

DSL

  

   import org.apache.spark.sql.expressions.Window
    import ssc.implicits._
    import org.apache.spark.sql.functions._
    val w1 = Window.partitionBy("cookieid").orderBy("createtime")
    val w2 = Window.partitionBy("cookieid").orderBy("pv")

    //聚合函數
    df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show()

    //排名
    df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show()
    df.select($"cookieid", $"pv", dense_rank().over(w2).alias("dense_rank")).show()
    df.select($"cookieid", $"pv", row_number().over(w2).alias("row_number")).show()

    //lag、lead
    df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("row_number")).show()
    df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("row_number")).show()

    //cube、rollup
    df.cube("cookieid", "createtime").agg(sum("pv")).show()
    df.rollup("cookieid", "createtime").agg(sum("pv")).show()

  運行結果:

  1、聚合函數

  

  2、排名函數:

    

  lag、lead

   

  cube、rollup

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM