問題描述:DataFrame的join結果不正確,dataframeA(6000無重復條數據) join dataframeB(220條無重復數據,由dataframeA轉化而來,key值均源於dataframeA) 只有200條數據,丟了20條
問題驗證:
1,查詢丟的20條數據,均無異常,不存在Null,數據不存在空格
2,重新運行算法,丟18條數據,證明丟數據存在一定隨機性
3,簡化問題到最簡模式,代碼如下:
val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct() val xxx2= physitefinal.select("physitekey").distinct() val xxx3 = xxx1.join(xxx2, Seq("physitekey")) val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd3=rdd1.join(rdd2) log.info(s"rdd3=${rdd3.count()}") log.info(s"xxx3==${xxx3.count()}")
xxx3和rdd3的結果居然不相等!!違背了spark常識
問題分析:
1,據spark原理可知,DataFrame的底層實現就是RDD,具體實現在Catalyst包類,需要DataFrame=>未解析的邏輯執行計划=>解析邏輯計划=>優化邏輯執行計划=>物理執行計划=>RDD執行
也就是說xxx3的執行計划生成出的RDD執行方案與RDD3結果不一致,因此在這里我打印了xxx3的執行計划,期望有所發現
xxx1.join(xxx2, Seq("physitekey")).explain()
執行計划長達1000多行,涉及內部實現因項目保密需要無法展示。
2,執行計划超長是因為phySiteEvaluationPhySiteKey、physitefinal均為迭代計算結果,不是直接來源於輸入表
3,依據執行計划,我猜測Spark在邏輯計划優化的時候出錯,導致結果不符合預期
4,驗證方案:為xxx1、xxx2的取值加上checkpoint,斬斷血緣依賴,重新查看執行計划是否符合預期
val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct().checkpoint() val xxx2= physitefinal.select("physitekey").distinct().checkpoint() xxx1.join(xxx2, Seq("physitekey")).explain() val xxx3 = xxx1.join(xxx2, Seq("physitekey")) val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r)) val rdd3=rdd1.join(rdd2) log.info(s"rdd3=${rdd3.count()}") log.info(s"xxx3==${xxx3.count()}")
結果執行計划如下:
== Physical Plan == *Project [physitekey#1648] +- *SortMergeJoin [physitekey#1648], [physitekey#43875], Inner :- *Sort [physitekey#1648 ASC NULLS FIRST], false, 0 : +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#1648, 200), coordinator[target post-shuffle partition size: 67108864] : +- *Filter isnotnull(physitekey#1648) : +- Scan ExistingRDD[physitekey#1648] +- *Sort [physitekey#43875 ASC NULLS FIRST], false, 0 +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#43875, 200), coordinator[target post-shuffle partition size: 67108864] +- *Filter isnotnull(physitekey#43875) +- Scan ExistingRDD[physitekey#43875]
沒有問題,RDD3與XXX3結果相等,正確了。
確認問題出在Spark中DataFrame在持有超長血緣關系時轉化為RDD執行出錯,具體錯誤有機會下次分析,應當是僅在一定特殊情況下才會暴露的BUG
5、問題反思
開源組件也是可能存在BUG的,應當在使用時盡量使用其最常見的用法,列如在本問題中,如果迭代計算之后及時斬斷血緣依賴,就不會出現問題