Over the last couple of days a colleague and I have been working out how to reduce the number of records in a table. Each record contains: objectid (serving cell), gridid (grid the record belongs to), height, rsrp (serving-cell RSRP), n_objectid (neighbour cell) and n_rsrp (neighbour-cell RSRP).
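For concreteness, here is a rough sketch of the source table's DDL as I understand it; the column types and storage format are assumptions (the spark-shell output below only confirms that gridid and height are strings, and that the table is partitioned by p_city and p_day):

// Hypothetical DDL for the source table; types and STORED AS clause are assumptions.
spark.sql("""CREATE TABLE IF NOT EXISTS my_hive_db.fpd_tabke (
            |  objectid   string,  -- serving cell
            |  gridid     string,  -- grid the sample falls into
            |  height     string,
            |  rsrp       string,  -- serving-cell RSRP
            |  n_objectid string,  -- neighbour cell
            |  n_rsrp     string   -- neighbour-cell RSRP
            |) PARTITIONED BY (p_city int, p_day int)
            |STORED AS ORC""".stripMargin)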
Each serving cell in these records maps to multiple neighbour-cell entries, so when grouping and merging the records:
1) first group by objectid, gridid and height, and collect all neighbour-cell information into an array;
2) then, on top of the result of 1), group by objectid and collect gridid, height, rsrp, array(n_objectid) and array(n_rsrp) into arrays (a distilled sketch of these two steps follows right after this list).
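A minimal sketch of the two-step aggregation, assuming a DataFrame df loaded from the table with the columns listed above (the interactive sessions below use rsrp_dis = rsrp - n_rsrp instead of the raw n_rsrp, but the shape is the same):

import org.apache.spark.sql.functions.collect_list

// Step 1: one row per (objectid, gridid, height); rsrp is kept in the grouping
// key so it survives the aggregation. The neighbour columns become arrays.
val step1 = df.groupBy("objectid", "gridid", "height", "rsrp")
  .agg(
    collect_list("n_objectid").alias("n_objectid"),
    collect_list("n_rsrp").alias("n_rsrp"))

// Step 2: one row per objectid; the per-grid columns become arrays and the
// neighbour columns become array<array<...>>.
val step2 = step1.groupBy("objectid")
  .agg(
    collect_list("gridid").alias("gridid"),
    collect_list("height").alias("height"),
    collect_list("rsrp").alias("rsrp"),
    collect_list("n_objectid").alias("n_objectid"),
    collect_list("n_rsrp").alias("n_rsrp"))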
Approach 1: store the merged data as array<array<string>> columns, i.e. keep each field in its own parallel array of plain values rather than tuples. Reading the nested columns back works, as the session below shows.
[my@sdd983 tommyduan_service]$ /app/my/fi_client/spark2/Spark2x/spark/bin/spark-shell
2018-03-24 14:10:38,583 | WARN | main | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)
2018-03-24 14:10:38,827 | WARN | main | In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
2018-03-24 14:10:38,837 | WARN | main | Detected deprecated memory fraction settings: [spark.shuffle.memoryFraction, spark.storage.memoryFraction, spark.storage.unrollFraction]. As of Spark 1.6, execution and storage memory management are unified. All memory fractions used in the old model are now deprecated and no longer read. If you wish to use the old memory management, you may explicitly enable `spark.memory.useLegacyMode` (not recommended). | org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
Spark context Web UI available at http://192.168.143.332:23799
Spark context available as 'sc' (master = local[*], app id = local-1521871839949).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import spark.sql
import spark.sql

scala> import spark.implicits._
import spark.implicits._

scala> sql("use my_hive_db")
2018-03-24 14:10:56,686 | WARN | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
2018-03-24 14:10:58,250 | WARN | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101)
res0: org.apache.spark.sql.DataFrame = []

scala> var fpb_df = sql(
     |   s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis
     |       |from fpd_tabke
     |       |where p_city=571 and p_day=20180322 limit 50
     |       |""".stripMargin)
fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields]

scala> var fpb_groupby_obj_grid_height_df1 = fpb_df.groupBy("objectid", "gridid", "height", "rsrp").agg(
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_grid_height_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: string ... 4 more fields]

scala> var fpb_groupby_obj_df1 = fpb_groupby_obj_grid_height_df1.groupBy("objectid").agg(
     |   collect_list("gridid").alias("gridid1"),
     |   collect_list("height").alias("height1"),
     |   collect_list("rsrp").alias("rsrp1"),
     |   collect_list("n_objectid").alias("n_objectid1"),
     |   collect_list("rsrp_dis").alias("rsrp_dis1")
     | ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("n_objectid1").alias("n_objectid"), col("rsrp_dis1").alias("rsrp_dis"))
fpb_groupby_obj_df1: org.apache.spark.sql.DataFrame = [objectid: string, gridid: array<string> ... 4 more fields]

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).show
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       _1|                  _2|                  _3|                  _4|                  _5|                  _6|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray(101...|[WrappedArray(0.0...|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+

scala> fpb_groupby_obj_df1.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[String]](4), s.getSeq[Seq[Double]](5))).schema
res4: org.apache.spark.sql.types.StructType = StructType(
  StructField(_1,StringType,true),
  StructField(_2,ArrayType(StringType,true),true),
  StructField(_3,ArrayType(StringType,true),true),
  StructField(_4,ArrayType(StringType,true),true),
  StructField(_5,ArrayType(ArrayType(StringType,true),true),true),
  StructField(_6,ArrayType(ArrayType(DoubleType,false),true),true)
)
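Approach 1 works because Seq[String], Seq[Seq[String]] and Seq[Seq[Double]] are all handled by the built-in product encoders, so no custom encoder is needed when mapping the rows. Below is a sketch of persisting the merged result back to Hive; the target table name and storage format are illustrative, not taken from the original session:

// The nested array columns survive the write: the table schema keeps
// array<string> for gridid/height/rsrp and array<array<string>> /
// array<array<double>> for the neighbour columns.
fpb_groupby_obj_df1.write
  .mode("overwrite")
  .format("orc")                          // illustrative choice of format
  .saveAsTable("my_hive_db.fpb_merged")   // hypothetical target table

// Reading it back preserves the nested types, so the getSeq-based access
// shown above keeps working.
spark.table("my_hive_db.fpb_merged").printSchema()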
Approach 2: store the neighbour information as array<array<(string,double)>>, i.e. arrays of (n_objectid, rsrp_dis) tuples. Reading the nested column back fails, as the session below shows.
scala> sql("use my_hive_db") 2018-03-24 14:10:56,686 | WARN | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101) 2018-03-24 14:10:58,250 | WARN | main | load mapred-default.xml, HIVE_CONF_DIR env not found! | org.apache.hadoop.hive.ql.session.SessionState.loadMapredDefaultXml(SessionState.java:1101) res0: org.apache.spark.sql.DataFrame = [] scala> var fpb_df = sql( | s"""|select gridid,height,objectid,n_objectid,rsrp,(rsrp-n_rsrp) as rsrp_dis | |from fpd_tabke | |where p_city=571 and p_day=20180322 limit 50 | |""".stripMargin) fpb_df: org.apache.spark.sql.DataFrame = [gridid: string, height: string ... 4 more fields] scala> var fpb_groupby_obj_grid_height_df2 = fpb_df.map(s => (s.getAs[String]("objectid"), s.getAs[String]("gridid"), s.getAs[String]("height"), s.getAs[String]("rsrp"), (s.getAs[String]("n_objectid"), s.getAs[Double]("rsrp_dis"))) ).toDF("objectid", "gridid", "height", "rsrp", "neighbour").groupBy("objectid", "gridid", "height", "rsrp").agg( collect_list("neighbour").alias("neighbour1") ).select(col("objectid"), col("gridid"), col("height"), col("rsrp"), col("neighbour1").alias("neighbour")) scala> var fpb_groupby_obj_df2 = fpb_groupby_obj_grid_height_df2.groupBy("objectid").agg( collect_list("gridid").alias("gridid1"), collect_list("height").alias("height1"), collect_list("rsrp").alias("rsrp1"), collect_list("neighbour").alias("neighbour1") ).select(col("objectid"), col("gridid1").alias("gridid"), col("height1").alias("height"), col("rsrp1").alias("rsrp"), col("neighbour1").alias("neighbour")) scala> val encoder = Encoders.tuple( | Encoders.STRING, | Encoders.javaSerialization[Seq[String]], | Encoders.javaSerialization[Seq[String]], | Encoders.javaSerialization[Seq[String]], | Encoders.javaSerialization[Seq[Seq[(String, Double)]]] | ) encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary] scala> fpb_groupby_obj_df2.show +---------+--------------------+--------------------+--------------------+--------------------+ | objectid| gridid| height| rsrp| neighbour| +---------+--------------------+--------------------+--------------------+--------------------+ |100700931|[2676906_708106, ...|[0, 5, 0, 0, 0, 0...|[-130.399994, -12...|[WrappedArray([10...| +---------+--------------------+--------------------+--------------------+--------------------+ scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show +---------+--------------------+--------------------+--------------------+--------------------+ | value| _2| _3| _4| _5| +---------+--------------------+--------------------+--------------------+--------------------+ |100700931|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...|[AC ED 00 05 73 7...| +---------+--------------------+--------------------+--------------------+--------------------+ scala> fpb_groupby_obj_df2.map(s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4))).show() [Stage 6:======================================================>(963 + 1) / 964]2018-03-24 15:09:09,267 | ERROR | Executor task launch worker for task 3859 | Exception in task 0.0 in stage 7.0 (TID 3859) | 
org.apache.spark.internal.Logging$class.logError(Logging.scala:91) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:381) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) scala> val encoder = Encoders.tuple( | Encoders.STRING, | Encoders.kryo[Seq[String]], | Encoders.kryo[Seq[String]], | Encoders.kryo[Seq[String]], | Encoders.kryo[Seq[Seq[(String, Double)]]] | ) encoder: org.apache.spark.sql.Encoder[(String, Seq[String], Seq[String], Seq[String], Seq[Seq[(String, Double)]])] = class[_1[0]: string, _2[0]: binary, _3[0]: binary, _4[0]: binary, _5[0]: binary] scala> fpb_groupby_obj_df2.map { s => (s.getAs[String]("objectid"), s.getSeq[String](1), s.getSeq[String](2), s.getSeq[String](3), s.getSeq[Seq[(String, Double)]](4)) }(encoder).show +---------+--------------------+--------------------+--------------------+--------------------+ | value| _2| _3| _4| _5| +---------+--------------------+--------------------+--------------------+--------------------+ |100700931|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...|[01 00 73 63 61 6...| +---------+--------------------+--------------------+--------------------+--------------------+
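The ClassCastException pins down the problem: once a (String, Double) tuple goes through a struct column and collect_list, its elements come back inside the Row as org.apache.spark.sql.Row values (GenericRowWithSchema), not as scala.Tuple2, so getSeq[Seq[(String, Double)]] fails as soon as the elements are actually used. The javaSerialization/kryo encoders above only sidestep this by turning each column into an opaque binary blob, which is why show() prints hex bytes. A sketch of one possible workaround, still in the same session, is to rebuild the tuples from the inner Rows and let the implicit product encoder handle the rest (the field positions assume the struct was built as (n_objectid, rsrp_dis)):

import org.apache.spark.sql.Row

// spark.implicits._ is already imported in this session, so the tuple below
// gets a regular product encoder instead of a binary java/kryo one.
val decoded = fpb_groupby_obj_df2.map { s =>
  // Inner elements arrive as Rows over struct<_1:string,_2:double>; turn them
  // back into Scala tuples before handing them to the encoder.
  val neighbours: Seq[Seq[(String, Double)]] =
    s.getSeq[Seq[Row]](4).map(_.map(r => (r.getString(0), r.getDouble(1))))
  (s.getAs[String]("objectid"),
   s.getSeq[String](1),   // gridid
   s.getSeq[String](2),   // height
   s.getSeq[String](3),   // rsrp
   neighbours)
}
decoded.printSchema()
decoded.show()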