今天一個生產環境任務出現了性能問題,,經過仔細檢查發現是在一個join操作時,原設定廣播右表數據廣播失敗,導致后續步驟進行緩慢,,報錯信息
java.io.IOException: org.apache.spark.SparkException:Failed to get broadcast_544_piece0 of broadcast_544
源代碼大概是這個樣子(變量全部用xx、yy代替了,不影響整個結構)
val Site = draftedSite.join(broadcast(toSite), Seq("joinCon")) .withColumn("xxx", distanceUDF($"yy", $"yy", $"yy", $"yy")) .withColumn("xxx", defineSiteDistanceUDF($"yy", $"yy", $"yy", $"yy")) .filter("xx> 0 and xx< yy") .withColumn("deleteSite", expr( """ |case |when xx!= xx then if (xx< xx, xx, xx) |when xx!= xx then if(xx< xx, xx, xx) |else if(xx> xx, xx, xx) |end """.stripMargin)).repartition(xx).cache()
一開始查詢網上,大致都是一種說法,類似https://issues.apache.org/jira/browse/SPARK-5594中的sparkContect中的殘留信息數據導致不成功,這明顯不是我這個問題,我每次都是新起動一個sparkContect的。
后來公司的大神看了這段代碼之后,指出 可能是repartition導致的廣播失敗,去掉repartition(xx),之后任務成功執行。
在key值不夠的情況下,強制repartition可能會導致生成一部分空分區,空分區導致了廣播的失敗。
另外在數據量不定的情況下不建議使用強制廣播,建議將tosite注冊為臨時表之后cache,有spark根據數據量自動判斷是否廣播
最終修改之后結果如下:
toSite.createOrReplaceTempView("temp") spark.catalog.cacheTable("temp") val temp= spark.sql("select * from temp") val Site = draftedSite.join(toSite, Seq("joinCon")) .withColumn("xxx", distanceUDF($"yy", $"yy", $"yy", $"yy")) .withColumn("xxx", defineSiteDistanceUDF($"yy", $"yy", $"yy", $"yy")) .filter("xx> 0 and xx< yy") .withColumn("deleteSite", expr( """ |case |when xx!= xx then if (xx< xx, xx, xx) |when xx!= xx then if(xx< xx, xx, xx) |else if(xx> xx, xx, xx) |end """.stripMargin)).cache()