1、窗口函數需要使用hiveContext,故引入如下包
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.4.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.4.1"
關於hiveContext,需要說明一點,使用hiveContext不是說一定要部署hive,像row_number() over(partition by 。。。。)窗口函數就不用,
另外,在spark-shell里,你看到的sqlContext其實就是HiveContext(這也就是為什么會在運行spark-shell的目錄產生一個derby文件derby.log和文件夾metastore_db),
也就是說,你在spark-shell里邊可以直接使用窗口函數(注意:真正寫的spark app jar包,必須把hive打進去,才能在集群上運行,這點與spark-shell不同)
2、使用窗口函數,取每個mac的第一條記錄
sqlContext.read.load(s"hdfs://myspark/logs").registerTempTable("logs")
sql(
s"""select *
from (select mac_address, remote_ip, event_date, country, province, city,
row_number() over(partition by mac_address order by event_date) as rn
from logs where event_date <=$event_date_int) as group_by_mac
where rn =1
""").drop("rn").registerTempTable("mac_first_result")
3、關於where
from logs where event_date <=$event_date_int //建議使用支持filterpushdown的數據格式,如,spark 默認的parquet
4、關於性能
使用窗口函數時,建議需要做cache的,就做下cache,每算一次還是挺花費時間,消耗性能的
5、其他窗口函數,大家自行摸索吧