Problem description: in Spark, union produces unexpected results, with data ending up in the wrong columns.
package com.blue.spark.example

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TestDFUnion {
  def main(args: Array[String]): Unit = {
    // Reduce Spark's log noise so only the test output is visible
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName(getClass.getName)
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    import sparkSession.implicits._

    // Four rows; the first and the last are identical
    val rdd = sc.parallelize(Array(
      ("1", "up", "down", "msg content1"),
      ("2", "up", "down", "msg content2"),
      ("3", "up", "down", "msg content3"),
      ("1", "up", "down", "msg content1")))
    rdd.foreach(println)
    val df1 = rdd.toDF("col1", "col2", "col3", "col4")

    // Case 1: union a DataFrame with itself
    df1.union(df1).show()

    // Case 2: "flag" is the 2nd column in df2 but the 3rd column in df3
    val df2 = df1.withColumnRenamed("col2", "flag")
    val df3 = df1.withColumnRenamed("col3", "flag")
    df2.union(df3).show()

    // Case 3: both sides end up with col1, flag, col4 in the same order
    val df5 = df1.withColumnRenamed("col2", "flag").drop("col3")
    val df6 = df1.withColumnRenamed("col3", "flag").drop("col2")
    df5.union(df6).show()

    sc.stop()
  }
}
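The output is shown below. The first four lines come from rdd.foreach(println); their order depends on how the partitions are scheduled, so it may differ between runs.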
(3,up,down,msg content3)
(1,up,down,msg content1)
(2,up,down,msg content2)
(1,up,down,msg content1)
+----+----+----+------------+
|col1|col2|col3|        col4|
+----+----+----+------------+
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
+----+----+----+------------+
+----+----+----+------------+
|col1|flag|col3|        col4|
+----+----+----+------------+
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
+----+----+----+------------+
+----+----+------------+
|col1|flag|        col4|
+----+----+------------+
|   1|  up|msg content1|
|   2|  up|msg content2|
|   3|  up|msg content3|
|   1|  up|msg content1|
|   1|down|msg content1|
|   2|down|msg content2|
|   3|down|msg content3|
|   1|down|msg content1|
+----+----+------------+
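From these results we can observe the following:
- DataFrame union does not remove duplicates (see the result of df1.union(df1)); it behaves like SQL UNION ALL rather than UNION.
- When the column names differ, union does not line up the columns that share a name (see the result of df2.union(df3)). It matches columns by position and takes the column names from the first DataFrame, so under flag the rows coming from df3 hold col2's value ("up") rather than df3's own flag value ("down").
- To get a correct union, both sides must have the same columns in the same order (see the result of df5.union(df6), where each side ends up as col1, flag, col4). Matching names alone are not enough, because union only looks at positions. If duplicates must be removed, that still has to be handled separately.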
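The positional matching is easier to see when both sides have exactly the same column names, just in a different order. Below is a minimal sketch, assuming Spark 2.3 or later (where Dataset.unionByName was added) and a local master so it runs without spark-submit. The object name UnionColumnAlignment and the helper sampleDf are hypothetical names chosen for illustration; the data mirrors the test above. It shows the misalignment, two ways to avoid it (reordering with select, or unionByName), and distinct() for deduplication.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical demo object; not part of the original test code.
object UnionColumnAlignment {
  // Same four rows as the test above (hypothetical helper).
  def sampleDf(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("1", "up", "down", "msg content1"),
      ("2", "up", "down", "msg content2"),
      ("3", "up", "down", "msg content3"),
      ("1", "up", "down", "msg content1")
    ).toDF("col1", "col2", "col3", "col4")
  }

  def main(args: Array[String]): Unit = {
    // master("local[*]") is an assumption so the sketch runs outside spark-submit
    val spark = SparkSession.builder()
      .appName("UnionColumnAlignment")
      .master("local[*]")
      .getOrCreate()

    val df1 = sampleDf(spark)
    // Same column names as df1, but col2 and col3 swap positions
    val reordered = df1.select("col1", "col3", "col2", "col4")

    // union matches by position: the last four rows show
    // "down" under col2 and "up" under col3
    df1.union(reordered).show()

    // Fix 1: put the right side into df1's column order before union
    df1.union(reordered.select(df1.columns.map(col): _*)).show()

    // Fix 2 (Spark 2.3+): unionByName resolves columns by name
    df1.unionByName(reordered).show()

    // union keeps duplicates like SQL UNION ALL; distinct() removes them
    df1.union(df1).distinct().show()

    spark.stop()
  }
}
```

Reordering with select works on any Spark 2.x version and makes the intended order explicit. unionByName is shorter, but by default it fails with an AnalysisException when the two sides do not have the same set of column names, so cases like df2.union(df3) still need the columns renamed or dropped first.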