Use cases for analytic functions:
(1) Sorting within groups after grouping
(2) Specifying a computation range
(3) Top N
(4) Running (cumulative) totals
(5) Hierarchical aggregation
General syntax of analytic functions:
The syntax of an analytic function generally takes the form:
analytic_function_name(arguments) over (partition by clause  order by clause  rows/range clause)
1、Function name: aggregate functions such as sum, max, min, count, avg;
comparison functions such as lead and lag;
ranking functions such as rank
2、over: a keyword indicating that the preceding function is an analytic function, not an ordinary aggregate function
3、Analytic clause: the contents of the parentheses after the over keyword, consisting of three parts:

- partition by: the grouping clause; it defines the computation scope of the analytic function, and the groups are independent of one another
- order by: the ordering clause; it defines how rows are sorted within each group after grouping
- rows/range: the window clause; after grouping (partition by), it defines a sub-group (i.e. a window) within each group, which is the computation window of the analytic function
Data preparation:
cookieid,createtime,pv
cookie1,2015-04-10,1
cookie1,2015-04-11,5
cookie1,2015-04-12,7
cookie1,2015-04-13,3
cookie1,2015-04-14,2
cookie1,2015-04-15,4
cookie1,2015-04-16,4
cookie2,2015-04-10,2
cookie2,2015-04-11,3
cookie2,2015-04-12,5
cookie2,2015-04-13,6
cookie2,2015-04-14,3
cookie2,2015-04-15,9
cookie2,2015-04-16,7
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
val ssc = SparkSession.builder()
  .appName(this.getClass.getSimpleName)
  .master("local[2]")
  .config(conf)
  .getOrCreate()
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
val df = ssc.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("file:///E:/TestFile/analyfuncdata.txt")
df.show(false)
df.printSchema()
df.createOrReplaceTempView("table")
val sql = "select * from table"
ssc.sql(sql).show(false)
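The CSV path above is machine-specific. As a sketch of an alternative, the same dataset can be built in memory so the examples run on any machine (the object and method names here are my own):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// A minimal sketch: build the sample data in memory instead of
// reading the local CSV file, matching the file's header columns.
object AnalyFuncData {
  def sampleDF(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("cookie1", "2015-04-10", 1), ("cookie1", "2015-04-11", 5),
      ("cookie1", "2015-04-12", 7), ("cookie1", "2015-04-13", 3),
      ("cookie1", "2015-04-14", 2), ("cookie1", "2015-04-15", 4),
      ("cookie1", "2015-04-16", 4), ("cookie2", "2015-04-10", 2),
      ("cookie2", "2015-04-11", 3), ("cookie2", "2015-04-12", 5),
      ("cookie2", "2015-04-13", 6), ("cookie2", "2015-04-14", 3),
      ("cookie2", "2015-04-15", 9), ("cookie2", "2015-04-16", 7)
    ).toDF("cookieid", "createtime", "pv")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("analytic-functions-demo")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val df = sampleDF(spark)
    df.createOrReplaceTempView("table")
    spark.sql("select * from table").show(false)
    spark.stop()
  }
}
```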
Test requirements:
1、Group by cookieid, order by createtime, and compute a running sum
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(partition by cookieid order by createtime) as pv1
    |from table
  """.stripMargin).show
Result:
2、An equivalent way to write method 1
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(partition by cookieid order by createtime) as pv1,
    | sum(pv) over(partition by cookieid order by createtime
    |              rows between unbounded preceding and current row) as pv2
    |from table
  """.stripMargin).show
Note: this involves the window clause, which is described in detail later.
Result:
As the output shows, method 1 is simply the default form of method 2.
3、Group by cookieid and sum, without ordering
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(partition by cookieid) as pv1
    |from table
  """.stripMargin).show
Result:
As the output shows, without an order by the sum column is the total of all values in each group, not a running sum.
4、No grouping, only ordering, then sum (problematic)
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(order by createtime) as pv1
    |from table
  """.stripMargin).show
Result:
As the output shows, with only an order by and no partition by, the entire result set is treated as a single group, and rows with the same createtime share one cumulative value; the output looks jumbled and is rarely what you want, so I generally avoid this form.
5、With an empty over() clause
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over() as pv1
    |from table
  """.stripMargin).show
Result:
As the output shows, this form simply sums pv over all rows.
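A common use of the empty over() is expressing each row as a share of the grand total. A sketch against the same `ssc` session and `table` view from the setup above (the `pv_ratio` column name is my own):

```scala
// Sketch: each row's pv as a fraction of the total pv over all rows.
// over() with no partition/order makes the frame the entire result set.
ssc.sql(
  """
    |select cookieid, createtime, pv,
    |       round(pv / sum(pv) over(), 3) as pv_ratio
    |from table
  """.stripMargin).show(false)
```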
The window clause
Earlier we already ran a query involving the window clause:
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(partition by cookieid order by createtime) as pv1,
    | sum(pv) over(partition by cookieid order by createtime
    |              rows between unbounded preceding and current row) as pv2
    |from table
  """.stripMargin).show
When several window functions appear in the same select, they do not affect one another; each window function applies its own rules.
rows between unbounded preceding and current row:

- rows between ... and ... (start to end; the two positions cannot be swapped)
- unbounded preceding: start from the first row
- current row: up to the current row

The start and end rows can of course be customized:

- first row: unbounded preceding
- last row: unbounded following
- n rows before: n preceding
- n rows after: n following
Example requirements:
pv: the original value
pv1: cumulative sum from the first row to the current row
pv2: same as pv1, written with different syntax
pv3: a single total per group
pv4: cumulative sum from 3 rows before to the current row
pv5: cumulative sum from 3 rows before to 1 row after
pv6: cumulative sum from the current row to the last row
Note: "3 rows before" here does not include the current row itself.
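The requirements above can be sketched in one query, assuming the same `ssc` session and `table` view:

```scala
ssc.sql(
  """
    |select cookieid, createtime, pv,
    |       -- pv1: default frame, first row through the current row
    |       sum(pv) over(partition by cookieid order by createtime) as pv1,
    |       -- pv2: same as pv1, with the frame written out explicitly
    |       sum(pv) over(partition by cookieid order by createtime
    |                    rows between unbounded preceding and current row) as pv2,
    |       -- pv3: a single total per group
    |       sum(pv) over(partition by cookieid) as pv3,
    |       -- pv4: 3 rows before through the current row
    |       sum(pv) over(partition by cookieid order by createtime
    |                    rows between 3 preceding and current row) as pv4,
    |       -- pv5: 3 rows before through 1 row after
    |       sum(pv) over(partition by cookieid order by createtime
    |                    rows between 3 preceding and 1 following) as pv5,
    |       -- pv6: the current row through the last row
    |       sum(pv) over(partition by cookieid order by createtime
    |                    rows between current row and unbounded following) as pv6
    |from table
  """.stripMargin).show(false)
```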
Result:
rows & range
range: a logical window; rows are selected by a value range relative to the current row's value, so the number of rows is not fixed: every row whose value falls within the range is included
rows: a physical window; after sorting by the order by clause, it takes the n rows before and/or after the current row (independent of the current row's value, depending only on row position)
Example requirements:
1、Order by pv and sum the values from one below to two above the current pv
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | sum(pv) over(partition by cookieid order by pv
    |              range between 1 preceding and 2 following) as pv1
    |from table
  """.stripMargin).show
Result:
Explanation: with range, "1 preceding and 2 following" means every row in the same group whose pv lies in [current pv - 1, current pv + 2] is included in the sum.
The other aggregate functions (avg, min, max, count, etc.) are used the same way as sum.
Ranking functions
Ordering behaviors:

- row_number(): consecutive numbering, ignoring ties
- dense_rank(): ties share a rank, and the next rank continues consecutively
- rank(): ties share a rank, and the following ranks are skipped
- ntile(n): splits the ordered rows of each group into n buckets
Example:
ssc.sql(
  """
    |select cookieid,createtime,pv,
    | row_number() over(partition by cookieid order by pv desc) rank1,
    | rank() over(partition by cookieid order by pv desc) rank2,
    | dense_rank() over(partition by cookieid order by pv desc) rank3,
    | ntile(3) over(partition by cookieid order by pv desc) rank4
    |from table
  """.stripMargin).show
Result:
lag & lead
lag(field, n): the value of field n rows before the current row (n defaults to 1)
lead(field, n): the value of field n rows after the current row (n defaults to 1)
Example:
ssc.sql( """ |select cookieid,createtime,pv, |lag(pv) over(partition by cookieid order by pv) as col1, |lag(pv,1) over(partition by cookieid order by pv) as col2, |lag(pv,2) over(partition by cookieid order by pv) as col3 |from table """.stripMargin).show
運行結果:
ssc.sql( """ |select cookieid,createtime,pv, |lead(pv) over(partition by cookieid order by pv) as col1, |lead(pv,1) over(partition by cookieid order by pv) as col2, |lead(pv,2) over(partition by cookieid order by pv) as col3 |from table """.stripMargin).show
運行結果:
ssc.sql( """ |select cookieid,createtime,pv, |lead(pv,-2) over(partition by cookieid order by pv) as col1, |lag(pv,2) over(partition by cookieid order by pv) as col2 |from table """.stripMargin).show
運行結果:
first_value & last_value
first_value(field): the first value of field within the group after sorting, up to the current row
last_value(field): the last value of field within the group after sorting, up to the current row
Example:
ssc.sql( """ |select cookieid,createtime,pv, |row_number() over(partition by cookieid order by pv desc) as rank1, |first_value(createtime) over(partition by cookieid order by pv desc) as rank2, |first_value(pv) over(partition by cookieid order by pv desc) as rank3 |from table """.stripMargin).show
運行結果:
ssc.sql( """ |select cookieid,createtime,pv, |row_number() over(partition by cookieid order by pv desc) as rank1, |last_value(createtime) over(partition by cookieid order by pv desc) as rank2, |last_value(pv) over(partition by cookieid order by pv desc) as rank3 |from table """.stripMargin).show
運行結果:
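Note the default frame here: with an order by and no explicit window clause, the frame ends at the current row, so last_value just returns the current row's own value. To get the true last value of the whole group, the frame must be widened explicitly. A sketch (the `last_default` and `last_full` aliases are my own):

```scala
ssc.sql(
  """
    |select cookieid, createtime, pv,
    |       -- default frame: the "last value so far" is the current row's pv
    |       last_value(pv) over(partition by cookieid order by pv desc) as last_default,
    |       -- full-partition frame: the true last value of the group
    |       last_value(pv) over(partition by cookieid order by pv desc
    |                           rows between unbounded preceding
    |                                    and unbounded following) as last_full
    |from table
  """.stripMargin).show(false)
```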
cube & rollup
cube: aggregates over every combination of the group by dimensions
rollup: a subset of cube; it aggregates hierarchically, anchored on the leftmost dimensions
Example:
ssc.sql( """ |select cookieid,createtime,sum(pv) |from table |group by cube(cookieid,createtime) |order by 1,2 """.stripMargin).show(100,false)
運行結果:
ssc.sql( """ |select cookieid,createtime,sum(pv) |from table |group by rollup(cookieid,createtime) |order by 1,2 """.stripMargin).show(100,false)
運行結果:
DSL
import org.apache.spark.sql.expressions.Window
import ssc.implicits._
import org.apache.spark.sql.functions._

val w1 = Window.partitionBy("cookieid").orderBy("createtime")
val w2 = Window.partitionBy("cookieid").orderBy("pv")

// Aggregate functions
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show()

// Ranking functions
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show()
df.select($"cookieid", $"pv", dense_rank().over(w2).alias("dense_rank")).show()
df.select($"cookieid", $"pv", row_number().over(w2).alias("row_number")).show()

// lag / lead
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("lag_pv")).show()
df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("lead_pv")).show()

// cube / rollup
df.cube("cookieid", "createtime").agg(sum("pv")).show()
df.rollup("cookieid", "createtime").agg(sum("pv")).show()
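The window clause also has a DSL counterpart: rowsBetween/rangeBetween on the Window spec, with Window.unboundedPreceding, Window.currentRow, and Window.unboundedFollowing as frame boundaries. A sketch, assuming the df, ssc.implicits._, and functions._ imports above:

```scala
// Cumulative pv per cookie: the DSL equivalent of
// "rows between unbounded preceding and current row".
val w3 = Window.partitionBy("cookieid")
  .orderBy("createtime")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.select($"cookieid", $"createtime", $"pv",
  sum("pv").over(w3).alias("cum_pv")).show()
```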
Results:
1、Aggregate functions
2、Ranking functions
lag & lead
cube & rollup