Spark window functions


Sample content of the source file:

http://bigdata.beiwang.cn/laoli
http://bigdata.beiwang.cn/laoli
http://bigdata.beiwang.cn/haiyuan
http://bigdata.beiwang.cn/haiyuan

 

Implementation code:

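import java.net.URL

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSqlDemo11 {
  /**
    * Compute TopN with window functions
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val session = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    import session.implicits._

    // Each raw line looks like: http://bigdata.beiwang.cn/laoli
    val sourceData = session.read.textFile("E:\\北網學習\\K_第十一個月_Spark 2(2019.8)\\8.5\\teacher.log")

    val df = sourceData.map(line => {
      // Teacher name: the text after the last '/'
      val index = line.lastIndexOf("/")
      val t_name = line.substring(index + 1)

      // Subject: the first label of the host, e.g. "bigdata" in bigdata.beiwang.cn
      val url = new URL(line.substring(0, index))
      val subject = url.getHost.split("\\.")(0)

      (subject, t_name)
    }).toDF("subject", "t_name")

    // The statements of operations 01 to 07 below continue here, inside main()

    session.stop()
  }
}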
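The parsing inside `map()` is plain string handling, so it can be checked without Spark. A minimal sketch, assuming every line has the `http://<subject>.beiwang.cn/<teacher>` shape of the sample above; the object `TeacherLogParser` is a hypothetical helper, not part of the original program:

```scala
import java.net.URL

// Hypothetical helper (not in the original program): the same parsing logic
// as the map() above, applied to a single log line.
object TeacherLogParser {
  // "http://bigdata.beiwang.cn/laoli" -> ("bigdata", "laoli")
  def parse(line: String): (String, String) = {
    val index = line.lastIndexOf("/")
    val teacher = line.substring(index + 1)              // text after the last '/'
    val host = new URL(line.substring(0, index)).getHost // e.g. "bigdata.beiwang.cn"
    val subject = host.split("\\.")(0)                   // first label of the host
    (subject, teacher)
  }
}
```

Applied to the four sample lines at the top of this article, `parse` returns `(bigdata,laoli)` twice and `(bigdata,haiyuan)` twice, which is the `(subject, t_name)` pair the Spark job builds for each row.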

 

Operation 01: get the visit count of every teacher in every subject:

    df.createTempView("temp")

    // Visit count of each teacher under every subject:
    val middleData: DataFrame = session.sql("select subject,t_name,count(*) cnts from temp group by subject,t_name")

    middleData.show()

+-------+--------+----+
|subject|  t_name|cnts|
+-------+--------+----+
|bigdata|   laoli|   2|
|bigdata| haiyuan|  15|
| javaee|chenchan|   6|
|    php|  laoliu|   1|
|    php|   laoli|   3|
| javaee|  laoshi|   9|
|bigdata|  lichen|   6|
+-------+--------+----+
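What `group by subject,t_name` with `count(*)` computes can be modeled on an in-memory collection. This is only a single-machine sketch of the semantics (Spark performs the same aggregation in a distributed way); the name `VisitCounts` is hypothetical:

```scala
// Hypothetical plain-Scala model of:
//   select subject, t_name, count(*) cnts from temp group by subject, t_name
object VisitCounts {
  def countVisits(rows: Seq[(String, String)]): Map[(String, String), Long] =
    rows
      .groupBy(identity)                                     // one group per distinct (subject, t_name)
      .map { case (key, group) => key -> group.size.toLong } // count(*) per group
}
```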

 

Operation 02: row_number() over() [number the rows by teacher visit count, descending]

// Register the intermediate result middleData as a table as well
middleData.createTempView("middleTemp")

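// Second-step query: use the row_number() window function to sort all teachers by visit count and number them
// The numbering column rn produced by the window is a pseudo-column: it appears in the output, but it cannot be referenced in the WHERE clause of the same query
// row_number() numbers rows according to an ordering, so over() must contain an ORDER BY; without one, Spark raises an AnalysisException
// Here the window is ordered by cnts, descending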
session.sql(
  """
    |select subject,t_name,cnts,row_number() over(order by cnts desc) rn from middleTemp
  """.stripMargin).show()

+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
|bigdata| haiyuan|  15|  1|
| javaee|  laoshi|   9|  2|
| javaee|chenchan|   6|  3|
|bigdata|  lichen|   6|  4|
|    php|   laoli|   3|  5|
|bigdata|   laoli|   2|  6|
|    php|  laoliu|   1|  7|
+-------+--------+----+---+  
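The numbering that `row_number() over(order by cnts desc)` produces can be mimicked with a sort followed by `zipWithIndex`. A rough plain-Scala sketch; the `GlobalRowNumber` object and its `Row` case class are hypothetical stand-ins for the rows of `middleTemp`:

```scala
// Hypothetical plain-Scala model of:
//   select subject, t_name, cnts, row_number() over(order by cnts desc) rn from middleTemp
object GlobalRowNumber {
  final case class Row(subject: String, tName: String, cnts: Long)

  // Sort by cnts descending, then number the rows 1, 2, 3, ...
  // Tied rows still get distinct numbers. Here ties keep their input order
  // because sortBy is stable; Spark gives no such guarantee.
  def rowNumber(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.sortBy(r => -r.cnts).zipWithIndex.map { case (r, i) => (r, i + 1) }
}
```

Note that `chenchan` and `lichen` both have 6 visits but receive 3 and 4; with `row_number()`, which tied row comes first is not guaranteed unless more columns are added to the ORDER BY.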

 

♈ Note: over() must specify an ordering; otherwise Spark throws an analysis exception:

session.sql(
  """
    |select subject,t_name,cnts,row_number() over() rn from middleTemp
  """.stripMargin).show()

Exception in thread "main" org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$31$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:2173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$31$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:2171)

 

Operation 03: row_number() over(partition by ...) [partition by subject, then number the rows within each partition]

// Partition by subject, then open a window inside each partition
session.sql(
  """
    |select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
  """.stripMargin).show()

+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
| javaee|  laoshi|   9|  1|
| javaee|chenchan|   6|  2|
|bigdata| haiyuan|  15|  1|
|bigdata|  lichen|   6|  2|
|bigdata|   laoli|   2|  3|
|    php|   laoli|   3|  1|
|    php|  laoliu|   1|  2|
+-------+--------+----+---+
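Adding `partition by subject` means the numbering restarts for each subject. In plain Scala this roughly corresponds to grouping first and numbering inside each group. A sketch under the same assumptions as before (hypothetical `PartitionedRowNumber` object, in-memory rows):

```scala
// Hypothetical plain-Scala model of:
//   row_number() over(partition by subject order by cnts desc)
object PartitionedRowNumber {
  final case class Row(subject: String, tName: String, cnts: Long)

  def rowNumberBySubject(rows: Seq[Row]): Seq[(Row, Int)] =
    rows
      .groupBy(_.subject) // partition by subject
      .values
      .toSeq
      .flatMap { partition =>
        // order by cnts desc inside the partition; numbering restarts at 1
        partition.sortBy(r => -r.cnts).zipWithIndex.map { case (r, i) => (r, i + 1) }
      }
}
```

The order in which the subjects come out is unspecified here, much as the table above lists `javaee` before `bigdata` without any ORDER BY on subject.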

 

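♎ Note: a column generated by a window is a pseudo-column and cannot be referenced in the WHERE clause of the same query, so the following query fails analysis:

// rn is a pseudo-column: it does not exist yet when WHERE is evaluated, so "where rn <= 2" cannot be resolved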
session.sql(
  """
    |select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
    |where rn <=2
  """.stripMargin).show()

 

Operation 04: using the pseudo-column:

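Because the pseudo-column produced by a window cannot be queried directly, we can use the whole windowed statement as a subquery. To the enclosing query, the subquery's result set is a complete table, so the pseudo-column becomes an ordinary column that can be filtered on: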

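// The pseudo-column cannot be queried directly, but the windowed result set can be used as a table (a subquery); there it is a valid column and can be filtered in an enclosing query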
session.sql(
  """
    |select * from (
    |select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
    |) where rn <= 2
  """.stripMargin).show()

+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
| javaee|  laoshi|   9|  1|
| javaee|chenchan|   6|  2|
|bigdata| haiyuan|  15|  1|
|bigdata|  lichen|   6|  2|
|    php|   laoli|   3|  1|
|    php|  laoliu|   1|  2|
+-------+--------+----+---+
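The subquery works because of evaluation order: `rn` has to be computed before anything can filter on it. The following sketch makes that order explicit in plain Scala, numbering first and filtering second; `TopNPerSubject` is a hypothetical name:

```scala
// Hypothetical plain-Scala model of operation 04:
//   select * from (select ..., row_number() over(partition by subject order by cnts desc) rn
//                  from middleTemp) where rn <= n
object TopNPerSubject {
  final case class Row(subject: String, tName: String, cnts: Long)

  def topN(rows: Seq[Row], n: Int): Seq[(Row, Int)] = {
    // Inner query: rn is computed first ...
    val numbered = rows.groupBy(_.subject).values.toSeq.flatMap { partition =>
      partition.sortBy(r => -r.cnts).zipWithIndex.map { case (r, i) => (r, i + 1) }
    }
    // ... outer query: only now does rn exist, so it can be filtered on
    numbered.filter { case (_, rn) => rn <= n }
  }
}
```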

  

Operation 05: [window over window] the rank() over() function

On top of the partitioned row_number() over() result, apply rank() over() once more, this time across all remaining rows, ordered by cnts:

// Window over window:
// the rank() over() function
session.sql(
  """
    |select t.*,rank() over(order by cnts desc) rn1 from (
    |select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
    |) t 
    |where rn <= 2
  """.stripMargin).show()

+-------+--------+----+---+---+
|subject|  t_name|cnts| rn|rn1|
+-------+--------+----+---+---+
|bigdata| haiyuan|  15|  1|  1|
| javaee|  laoshi|   9|  1|  2|
| javaee|chenchan|   6|  2|  3|
|bigdata|  lichen|   6|  2|  3|
|    php|   laoli|   3|  1|  5|
|    php|  laoliu|   1|  2|  6|
+-------+--------+----+---+---+
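`rank()` gives tied rows the same rank and then skips ahead, which is why the two rows with `cnts = 6` both get 3 and the next row gets 5. One simple way to express that rule is "1 + the number of rows with a strictly larger value", sketched here in plain Scala with a hypothetical `RankModel` object:

```scala
// Hypothetical plain-Scala model of rank() over(order by cnts desc)
object RankModel {
  // rank = 1 + number of rows with a strictly larger cnts.
  // Tied rows share a rank, and the ranks after a tie are skipped (1, 2, 3, 3, 5, ...).
  def rank(cnts: Seq[Long]): Seq[(Long, Int)] = {
    val sortedDesc = cnts.sorted(Ordering[Long].reverse)
    sortedDesc.map(c => (c, 1 + sortedDesc.count(_ > c)))
  }
}
```

This is quadratic and only meant to show the rule; it is not how Spark evaluates the function internally.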

  

Operation 06: the dense_rank() over() function [comparing the three window functions]:

// the dense_rank() over() function
// Comparing the three window functions:
session.sql(
  """
    |select t.*,rank() over(order by cnts desc) rank,
    |row_number() over(order by cnts desc) row_n,
    |dense_rank() over(order by cnts desc) dense_n
    |from (
    |select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) row_n_par from middleTemp
    |) t
    |where row_n_par <= 2
  """.stripMargin).show()

+-------+--------+----+---------+----+-----+-------+
|subject|  t_name|cnts|row_n_par|rank|row_n|dense_n|
+-------+--------+----+---------+----+-----+-------+
|bigdata| haiyuan|  15|        1|   1|    1|      1|
| javaee|  laoshi|   9|        1|   2|    2|      2|
| javaee|chenchan|   6|        2|   3|    3|      3|
|bigdata|  lichen|   6|        2|   3|    4|      3|
|    php|   laoli|   3|        1|   5|    5|      4|
|    php|  laoliu|   1|        2|   6|    6|      5|
+-------+--------+----+---------+----+-----+-------+
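The table shows how the three functions differ: `row_number()` always yields consecutive, unique numbers; `rank()` gives ties the same value and leaves a gap after them; `dense_rank()` gives ties the same value without a gap. A compact plain-Scala sketch that reproduces all three columns for a list of counts (the `WindowFunctionComparison` object is hypothetical):

```scala
// Hypothetical plain-Scala model of row_number(), rank() and dense_rank()
// over the same ordering (cnts desc)
object WindowFunctionComparison {
  // Returns (cnts, row_number, rank, dense_rank) for every row.
  def compare(cnts: Seq[Long]): Seq[(Long, Int, Int, Int)] = {
    val sortedDesc = cnts.sorted(Ordering[Long].reverse)
    val distinctDesc = sortedDesc.distinct
    sortedDesc.zipWithIndex.map { case (c, i) =>
      val rowNumber = i + 1                       // unique and gap-free
      val rank = sortedDesc.indexOf(c) + 1        // ties share the first tied position; gaps follow
      val denseRank = distinctDesc.indexOf(c) + 1 // position among distinct values; no gaps
      (c, rowNumber, rank, denseRank)
    }
  }
}
```

For the counts 15, 9, 6, 6, 3, 1 from the table, it yields row numbers 1 to 6, ranks 1, 2, 3, 3, 5, 6 and dense ranks 1, 2, 3, 3, 4, 5, matching the `row_n`, `rank` and `dense_n` columns.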

Operation 07: do everything in a single SQL statement:

// Merge the two SQL statements into one:
session.sql(
  """
    |select t.*,rank() over(order by cnts desc) rank,
    |row_number() over(order by cnts desc) row_n,
    |dense_rank() over(order by cnts desc) dense_n
    |from
    |(select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) row_n_par from
    |(select subject,t_name,count(*) cnts from temp group by subject,t_name)) t
    |where row_n_par <= 2
  """.stripMargin).show()

+-------+--------+----+---------+----+-----+-------+
|subject|  t_name|cnts|row_n_par|rank|row_n|dense_n|
+-------+--------+----+---------+----+-----+-------+
|bigdata| haiyuan|  15|        1|   1|    1|      1|
| javaee|  laoshi|   9|        1|   2|    2|      2|
| javaee|chenchan|   6|        2|   3|    3|      3|
|bigdata|  lichen|   6|        2|   3|    4|      3|
|    php|   laoli|   3|        1|   5|    5|      4|
|    php|  laoliu|   1|        2|   6|    6|      5|
+-------+--------+----+---------+----+-----+-------+
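Reading the merged statement from the innermost subquery outwards gives the order in which the work happens. The sketch below mirrors that order in plain Scala, from raw log lines to the per-subject top n with a global rank. `TopNPipeline` is a hypothetical name, it assumes the same URL shape as the sample data, and of the outer query's columns it only models `rank`:

```scala
import java.net.URL

// Hypothetical plain-Scala model of operation 07, read from the innermost query outwards.
// Returns (subject, t_name, cnts, row_n_par, rank) for the top n teachers of each subject.
object TopNPipeline {
  def run(lines: Seq[String], n: Int): Seq[(String, String, Long, Int, Int)] = {
    // Innermost query: parse each line, then group by subject, t_name with count(*)
    val counts: Seq[(String, String, Long)] = lines
      .map { line =>
        val index = line.lastIndexOf("/")
        val subject = new URL(line.substring(0, index)).getHost.split("\\.")(0)
        (subject, line.substring(index + 1))
      }
      .groupBy(identity)
      .toSeq
      .map { case ((subject, teacher), group) => (subject, teacher, group.size.toLong) }

    // Middle query: row_number() over(partition by subject order by cnts desc), then where row_n_par <= n
    val topN: Seq[(String, String, Long, Int)] = counts
      .groupBy(_._1)
      .values
      .toSeq
      .flatMap { partition =>
        partition.sortBy(row => -row._3).zipWithIndex.map { case ((s, t, c), i) => (s, t, c, i + 1) }
      }
      .filter(_._4 <= n)

    // Outer query: rank() over(order by cnts desc) across the remaining rows
    topN
      .sortBy(row => -row._3)
      .map { case (s, t, c, rowNPar) => (s, t, c, rowNPar, 1 + topN.count(_._3 > c)) }
  }
}
```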
