/**
* @author DT大數據夢工廠
* 新浪微博 http://weibo.com/ilovepains/
* 微信公眾賬號:DT_Spark
* 直播地址 YY頻道:68917580
*/
object SparkSQLWindowFunctionOps {
def main(args: Array[String]) {
/**
* 創建SparkConf對象,設置Spark程序運行時的配置信息
* 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置為local,則代表
* Spark程序運行在本地,適合機器配置一般的初學者
*/
val conf = new SparkConf().setAppName("SparkSQLWindowFunctionOps").setMaster("spark://hadoop2001:7077")
/**
* 創建SparkContext對象
* SparkContext對象時Spark程序所有功能的唯一入口,無論是scala、java、python等都必須有一個SparkContext。
* SparkContext的核心作用:初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler,TaskScheduler,SchedulerBackend
* 同事還會負責Spark程序往Master注冊程序等
* SparkContext是整個Spark應用程序中最為至關重要的一個對象
*/
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive")
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INT)"
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'")
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/testdate/topNGroup.txt' INTO TABLE scores")
/**
* 使用子查詢的方式完成目標數據的提取,在目標數據內部使用窗口函數row_number來進行分組排序:
* PARTITION BY:指定窗口函數分組的Key
* ORDER BY : 分組進行排序
*/
val result = hiveContext.sql("SELECT name,score " +
"FROM (" +
"SELECT name,score," +
"row_number() OVER (PARTITION BY name ORDER BY score DESC) rank" +
" FROM scores " +
") sub_scores " +
"WHERE rank <= 4")
result.show() //在Driver的控制台上打印出結果內容
hiveContext.sql("DROP TABLE IF EXISTS sortedResultScores")
result.saveAsTable("sortedResultScores")
}
}
DT大數據夢工廠由王家林老師及其團隊打造,旨在為社會培養100萬優秀大數據人才,Spark已是目前大數據行業主流數據處理框架和未來趨勢。
關注DT大數據夢工廠公眾號:
DT_Spark
查看免費公開課,內容絕對詳細。
YY永久免費直播頻道:68917580
王家林老師聯系方式: