Sample content of the source file (one URL per line):
http://bigdata.beiwang.cn/laoli
http://bigdata.beiwang.cn/laoli
http://bigdata.beiwang.cn/haiyuan
http://bigdata.beiwang.cn/haiyuan
Implementation:
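import java.net.URL

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSqlDemo11 {
/**
* Compute TopN with window functions.
* @param args command-line arguments (unused)
*/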
def main(args: Array[String]): Unit = {
val session = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local")
.getOrCreate()
import session.implicits._
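// Each source line looks like http://bigdata.beiwang.cn/laoli:
// the first label of the host ("bigdata") is the subject, and the text after the last "/" ("laoli") is the teacher name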
val sourceData = session.read.textFile("E:\\北網學習\\K_第十一個月_Spark 2(2019.8)\\8.5\\teacher.log")
val df = sourceData.map(line => {
val index = line.lastIndexOf("/")
val t_name = line.substring(index + 1)
val url = new URL(line.substring(0, index))
val subject = url.getHost.split("\\.")(0)
(subject, t_name)
}).toDF("subject", "t_name")
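The parsing step can be tried without Spark. The sketch below is a plain-Scala version of the `map` function above, assuming every line has the form `http://<subject>.<domain>/<teacher>`. The object name `TeacherLogParser` is hypothetical, and it uses `java.net.URI` instead of `java.net.URL` only because the `URL` constructors are deprecated in recent JDKs.

```scala
import java.net.URI

// Plain-Scala sketch of the parsing map() above; no Spark required.
// Assumption: every line looks like http://<subject>.<domain>/<teacher>.
object TeacherLogParser {

  /** Splits one log line into (subject, teacherName). */
  def parse(line: String): (String, String) = {
    val index = line.lastIndexOf("/")
    // The text after the last '/' is the teacher name
    val teacher = line.substring(index + 1)
    // The first label of the host name is the subject, e.g. "bigdata"
    val host = new URI(line.substring(0, index)).getHost
    val subject = host.split("\\.")(0)
    (subject, teacher)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "http://bigdata.beiwang.cn/laoli",
      "http://bigdata.beiwang.cn/haiyuan"
    )
    // Prints (bigdata,laoli) and (bigdata,haiyuan)
    sample.map(parse).foreach(println)
  }
}
```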
Operation 01: get the visit count of every teacher in every subject:
df.createTempView("temp")
// Visit count of each teacher within each subject:
val middleData: DataFrame = session.sql("select subject,t_name,count(*) cnts from temp group by subject,t_name")
//middleData.show()
+-------+--------+----+
|subject|  t_name|cnts|
+-------+--------+----+
|bigdata|   laoli|   2|
|bigdata| haiyuan|  15|
| javaee|chenchan|   6|
|    php|  laoliu|   1|
|    php|   laoli|   3|
| javaee|  laoshi|   9|
|bigdata|  lichen|   6|
+-------+--------+----+
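The grouping query is an ordinary count per key. As a sketch, assuming an in-memory `Seq` instead of the `temp` view and a hypothetical `VisitCounter` object, the same aggregation with plain Scala collections looks like this:

```scala
// Plain-Scala sketch of
//   select subject, t_name, count(*) cnts from temp group by subject, t_name
// using an in-memory Seq instead of a Spark temp view.
object VisitCounter {

  /** Counts how many times each (subject, teacher) pair occurs. */
  def countVisits(records: Seq[(String, String)]): Map[(String, String), Int] =
    records.groupBy(identity).map { case (key, rows) => key -> rows.size }

  def main(args: Array[String]): Unit = {
    // The parsed form of the four sample lines shown at the top
    val records = Seq(
      ("bigdata", "laoli"), ("bigdata", "laoli"),
      ("bigdata", "haiyuan"), ("bigdata", "haiyuan")
    )
    countVisits(records).foreach { case ((subject, teacher), cnts) =>
      println(s"$subject\t$teacher\t$cnts")
    }
  }
}
```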
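Operation 02: row_number() over() [one window over all rows, ordered by teacher visit count descending]
// Register the intermediate result middleData as a temp view too
middleData.createTempView("middleTemp")
// Second query: use the row_number() window function to sort all teachers by visit count and number them
// The numbering column rn is a "pseudo-column": it can appear in the SELECT output, but it cannot be referenced in the WHERE clause of the same query
// row_number() numbers rows according to an ordering, which must be given with ORDER BY inside over(); without it, analysis fails
// Here the window is ordered by cnts descending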
session.sql(
"""
|select subject,t_name,cnts,row_number() over(order by cnts desc) rn from middleTemp
""".stripMargin).show()
+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
|bigdata| haiyuan|  15|  1|
| javaee|  laoshi|   9|  2|
| javaee|chenchan|   6|  3|
|bigdata|  lichen|   6|  4|
|    php|   laoli|   3|  5|
|bigdata|   laoli|   2|  6|
|    php|  laoliu|   1|  7|
+-------+--------+----+---+
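To make the numbering rule concrete, here is a plain-Scala sketch of `row_number() over(order by cnts desc)` applied to the `middleTemp` rows listed earlier; `RowNumberDemo` and `Visit` are hypothetical names. Note that chenchan and lichen both have cnts = 6: Scala's `sortBy` is stable, so they keep their input order here, whereas Spark makes no promise about which of two tied rows gets the smaller number.

```scala
// Plain-Scala sketch of: row_number() over(order by cnts desc)
// Visit is a hypothetical stand-in for one row of middleTemp.
object RowNumberDemo {

  final case class Visit(subject: String, teacher: String, cnts: Int)

  /** Sorts by cnts descending and numbers the rows 1, 2, 3, ... */
  def rowNumber(rows: Seq[Visit]): Seq[(Visit, Int)] =
    rows
      .sortBy(v => -v.cnts) // stable: tied rows keep their input order
      .zipWithIndex
      .map { case (v, i) => (v, i + 1) } // numbering starts at 1

  // The middleTemp rows from the middleData output
  val middle: Seq[Visit] = Seq(
    Visit("bigdata", "laoli", 2), Visit("bigdata", "haiyuan", 15),
    Visit("javaee", "chenchan", 6), Visit("php", "laoliu", 1),
    Visit("php", "laoli", 3), Visit("javaee", "laoshi", 9),
    Visit("bigdata", "lichen", 6)
  )

  def main(args: Array[String]): Unit =
    rowNumber(middle).foreach { case (v, rn) =>
      println(s"${v.subject}\t${v.teacher}\t${v.cnts}\t$rn")
    }
}
```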
Note: row_number() requires an ORDER BY inside over(); without one, Spark throws an AnalysisException:
session.sql(
"""
|select subject,t_name,cnts,row_number() over() rn from middleTemp
""".stripMargin).show()
Exception in thread "main" org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$31$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:2173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$31$$anonfun$applyOrElse$12.applyOrElse(Analyzer.scala:2171)
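Operation 03: row_number() over(partition by ...) [partition by subject, then number the rows within each partition]
// Partition by subject, then open a window over each partition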
session.sql(
"""
|select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
""".stripMargin).show()
+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
| javaee|  laoshi|   9|  1|
| javaee|chenchan|   6|  2|
|bigdata| haiyuan|  15|  1|
|bigdata|  lichen|   6|  2|
|bigdata|   laoli|   2|  3|
|    php|   laoli|   3|  1|
|    php|  laoliu|   1|  2|
+-------+--------+----+---+
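Partitioning simply restarts the numbering for each subject. A plain-Scala sketch of `row_number() over(partition by subject order by cnts desc)`, again with hypothetical names and the `middleTemp` rows; subjects are emitted in alphabetical order only to make the output deterministic, since Spark does not order partitions in its output.

```scala
// Plain-Scala sketch of:
//   row_number() over(partition by subject order by cnts desc)
object PartitionedRowNumberDemo {

  final case class Visit(subject: String, teacher: String, cnts: Int)

  /** Restarts the numbering inside each subject. */
  def rowNumberBySubject(rows: Seq[Visit]): Seq[(Visit, Int)] =
    rows
      .groupBy(_.subject) // partition by subject
      .toSeq
      .sortBy(_._1)       // alphabetical, only for deterministic output
      .flatMap { case (_, partition) =>
        partition
          .sortBy(v => -v.cnts) // order by cnts desc within the partition
          .zipWithIndex
          .map { case (v, i) => (v, i + 1) }
      }

  val middle: Seq[Visit] = Seq(
    Visit("bigdata", "laoli", 2), Visit("bigdata", "haiyuan", 15),
    Visit("javaee", "chenchan", 6), Visit("php", "laoliu", 1),
    Visit("php", "laoli", 3), Visit("javaee", "laoshi", 9),
    Visit("bigdata", "lichen", 6)
  )

  def main(args: Array[String]): Unit =
    rowNumberBySubject(middle).foreach { case (v, rn) =>
      println(s"${v.subject}\t${v.teacher}\t${v.cnts}\t$rn")
    }
}
```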
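Note: the column produced by the window is a pseudo-column and cannot be used in the WHERE clause of the same query. The following query therefore fails with an AnalysisException: WHERE is evaluated before the window function, so rn does not exist yet when the filter runs.
// A column produced by a window function cannot be filtered on in the same query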
session.sql(
"""
|select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
|where rn <=2
""".stripMargin).show()

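Operation 04: using the pseudo-column
Because the pseudo-column produced by a window cannot be filtered on directly, we can wrap the whole windowed query in a subquery. To the outer query, the subquery's result set is just a table, so the pseudo-column becomes an ordinary column that can be queried:
// Wrap the windowed query in a subquery so that rn becomes a real column the outer query can filter on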
session.sql(
"""
|select * from (
|select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
|) where rn <= 2
""".stripMargin).show()
+-------+--------+----+---+
|subject|  t_name|cnts| rn|
+-------+--------+----+---+
| javaee|  laoshi|   9|  1|
| javaee|chenchan|   6|  2|
|bigdata| haiyuan|  15|  1|
|bigdata|  lichen|   6|  2|
|    php|   laoli|   3|  1|
|    php|  laoliu|   1|  2|
+-------+--------+----+---+
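The subquery trick corresponds to doing the work in two steps: number the rows first, then filter on the number. Below is a plain-Scala sketch of this per-subject TopN; the object `TopNPerSubject` and its parameter `n` are hypothetical, and the rows are the `middleTemp` rows from earlier.

```scala
// Plain-Scala sketch of the per-subject TopN query:
//   select * from (
//     select subject, t_name, cnts,
//            row_number() over(partition by subject order by cnts desc) rn
//     from middleTemp
//   ) where rn <= n
object TopNPerSubject {

  final case class Visit(subject: String, teacher: String, cnts: Int)

  def topN(rows: Seq[Visit], n: Int): Seq[(Visit, Int)] = {
    // Step 1 (inner query): compute rn within each subject
    val numbered: Seq[(Visit, Int)] =
      rows.groupBy(_.subject).toSeq.sortBy(_._1).flatMap { case (_, partition) =>
        partition.sortBy(v => -v.cnts).zipWithIndex.map { case (v, i) => (v, i + 1) }
      }
    // Step 2 (outer query): rn now exists as an ordinary value, so it can be filtered
    numbered.filter { case (_, rn) => rn <= n }
  }

  val middle: Seq[Visit] = Seq(
    Visit("bigdata", "laoli", 2), Visit("bigdata", "haiyuan", 15),
    Visit("javaee", "chenchan", 6), Visit("php", "laoliu", 1),
    Visit("php", "laoli", 3), Visit("javaee", "laoshi", 9),
    Visit("bigdata", "lichen", 6)
  )

  def main(args: Array[String]): Unit =
    topN(middle, 2).foreach { case (v, rn) =>
      println(s"${v.subject}\t${v.teacher}\t${v.cnts}\t$rn")
    }
}
```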
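Operation 05: [a window on top of a window] the rank() over() function
On top of the partitioned row_number() over() result, apply rank() over() ordered by cnts across all remaining rows. rank() gives tied rows the same rank and then skips ahead, so the two rows with cnts = 6 both get 3 and the next row gets 5. Because the outer WHERE rn <= 2 is applied before rank() is evaluated, rank() only sees the six rows that survive the filter.
// Window on top of a window:
// the rank() over() function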
session.sql(
"""
|select t.*,rank() over(order by cnts desc) rn1 from (
|select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) rn from middleTemp
|) t
|where rn <= 2
""".stripMargin).show()
+-------+--------+----+---+---+
|subject|  t_name|cnts| rn|rn1|
+-------+--------+----+---+---+
|bigdata| haiyuan|  15|  1|  1|
| javaee|  laoshi|   9|  1|  2|
| javaee|chenchan|   6|  2|  3|
|bigdata|  lichen|   6|  2|  3|
|    php|   laoli|   3|  1|  5|
|    php|  laoliu|   1|  2|  6|
+-------+--------+----+---+---+
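A plain-Scala sketch of `rank() over(order by cnts desc)`, assuming the input is just the cnts values of the six rows that survive the `rn <= 2` filter; `RankDemo` is a hypothetical name. A row's rank is one plus the number of rows with a strictly larger cnts, which is exactly what makes tied rows share a rank and the next rank jump ahead. The quadratic count is only for readability.

```scala
// Plain-Scala sketch of: rank() over(order by cnts desc)
// A row's rank is 1 + the number of rows with a strictly larger cnts,
// so tied rows share a rank and the following rank is skipped.
object RankDemo {

  /** Returns (cnts, rank) pairs in cnts-descending order. */
  def rank(cnts: Seq[Int]): Seq[(Int, Int)] = {
    val sorted = cnts.sorted(Ordering[Int].reverse)
    sorted.map(c => (c, 1 + sorted.count(_ > c))) // O(n^2), fine for a demo
  }

  def main(args: Array[String]): Unit = {
    // cnts of the six rows left after the rn <= 2 filter
    rank(Seq(15, 9, 6, 6, 3, 1)).foreach { case (c, r) => println(s"$c\t$r") }
  }
}
```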
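Operation 06: the dense_rank() over() function [comparing the three window functions]
All three functions number the rows in cnts-descending order; they differ only in how ties are handled. row_number() always assigns distinct, consecutive numbers (the order among tied rows is not guaranteed), rank() gives tied rows the same number and leaves a gap after them, and dense_rank() gives tied rows the same number without leaving a gap.
// dense_rank() over()
// Compare the three window functions side by side: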
session.sql(
"""
|select t.*,rank() over(order by cnts desc) rank,
|row_number() over(order by cnts desc) row_n,
|dense_rank() over(order by cnts desc) dense_n
|from (
|select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) row_n_par from middleTemp
|) t
|where row_n_par <= 2
""".stripMargin).show()
+-------+--------+----+---------+----+-----+-------+
|subject|  t_name|cnts|row_n_par|rank|row_n|dense_n|
+-------+--------+----+---------+----+-----+-------+
|bigdata| haiyuan|  15|        1|   1|    1|      1|
| javaee|  laoshi|   9|        1|   2|    2|      2|
| javaee|chenchan|   6|        2|   3|    3|      3|
|bigdata|  lichen|   6|        2|   3|    4|      3|
|    php|   laoli|   3|        1|   5|    5|      4|
|    php|  laoliu|   1|        2|   6|    6|      5|
+-------+--------+----+---------+----+-----+-------+
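The three columns in the table above can be reproduced in plain Scala. This sketch (hypothetical object `WindowRankComparison`, input assumed to be the cnts column of the six filtered rows) derives row_number from the position in the sorted order, rank from the count of strictly larger values, and dense_rank from the count of strictly larger distinct values.

```scala
// Plain-Scala sketch comparing row_number(), rank() and dense_rank(),
// all ordered by cnts desc.
object WindowRankComparison {

  /** Returns (cnts, row_number, rank, dense_rank) in cnts-descending order. */
  def compare(cnts: Seq[Int]): Seq[(Int, Int, Int, Int)] = {
    val sorted = cnts.sorted(Ordering[Int].reverse)
    val distinctValues = sorted.distinct
    sorted.zipWithIndex.map { case (c, i) =>
      val rowNumber = i + 1                           // unique and consecutive
      val rank = 1 + sorted.count(_ > c)              // ties share a rank, then a gap
      val denseRank = 1 + distinctValues.count(_ > c) // ties share a rank, no gap
      (c, rowNumber, rank, denseRank)
    }
  }

  def main(args: Array[String]): Unit = {
    println("cnts\trow_n\trank\tdense_n")
    compare(Seq(15, 9, 6, 6, 3, 1)).foreach { case (c, rn, r, dr) =>
      println(s"$c\t$rn\t$r\t$dr")
    }
  }
}
```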

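Operation 07: do everything in a single SQL statement
// Merge the two SQL statements: the group-by aggregation becomes the innermost subquery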
session.sql(
"""
|select t.*,rank() over(order by cnts desc) rank,
|row_number() over(order by cnts desc) row_n,
|dense_rank() over(order by cnts desc) dense_n
|from
|(select subject,t_name,cnts,row_number() over(partition by subject order by cnts desc) row_n_par from
|(select subject,t_name,count(*) cnts from temp group by subject,t_name)) t
|where row_n_par <= 2
""".stripMargin).show()
+-------+--------+----+---------+----+-----+-------+
|subject|  t_name|cnts|row_n_par|rank|row_n|dense_n|
+-------+--------+----+---------+----+-----+-------+
|bigdata| haiyuan|  15|        1|   1|    1|      1|
| javaee|  laoshi|   9|        1|   2|    2|      2|
| javaee|chenchan|   6|        2|   3|    3|      3|
|bigdata|  lichen|   6|        2|   3|    4|      3|
|    php|   laoli|   3|        1|   5|    5|      4|
|    php|  laoliu|   1|        2|   6|    6|      5|
+-------+--------+----+---------+----+-----+-------+
