Author: Syn良子  Source: http://www.cnblogs.com/cssdongl  Please credit the source when reposting.
Spark SQL has supported many window (analytic) functions such as row_number since 1.4. Normally, after loading the data in a program, a SQLContext makes this kind of analysis and querying easy to write, for example:
val sqlContext = new SQLContext(sc)
sqlContext.sql("select ....")
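Since the window functions are available in SQL itself, row_number can be written directly inside the query. Here is a minimal pyspark sketch of that SQL-only route; the table name "scores" is made up for illustration, and it assumes a Spark version where window functions work with a plain SQLContext (1.6+):

# Sketch of the SQL-only approach; "scores" is a hypothetical temp table name
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[3]", "sql window function sketch")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([Row(c="class1", s=50), Row(c="class1", s=78)])
df.registerTempTable("scores")
sqlContext.sql(
    "SELECT c, s, row_number() OVER (PARTITION BY c ORDER BY s) AS rowNum "
    "FROM scores").show()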
However, the DataFrame API in later Spark versions is quite powerful, so I wanted to try implementing something like row_number that way instead. Without further ado, here is a quick test with pyspark, plus the problems I ran into along the way.

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext

sc = SparkContext("local[3]", "test data frame on 2.0")

# Build a small test DataFrame of (class, score) rows straight from an RDD
testDF = sc.parallelize(
    (Row(c="class1", s=50),
     Row(c="class2", s=40),
     Row(c="class3", s=70),
     Row(c="class2", s=49),
     Row(c="class3", s=29),
     Row(c="class1", s=78))).toDF()

# Number the rows within each class, ordered by score
(testDF.select("c", "s",
               F.rowNumber().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")
               ).show())
Submitting the job with spark-submit, it immediately fails with the following error:
It tells me the RDD has no toDF() attribute. Checking the official Spark documentation, it turns out a SQLContext or SparkSession still has to be created first. Let's go with SQLContext for now; the modified code is below (a SparkSession-based sketch follows right after it).

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize(
    (Row(c="class1", s=50),
     Row(c="class2", s=40),
     Row(c="class3", s=70),
     Row(c="class2", s=49),
     Row(c="class3", s=29),
     Row(c="class1", s=78)))

# Creating the SQLContext is what makes rdd.toDF() available
sqlContext = SQLContext(sc)
testDF = rddData.toDF()

(testDF.select("c", "s",
               F.rowNumber().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")
               ).show())
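As an aside, on Spark 2.x the same setup could also go through SparkSession instead of SQLContext. A minimal sketch, not the script that was actually submitted here:

# Hedged sketch: SparkSession-based setup on Spark 2.x (not the script run in this post)
from pyspark.sql import SparkSession, Row

spark = (SparkSession.builder
         .master("local[3]")
         .appName("test data frame on 2.0")
         .getOrCreate())

# createDataFrame builds the DataFrame directly; no explicit RDD + toDF() step needed
testDF = spark.createDataFrame([
    Row(c="class1", s=50), Row(c="class2", s=40), Row(c="class3", s=70),
    Row(c="class2", s=49), Row(c="class3", s=29), Row(c="class1", s=78)])
testDF.show()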
Submitting the SQLContext version above with spark-submit, another error comes up:
OK, the error is clear: rowNumber is wrong here, there is no such function. Checking functions.py in the Spark source reveals the following note:
It says that starting from 1.6, rowNumber is replaced by row_number, so the py script just needs the following change:

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize(
    (Row(c="class1", s=50),
     Row(c="class2", s=40),
     Row(c="class3", s=70),
     Row(c="class2", s=49),
     Row(c="class3", s=29),
     Row(c="class1", s=78)))
sqlContext = SQLContext(sc)
testDF = rddData.toDF()

# row_number() (not rowNumber) is the correct window function name
(testDF.select("c", "s",
               F.row_number().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")
               ).show())
This time it runs without problems, and the result is as follows:
But I only want the row with rowNum equal to 1 from each group; the code is as follows:

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[3]", "test data frame on 2.0")
rddData = sc.parallelize(
    (Row(c="class1", s=50),
     Row(c="class2", s=40),
     Row(c="class3", s=70),
     Row(c="class2", s=49),
     Row(c="class3", s=29),
     Row(c="class1", s=78)))
sqlContext = SQLContext(sc)
testDF = rddData.toDF()

result = (testDF.select("c", "s",
                        F.row_number().over(Window.partitionBy("c").orderBy("s")).alias("rowNum")))

# Keep only the first row of each class (rowNum == 1), then print it
finalResult = result.where(result.rowNum <= 1)
finalResult.show()
As you can see, whatever SQL can express can also be done with DataFrame functions; after all, a DataFrame is built on rows and columns. It is just a bit more verbose to write (a SQL-based version of the final query is sketched below for comparison).
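Continuing from the script above, the same final query can be expressed in plain SQL over a temp table. A rough sketch; the temp table name "testDF" is just chosen here, not from the original post:

# Rough SQL equivalent of the final DataFrame query; temp table name is arbitrary
testDF.registerTempTable("testDF")
sqlContext.sql("""
    SELECT c, s, rowNum FROM (
        SELECT c, s,
               row_number() OVER (PARTITION BY c ORDER BY s) AS rowNum
        FROM testDF
    ) t
    WHERE rowNum <= 1
""").show()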
Reference: http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html