Spark的DataFrame的窗口函數使用


作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請注明出處

SparkSQL這塊兒從1.4開始支持了很多的窗口分析函數,像row_number這些,平時寫程序加載數據后用SQLContext 能夠很方便實現很多分析和查詢,如下

val sqlContext = new SQLContext(sc)

sqlContext.sql(“select ….”)

然而我看到Spark后續版本的DataFrame功能很強大,想試試使用這種方式來實現比如row_number這種功能,話不多說,快速用pyspark測試一下,記錄一下遇到的問題.

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
sc = SparkContext("local[3]", "test data frame on 2.0")
testDF = sc.parallelize( (Row(c="class1", s=50), Row(c="class2", s=40), Row(c="class3", s=70), Row(c="class2", s=49), Row(c="class3", s=29), Row(c="class1", s=78) )).toDF()
(testDF.select("c", "s", F.rowNumber().over(Window.partitionBy("c").orderBy("s")).alias("rowNum") ).show())
 
spark-submit提交任務后直接報錯如下
rddNoDFMethod

告訴我RDD沒有toDF()屬性,查閱spark官方文檔得知還是需要用SQLContext或者sparkSession來初始化一下,先考慮用SQLContext吧,修改代碼如下

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize( (Row(c="class1", s=50), Row(c="class2", s=40), Row(c="class3", s=70), Row(c="class2", s=49), Row(c="class3", s=29), Row(c="class1", s=78)))
sqlContext = SQLContext(sc) testDF = rddData.toDF()
(testDF.select("c", "s", F.rowNumber().over(Window.partitionBy("c").orderBy("s")).alias("rowNum") ).show())

spark-submit提交任務后接着報另外一個錯,如下

DFNoRow_number

ok,錯誤很清楚,rowNumber這里我寫錯了,沒有這個函數,查閱spark源碼中的functions.py,會發現如下說明

sparkDataFrameTest

這里說了,rowNumber從1.6開始,用row_number代替,直接修改py腳本如下

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize( (Row(c="class1", s=50), Row(c="class2", s=40), Row(c="class3", s=70), Row(c="class2", s=49), Row(c="class3", s=29), Row(c="class1", s=78)))
sqlContext = SQLContext(sc)
testDF = rddData.toDF()
(testDF.select("c", "s", F.row_number().over(Window.partitionBy("c").orderBy("s")).alias("rowNum") ).show())

這次運行沒問題,結果如下

rowNumber_result

但是我只想取每組rowNum為1的那個,代碼如下

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize( (Row(c="class1", s=50), Row(c="class2", s=40), Row(c="class3", s=70), Row(c="class2", s=49), Row(c="class3", s=29), Row(c="class1", s=78)))
sqlContext = SQLContext(sc)
testDF = rddData.toDF()
result = (testDF.select("c", "s", F.row_number().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")))
finalResult = result.where(result.rowNum <= 1).show()

可以看到,sql能實現的DataFrame的函數都可以實現,畢竟DataFrame是基於row和column的,就是寫起來麻煩點.

參考資料:http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM