If the DataFrame is a simple two-dimensional table, an index column can be added with the help of RDD's zipWithIndex.
scala> val df = spark.createDataFrame(Seq(("ming",20,1234),("hong",19,1235),("zhi",21,1236)))

scala> df.show
+----+---+----+
|  _1| _2|  _3|
+----+---+----+
|ming| 20|1234|
|hong| 19|1235|
| zhi| 21|1236|
+----+---+----+

scala> val rdd = df.rdd.zipWithIndex()
rdd: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = ZippedWithIndexRDD[5] at zipWithIndex at <console>:25

scala> rdd.collect.foreach(println)
([ming,20,1234],0)
([hong,19,1235],1)
([zhi,21,1236],2)
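From here the zipped RDD can be turned back into a DataFrame, because the field types of this simple table are known and a plain tuple has an Encoder. A minimal sketch continuing the session above (the "index" column name is just illustrative):

import spark.implicits._

// Unpack the known fields of each Row and append the zipWithIndex value as a fourth column.
val dfWithIndex = rdd
  .map { case (row, idx) => (row.getString(0), row.getInt(1), row.getInt(2), idx) }
  .toDF("_1", "_2", "_3", "index")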
If the DataFrame comes from JSON-formatted data, adding the index column directly via rdd.zipWithIndex reports the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_1")
- root class: "scala.Tuple2"
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:650)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:644)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
    at com.ipinyou.mip.ma.worker.ABTestProcessor$.getAudienceByNumber(ABTestProcessor.scala:69)
    at com.ipinyou.mip.ma.worker.ABTestProcessor$.execute(ABTestProcessor.scala:47)
    at com.ipinyou.mip.ma.worker.ABTestProcessor$.main(ABTestProcessor.scala:92)
    at com.ipinyou.mip.ma.worker.ABTestProcessor.main(ABTestProcessor.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
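The Spark frames at the bottom of the trace (newProductEncoder → Encoders.product → ExpressionEncoder) show that zipWithIndex itself succeeds; the exception is thrown when the zipped RDD[(Row, Long)] is converted back to a DataFrame, because Spark has no Encoder for org.apache.spark.sql.Row. A hedged sketch of the kind of code that triggers it (the file name and column names are hypothetical):

import spark.implicits._

val df = spark.read.json("people.json")    // DataFrame loaded from JSON
val zipped = df.rdd.zipWithIndex()         // RDD[(Row, Long)] -- this step works
// Throws "No Encoder found for org.apache.spark.sql.Row": toDF needs an Encoder for (Row, Long).
val withIndex = zipped.toDF("row", "index")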
The workaround is to pick a single, simple column from the DataFrame, convert it to an RDD and apply zipWithIndex there, and finally join the resulting DataFrame back to the original DataFrame. The limitation of this operation is that the chosen column must not contain duplicate values.
Example with the audience data:
import spark.implicits._   // needed for toDF on the zipped RDD

val indexColName = "index"
val cdpId = audience.select(cdpIdColName).rdd.map(_.getString(0)).zipWithIndex().toDF(cdpIdColName, indexColName)
val audienceWithIndex = audience.join(cdpId, Seq(cdpIdColName), "left")
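Since audience and cdpIdColName are defined elsewhere, here is a self-contained sketch of the same join-based workaround applied to the toy DataFrame from the first example (assuming column "_1" has no duplicate values):

import spark.implicits._

val df = spark.createDataFrame(Seq(("ming", 20, 1234), ("hong", 19, 1235), ("zhi", 21, 1236)))
val keyWithIndex = df.select("_1").rdd
  .map(_.getString(0))   // a single simple column, so (String, Long) has an Encoder after zipping
  .zipWithIndex()
  .toDF("_1", "index")
val dfWithIndex = df.join(keyWithIndex, Seq("_1"), "left")
dfWithIndex.show()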