spark sql 窗口函數over partition by


1、窗口函數需要使用hiveContext,故引入如下包

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.4.1" % "provided"

libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.4.1"

關於hiveContext,需要說明一點,使用hiveContext不是說一定要部署hive,像row_number() over(partition by 。。。。)窗口函數就不用,

另外,在spark-shell里,你看到的sqlContext其實就是HiveContext(這也就是為什么會在運行spark-shell的目錄產生一個derby文件derby.log和文件夾metastore_db),

也就是說,你在spark-shell里邊可以直接使用窗口函數(注意:真正寫的spark app jar包,必須把hive打進去,才能在集群上運行,這點與spark-shell不同)

2、使用窗口函數,取每個mac的第一條記錄

sqlContext.read.load(s"hdfs://myspark/logs").registerTempTable("logs")
sql(
s"""select *
from (select mac_address, remote_ip, event_date, country, province, city,
row_number() over(partition by mac_address order by event_date) as rn
from logs where event_date <=$event_date_int) as group_by_mac
where rn =1
""").drop("rn").registerTempTable("mac_first_result")

3、關於where 

from logs where event_date <=$event_date_int  //建議使用支持filterpushdown的數據格式,如,spark 默認的parquet

4、關於性能

使用窗口函數時,建議需要做cache的,就做下cache,每算一次還是挺花費時間,消耗性能的

5、其他窗口函數,大家自行摸索吧

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM