While using the window functions in org.apache.spark.sql.functions, I ran into several tricky problems. After a lot of searching and repeated experiments, I finally found the solutions.
First, a complete example:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowQueryTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("WndFnc_demo").setMaster("local"))
    val hiveContext = new HiveContext(sc)

    // Sample data: (key, val) pairs, plus an index that records the original order
    val data = Seq(("A", 4), ("C", 1), ("D", 1), ("B", 2), ("B", 2), ("D", 4), ("A", 1), ("B", 4))
    val withRowNumbers: Seq[(String, Int, Int)] = data.zipWithIndex.map(e => (e._1._1, e._1._2, e._2))
    val rdd: RDD[Row] = sc.parallelize(withRowNumbers).map(triplet => Row(triplet._1, triplet._2, triplet._3))

    // Recreate the Hive table and load the sample data into it
    hiveContext.sql("DROP TABLE IF EXISTS delme")
    hiveContext.sql("""CREATE TABLE `delme`( `key` string, `val` int, `ord` int)""")
    val schema = StructType(Seq(StructField("key", StringType), StructField("val", IntegerType), StructField("ord", IntegerType)))
    hiveContext.createDataFrame(rdd, schema).write.mode(SaveMode.Append).saveAsTable("delme")

    // Window functions in SQL: MAX/MIN over a partition, row_number and rank over an ordering
    val qRes = hiveContext.sql(
      """SELECT key, val
        ,MAX(val) OVER(PARTITION BY key) mx
        ,MIN(val) OVER(PARTITION BY key) mn
        ,row_number() OVER(ORDER BY ord desc) revord
        ,rank() OVER(ORDER BY val) rnk
        FROM delme""")
    qRes.collect().foreach(println)
  }
}
1. Initialization must use a HiveContext
If you initialize an SQLContext instance instead:
val sqlContext = new SQLContext(sc)
the query fails with an error saying a HiveContext is required:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
……
HiveContext inherits from SQLContext:
class HiveContext(sc : org.apache.spark.SparkContext) extends org.apache.spark.sql.SQLContext with org.apache.spark.Logging
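So the fix is simply to construct the context as a HiveContext, exactly as the full example above does; a minimal sketch (assuming the spark-hive module is on the classpath):

import org.apache.spark.sql.hive.HiveContext

// A HiveContext is a drop-in replacement for SQLContext and is what
// resolves window functions such as row_number().
val hiveContext = new HiveContext(sc)   // instead of: new SQLContext(sc)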
2. Add the three jars from spark/lib as external library dependencies
The project's External Libraries must include the following three jars: datanucleus-api-jdo, datanucleus-core and datanucleus-rdbms.
Once they are in place, building and running the project automatically generates a metastore_db folder and a derby.log file.
Otherwise, errors like the following appear:
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……
These three jars ship with Spark and live in spark/lib.
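If the project is built with sbt rather than wired up by hand in the IDE, a setting along the following lines can pull those jars onto the compile classpath. This is only a sketch: the SPARK_HOME fallback path is an assumption, so point it at your own installation.

// build.sbt -- sketch: pick up the datanucleus jars shipped in $SPARK_HOME/lib
unmanagedJars in Compile ++= {
  val sparkLib = file(sys.env.getOrElse("SPARK_HOME", "/opt/spark")) / "lib"
  (sparkLib * "datanucleus-*.jar").classpath   // datanucleus-api-jdo, -core, -rdbms
}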
3. Configure the JVM options (JAVA_OPTS) in the run configuration (FATAL!)
At this point everything looks OK. Yet compiling and running the program ends in an abnormal exit, and the only report, at the very end, is that the main process failed; there is no ERROR anywhere, which makes the real cause hard to spot.
……
Exception in thread "main"
Process finished with exit code 1
Run it a few more times and the following exception eventually shows up; the key part is PermGen space (the permanent generation).
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
    at org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
    at WindowQueryTest$.main(WindowQueryTest.scala:14)
    at WindowQueryTest.main(WindowQueryTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.OutOfMemoryError: PermGen space
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    ... 18 more
Process finished with exit code 1
Compile and run again and you may get an even longer stack trace; the errors vary, but the root cause stays the same: the size of PermGen space.
Solution: add the following JVM options to the Run Configuration: -server -Xms512M -Xmx1024M -XX:PermSize=256M -XX:MaxNewSize=512M -XX:MaxPermSize=512M
The individual values can be tuned to the machine's configuration.
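If the program is launched through sbt instead of the IDE's Run Configuration, the same options can be handed to a forked JVM; a sketch using the values above:

// build.sbt -- sketch: fork the run and pass it the same JVM options
fork in run := true
javaOptions in run ++= Seq(
  "-Xms512M", "-Xmx1024M",
  "-XX:PermSize=256M", "-XX:MaxNewSize=512M", "-XX:MaxPermSize=512M"
)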
4. Specifying the window with WindowSpec
Now look at another example:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowFunctions {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Window Functions").setMaster("local")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    val l = (1997, 1) :: (1997, 4) :: (1998, 2) :: (1998, 3) :: (1999, 9) :: Nil
    val df = sc.parallelize(l).toDF("k", "v")

    // Window ordered by k; rowNumber() numbers the rows in that order
    val w = Window.orderBy($"k")
    val df1 = df.withColumn("No", rowNumber().over(w))

    // Row frame: the current row and the two preceding rows
    val rowW = w.rowsBetween(-2, 0)
    // Range frame: rows whose k lies within [current k - 1, current k]
    val rangeW = w.rangeBetween(-1, 0)
    df1.withColumn("row", avg($"v").over(rowW)).withColumn("range", avg($"v").over(rangeW)).show

    sc.stop()
  }
}
Running it prints the numbered rows together with the row-frame and range-frame averages.
org.apache.spark.sql.expressions.Window is the entry point for defining a WindowSpec; it specifies the partitioning and/or the ordering:
@org.apache.spark.annotation.Experimental
object Window extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
}
A WindowSpec, once defined, can call rowsBetween or rangeBetween to set the offsets that bound the window frame; it can even re-specify the partitioning and ordering:
@org.apache.spark.annotation.Experimental
class WindowSpec private[sql] (
    partitionSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.Expression],
    orderSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.SortOrder],
    frame : org.apache.spark.sql.catalyst.expressions.WindowFrame) extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rowsBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rangeBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  private[sql] def withAggregate(aggregate : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = { /* compiled code */ }
}
Finally, a concrete window function is evaluated over the spec to produce the desired column.
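To tie the two parts together, here is a sketch of the query from section 1 rewritten with the DataFrame API instead of SQL. It reuses the hiveContext and the delme table created earlier; the names byKey, byOrd, byVal and result are only illustrative.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import hiveContext.implicits._

// One WindowSpec per OVER(...) clause of the SQL in section 1
val byKey = Window.partitionBy($"key")
val byOrd = Window.orderBy($"ord".desc)
val byVal = Window.orderBy($"val")

val result = hiveContext.table("delme")
  .withColumn("mx", max($"val").over(byKey))       // MAX(val) OVER(PARTITION BY key)
  .withColumn("mn", min($"val").over(byKey))       // MIN(val) OVER(PARTITION BY key)
  .withColumn("revord", rowNumber().over(byOrd))   // row_number() OVER(ORDER BY ord desc)
  .withColumn("rnk", rank().over(byVal))           // rank() OVER(ORDER BY val)
result.show()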
References:
[1] https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
[2] http://www.cnblogs.com/mingforyou/archive/2012/03/03/2378143.html
[3] http://sonra.io/window-functions-aka-analytic-functions-in-spark/
END