repartition導致的廣播失敗,關於錯誤Failed to get broadcast_544_piece0 of broadcast_544


今天一個生產環境任務出現了性能問題,,經過仔細檢查發現是在一個join操作時,原設定廣播右表數據廣播失敗,導致后續步驟進行緩慢,,報錯信息

java.io.IOException: org.apache.spark.SparkException:Failed to get broadcast_544_piece0 of broadcast_544

 源代碼大概是這個樣子(變量全部用xx、yy代替了,不影響整個結構)

    val Site = draftedSite.join(broadcast(toSite), Seq("joinCon"))
      .withColumn("xxx", distanceUDF($"yy", $"yy", $"yy", $"yy"))
      .withColumn("xxx", defineSiteDistanceUDF($"yy", $"yy", $"yy", $"yy"))
      .filter("xx> 0 and xx< yy")
      .withColumn("deleteSite", expr(
        """
          |case
          |when xx!= xx then if (xx< xx, xx, xx)
          |when xx!= xx then if(xx< xx, xx, xx)
          |else if(xx> xx, xx, xx)
          |end
        """.stripMargin)).repartition(xx).cache()

 

一開始查詢網上,大致都是一種說法,類似https://issues.apache.org/jira/browse/SPARK-5594中的sparkContect中的殘留信息數據導致不成功,這明顯不是我這個問題,我每次都是新起動一個sparkContect的。

后來公司的大神看了這段代碼之后,指出 可能是repartition導致的廣播失敗,去掉repartition(xx),之后任務成功執行。

    在key值不夠的情況下,強制repartition可能會導致生成一部分空分區,空分區導致了廣播的失敗。

    另外在數據量不定的情況下不建議使用強制廣播,建議將tosite注冊為臨時表之后cache,有spark根據數據量自動判斷是否廣播

最終修改之后結果如下:

    toSite.createOrReplaceTempView("temp")
    spark.catalog.cacheTable("temp")
    val temp= spark.sql("select * from temp")

val Site = draftedSite.join(toSite, Seq("joinCon"))
      .withColumn("xxx", distanceUDF($"yy", $"yy", $"yy", $"yy"))
      .withColumn("xxx", defineSiteDistanceUDF($"yy", $"yy", $"yy", $"yy"))
      .filter("xx> 0 and xx< yy")
      .withColumn("deleteSite", expr(
        """
          |case
          |when xx!= xx then if (xx< xx, xx, xx)
          |when xx!= xx then if(xx< xx, xx, xx)
          |else if(xx> xx, xx, xx)
          |end
        """.stripMargin)).cache()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM