Spark error: Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.collection.immutable.Map;


Background

I wrote a UDF to check whether two columns are equal. The two columns have identical data types, but the structure is fairly complex:

 |-- list: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Date: integer (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)
 |-- list2: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Date: integer (nullable = true)
 |    |    |    |    |-- Name: string (nullable = true)

In other words, Maps are nested inside the Array, and another Array is nested inside each Map, so the comparison has to walk down level by level. The UDF I wrote:

case class AppList(Date: Int, Name: String)  // fields mirror the struct in the schema above

    def isMapEqual(map1: Map[String, Array[AppList]], map2: Map[String, Array[AppList]]): Boolean = {
      try {
        if (map1.size != map2.size) {
          return false
        } else {
          // compare the value arrays key by key
          for (x <- map1.keys) {
            if (map1(x) != map2(x)) {
              return false
            }
          }
          return true
        }
      } catch {
        case e: Exception => false
      }
    }

    def isListEqual(list1: Array[Map[String, Array[AppList]]], list2: Array[Map[String, Array[AppList]]]): Boolean = {
      try {
        if (list1.length != list2.length) {
          return false
        } else if (list1.length == 0 || list2.length == 0) {
          return false
        } else {
          // only the first element is compared; extend to all indices if needed
          return isMapEqual(list1(0), list2(0))
        }
      } catch {
        case e: Exception => false
      }
    }

    // note: the parameters are declared as native Arrays -- this is what will fail at runtime
    val isColumnEqual = udf((list1: Array[Map[String, Array[AppList]]], list2: Array[Map[String, Array[AppList]]]) => {
      isListEqual(list1, list2)
    })
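
One thing worth flagging in isMapEqual: with Array values, map1(x) != map2(x) compares references, because Scala Arrays are plain Java arrays and == on them is reference equality, whereas Seq compares element by element. A quick REPL check:

    Array(1, 2) == Array(1, 2)   // false: arrays compare by reference
    Seq(1, 2)   == Seq(1, 2)     // true: Seq equality is structural

So quite apart from the crash below, the Array version of this comparison would rarely return true; with Seq (or with Array's sameElements) it behaves as intended.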

I then pasted this into spark-shell and ran:

val dat = df.withColumn("equal", isColumnEqual($"list1", $"list2"))
dat.show()

At this point the following error appeared:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<map<string,array<struct<Date:int,Name:string>>>>, array<map<string,array<struct<Date:int,Name:string>>>>) => boolean)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.collection.immutable.Map;
  at $anonfun$1.apply(<console>:42)
  ... 16 more

Solution

The so-called solution was, naturally, to Google it…

I found a post saying that simply changing Array to Seq in the UDF signature would fix it. Facepalm. I tried it, and sure enough it worked.
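
Concretely, that means swapping Array for Seq everywhere in the signatures (WrappedArray implements Seq, so the cast succeeds). A sketch of the working version, keeping the same logic but lightly tidied:

    import org.apache.spark.sql.functions.udf

    // AppList as defined above: case class AppList(Date: Int, Name: String)

    def isMapEqual(map1: Map[String, Seq[AppList]], map2: Map[String, Seq[AppList]]): Boolean = {
      try {
        // Seq equality is structural, so == compares contents here;
        // a missing key throws and falls into the catch below
        map1.size == map2.size && map1.keys.forall(x => map1(x) == map2(x))
      } catch {
        case e: Exception => false
      }
    }

    def isListEqual(list1: Seq[Map[String, Seq[AppList]]], list2: Seq[Map[String, Seq[AppList]]]): Boolean = {
      try {
        if (list1.length != list2.length || list1.isEmpty) {
          false
        } else {
          isMapEqual(list1(0), list2(0))  // still only compares the first element, as in the original
        }
      } catch {
        case e: Exception => false
      }
    }

    val isColumnEqual = udf((list1: Seq[Map[String, Seq[AppList]]], list2: Seq[Map[String, Seq[AppList]]]) => {
      isListEqual(list1, list2)
    })

With these signatures, the df.withColumn call above runs without the cast error. One caveat: as far as I can tell, Spark passes struct elements into a UDF as Row objects, and it is only type erasure that makes declaring Seq[AppList] harmless here, since the code never touches the fields; a UDF that reads .Date or .Name would need to declare org.apache.spark.sql.Row and read fields via getAs.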

Reason

The explanation given there:

So it looks like the ArrayType on Dataframe "idDF" is really a WrappedArray and not an Array - So the function call to "filterMapKeysWithSet" failed as it expected an Array but got a WrappedArray/ Seq instead (which doesn't implicitly convert to Array in Scala 2.8 and above).

That is, the ArrayType column Spark hands to the UDF is not a native Scala Array but Spark's wrapped array, a WrappedArray (which is a Seq), and since Scala 2.8 a Seq is no longer implicitly convertible to Array, so the declared Array parameter fails with the ClassCastException above. (If anything here is wrong, please do point it out; I've barely written any Scala, panic.)
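
This is easy to see in spark-shell (Spark 2.x here, with spark.implicits._ already imported). A tiny check of what class Spark actually hands back for an ArrayType column:

    val demo = Seq(Tuple1(Array(Map("k" -> "v")))).toDF("xs")
    demo.head.get(0).getClass.getName
    // => scala.collection.mutable.WrappedArray$ofRef -- a Seq, not a native Array

Declaring the UDF parameters as Seq therefore matches what Spark actually provides, while Array forces the failing cast.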
