Spark解決SQL和RDDjoin結果不一致問題(工作實錄)


問題描述:DataFrame的join結果不正確,dataframeA(6000無重復條數據) join dataframeB(220條無重復數據,由dataframeA轉化而來,key值均源於dataframeA) 只有200條數據,丟了20條

問題驗證:

1,查詢丟的20條數據,均無異常,不存在Null,數據不存在空格

2,重新運行算法,丟18條數據,證明丟數據存在一定隨機性

3,簡化問題到最簡模式,代碼如下:

    val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct()
    val xxx2= physitefinal.select("physitekey").distinct()
    val xxx3 = xxx1.join(xxx2, Seq("physitekey"))

    val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r))
    val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r))
    val rdd3=rdd1.join(rdd2)

    log.info(s"rdd3=${rdd3.count()}")
    log.info(s"xxx3==${xxx3.count()}")

xxx3和rdd3的結果居然不相等!!違背了spark常識

問題分析:

1,據spark原理可知,DataFrame的底層實現就是RDD,具體實現在Catalyst包類,需要DataFrame=>未解析的邏輯執行計划=>解析邏輯計划=>優化邏輯執行計划=>物理執行計划=>RDD執行

也就是說xxx3的執行計划生成出的RDD執行方案與RDD3結果不一致,因此在這里我打印了xxx3的執行計划,期望有所發現

    xxx1.join(xxx2, Seq("physitekey")).explain()

執行計划長達1000多行,涉及內部實現因項目保密需要無法展示。

2,執行計划超長是因為phySiteEvaluationPhySiteKey、physitefinal均為迭代計算結果,不是直接來源於輸入表

3,依據執行計划,我猜測Spark在邏輯計划優化的時候出錯,導致結果不符合預期

4,驗證方案:為xxx1、xxx2的取值加上checkpoint,斬斷血緣依賴,重新查看執行計划是否符合預期

    val xxx1= phySiteEvaluationPhySiteKey.select("physitekey").distinct().checkpoint()
    val xxx2= physitefinal.select("physitekey").distinct().checkpoint()
    xxx1.join(xxx2, Seq("physitekey")).explain()
    val xxx3 = xxx1.join(xxx2, Seq("physitekey"))

    val rdd1=xxx1.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r))
    val rdd2 =xxx2.rdd.map(r=>r.getAs[String]("physitekey")).map(r=>(r,r))
    val rdd3=rdd1.join(rdd2)

    log.info(s"rdd3=${rdd3.count()}")
    log.info(s"xxx3==${xxx3.count()}")

結果執行計划如下:

== Physical Plan ==
*Project [physitekey#1648]
+- *SortMergeJoin [physitekey#1648], [physitekey#43875], Inner
   :- *Sort [physitekey#1648 ASC NULLS FIRST], false, 0
   :  +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#1648, 200), coordinator[target post-shuffle partition size: 67108864]
   :     +- *Filter isnotnull(physitekey#1648)
   :        +- Scan ExistingRDD[physitekey#1648]
   +- *Sort [physitekey#43875 ASC NULLS FIRST], false, 0
      +- Exchange(coordinator id: 1135069612) hashpartitioning(physitekey#43875, 200), coordinator[target post-shuffle partition size: 67108864]
         +- *Filter isnotnull(physitekey#43875)
            +- Scan ExistingRDD[physitekey#43875]

沒有問題,RDD3與XXX3結果相等,正確了。

確認問題出在Spark中DataFrame在持有超長血緣關系時轉化為RDD執行出錯,具體錯誤有機會下次分析,應當是僅在一定特殊情況下才會暴露的BUG

5、問題反思

開源組件也是可能存在BUG的,應當在使用時盡量使用其最常見的用法,列如在本問題中,如果迭代計算之后及時斬斷血緣依賴,就不會出現問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM