開始編寫我們的統計邏輯,使用row_number()函數
先說明一下,row_number()開窗函數的作用
其實就是給每個分組的數據,按照其排序順序,打上一個分組內行號
比如說,有一個分組20151001,里面有三條數據,1122,1121,1124
那么對這個分組的每一行使用row_number()開窗函數以后,三行依次會獲得一個組內的行號
行號從1開始遞增,比如1122,1 1121,2 1124,3row_number()開窗函數的語法說明
首先可以在select查詢時,使用row_number()函數
其次,row_number()函數后面先跟上over關鍵字
然后括號中是partition by也就是根據哪個字段進行分組
其次是可以用order by進行組內排序 然后row_number()就可以給每個組內的行,一個組內行號
RowNumberWindowFunc.scala
package com.UDF.row_numberFUNC
import org.apache.spark.sql.{SaveMode, SparkSession}
object RowNumberWindowFunc extends App {
val spark = SparkSession
.builder()
.appName("RowNumberWindowFunc")
.master("local[2]")
.getOrCreate()
//創建銷售額表,sales表
spark.sql("drop table if exists sales")
spark.sql("create table if not exists sales ("
+ "product string, "
+ "category string, "
+ "revenue bigint)")
spark.sql("load data "
+ "load inpath '/usr/local/data'"
+ "into table sales")
//開始編寫我們的統計邏輯,使用row_number()函數
//先說明一下,row_number()開窗函數的作用
//其實就是給每個分組的數據,按照其排序順序,打上一個分組內行號
//比如說,有一個分組20151001,里面有三條數據,1122,1121,1124
//那么對這個分組的每一行使用row_number()開窗函數以后,三行依次會獲得一個組內的行號
//行號從1開始遞增,比如1122,1 1121,2 1124,3
val top3SalesDF = spark.sql(""
+ "select product,category,revenue"
+ "from ("
+ "select product,category,revenue,"
//row_number()開窗函數的語法說明
//首先可以在select查詢時,使用row_number()函數
//其次,row_number()函數后面先跟上over關鍵字
//然后括號中是partition by也就是根據哪個字段進行分組
//其次是可以用order by進行組內排序
//然后row_number()就可以給每個組內的行,一個組內行號
+ "row_number() over (partition by catefory order by revenue desc ) rank "
+ " from sales) tmp_sales "
+ "where rank <= 3")
//將魅族排名前三的數據,保存到一個表中
spark.sql("drop table if exists top3_sales")
top3SalesDF.write //保存,要用write開頭
.mode(SaveMode.Overwrite) //覆蓋模式
.format("hive") //格式hive (hive默認格式,數據文件純文本無壓縮存儲)
.saveAsTable("top3_sales") //做為表保存
/**
* format支持的格式有:
* hive (hive默認格式,數據文件純文本無壓縮存儲)
* parquet (spark默認采用格式)
* orc
* json
* csv
* text (若用saveAsTable只能保存一個列的df)
* jdbc
* libsvm
*/
}