Problem description: in Spark, DataFrame union produced unexpected results, with data ending up in the wrong columns.
package com.blue.spark.example

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TestDFUnion {
  def main(args: Array[String]): Unit = {
    // Reduce Spark's log noise so only the program output is shown
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName(getClass.getName)
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = sparkSession.sparkContext
    import sparkSession.implicits._

    // Four rows; the first and the last are identical
    val rdd = sc.parallelize(Array(
      ("1", "up", "down", "msg content1"),
      ("2", "up", "down", "msg content2"),
      ("3", "up", "down", "msg content3"),
      ("1", "up", "down", "msg content1")))
    // println runs on the executors; in local mode it reaches the console, in no guaranteed order
    rdd.foreach(println)

    val df1 = rdd.toDF("col1", "col2", "col3", "col4")
    // Case 1: union a DataFrame with itself
    df1.union(df1).show()

    // Case 2: rename a different column to "flag" on each side
    val df2 = df1.withColumnRenamed("col2", "flag") // col1, flag, col3, col4
    val df3 = df1.withColumnRenamed("col3", "flag") // col1, col2, flag, col4
    df2.union(df3).show()

    // Case 3: rename and drop so both sides end up as (col1, flag, col4)
    val df5 = df1.withColumnRenamed("col2", "flag").drop("col3")
    val df6 = df1.withColumnRenamed("col3", "flag").drop("col2")
    df5.union(df6).show()

    sc.stop()
  }
}
Running it produces the following output:
(3,up,down,msg content3)
(1,up,down,msg content1)
(2,up,down,msg content2)
(1,up,down,msg content1)
+----+----+----+------------+
|col1|col2|col3|        col4|
+----+----+----+------------+
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
+----+----+----+------------+
+----+----+----+------------+
|col1|flag|col3|        col4|
+----+----+----+------------+
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
|   1|  up|down|msg content1|
|   2|  up|down|msg content2|
|   3|  up|down|msg content3|
|   1|  up|down|msg content1|
+----+----+----+------------+
+----+----+------------+
|col1|flag|        col4|
+----+----+------------+
|   1|  up|msg content1|
|   2|  up|msg content2|
|   3|  up|msg content3|
|   1|  up|msg content1|
|   1|down|msg content1|
|   2|down|msg content2|
|   3|down|msg content3|
|   1|down|msg content1|
+----+----+------------+
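A few things stand out:
- DataFrame union does not remove duplicates (see the result of df1.union(df1)). It behaves like SQL UNION ALL rather than UNION.
- When the column names differ, union does not line up columns that share a name. It matches columns by position and takes the result's column names from the first DataFrame. See df2.union(df3): df3's flag column (holding "down") lands under col3, while df3's col2 values ("up") land under flag.
- For union to give a correct result, both sides need exactly the same column layout: the same names in the same positions (see df5.union(df6), where both sides are col1, flag, col4). Deduplication, if needed, has to be handled separately.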
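Below is a minimal sketch of two ways to avoid the misalignment. It assumes Spark 2.3 or later, where `Dataset.unionByName` was added, and a local run via `master("local[*]")`. `unionByName` matches columns by name instead of position. The helper `unionAligned` and the object name `TestDFUnionFixed` are hypothetical, made up for this illustration. The helper reorders the second DataFrame's columns into the first one's order and then does a plain positional `union`. Both approaches still keep duplicates, so `distinct()` is added where SQL UNION semantics are wanted.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestDFUnionFixed {
  // Hypothetical helper: reorder b's columns to match a's order by name,
  // then union by position. Assumes both sides have the same set of column
  // names; b.col throws an AnalysisException if a name is missing from b.
  def unionAligned(a: DataFrame, b: DataFrame): DataFrame =
    a.union(b.select(a.columns.map(name => b.col(name)): _*))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TestDFUnionFixed")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("1", "up", "down", "msg content1"),
      ("2", "up", "down", "msg content2")
    ).toDF("col1", "col2", "col3", "col4")

    // Same column names on both sides, but in a different order
    val left  = df1.select("col1", "col2", "col4")
    val right = df1.select("col1", "col4", "col2")

    left.union(right).show()         // positional: right's col4 values land under col2
    left.unionByName(right).show()   // matched by name (Spark 2.3+)
    unionAligned(left, right).show() // manual reorder, then positional union

    // union and unionByName keep duplicates (UNION ALL); distinct() gives UNION semantics
    left.unionByName(right).distinct().show()

    spark.stop()
  }
}
```

Reordering with `select` also works on Spark versions that predate `unionByName`. On newer versions, `unionByName` is the more direct choice.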