[Original] Uncle's Troubleshooting Notes (11): Why Adding LIMIT to a Subquery on a Large Table Causes a Broadcast Timeout Error in Spark


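When two tables need to be joined and one is large while the other is small, the normal map-reduce flow requires a shuffle, which moves the large table's data across the network between nodes. A common optimization is to load the small table into memory and broadcast it to wherever the large table is being processed, which avoids the shuffle and reduce phases.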

In Hive this is called mapjoin (map-side join), and it is controlled by hive.auto.convert.join.

In Spark it is called BroadcastHashJoin (broadcast hash join).

Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side data is below spark.sql.autoBroadcastJoinThreshold.

Broadcast join can be very efficient for joins between a large table (fact) with relatively small tables (dimensions) that could then be used to perform a star-schema join. It can avoid sending all data of the large table over the network.

There are several ways to trigger it:

1) SQL hint (supported since Spark 2.2)

SELECT /*+ MAPJOIN(b) */ ...

SELECT /*+ BROADCASTJOIN(b) */ ...

SELECT /*+ BROADCAST(b) */ ...

2) The broadcast function: org.apache.spark.sql.functions.broadcast

val testTable3 = testTable1.join(broadcast(testTable2), Seq("id"), "right_outer")

3) Automatic optimization

org.apache.spark.sql.execution.SparkStrategies.JoinSelection

    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.statistics.isBroadcastable ||
        (plan.statistics.sizeInBytes >= 0 &&
          plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
    }
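
The check above boils down to a size comparison. Below is a minimal standalone sketch of that decision, not Spark's actual classes: `PlanStats` and `BroadcastDecision` are hypothetical names, and the 10 MB value is assumed from the documented default of spark.sql.autoBroadcastJoinThreshold.

```scala
// Hypothetical stand-in for the statistics Spark keeps per logical plan.
final case class PlanStats(sizeInBytes: BigInt, isBroadcastable: Boolean = false)

object BroadcastDecision {
  // Assumed threshold: 10 MB, the documented default of
  // spark.sql.autoBroadcastJoinThreshold.
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Mirrors the shape of JoinSelection.canBroadcast: an explicit hint wins,
  // otherwise the estimated size must fall within [0, threshold].
  def canBroadcast(stats: PlanStats, threshold: Long = DefaultThreshold): Boolean =
    stats.isBroadcastable ||
      (stats.sizeInBytes >= 0 && stats.sizeInBytes <= threshold)
}
```

With these definitions, a 4 MB estimate qualifies and a 64 MB estimate does not unless it carries a hint. A negative threshold makes the size comparison fail for every plan, so only hinted plans are broadcast.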

 

 

For example:

spark-sql> explain select * from big_table1 a, (select * from big_table2 limit 10) b where a.id = b.id;
18/09/17 18:14:09 339 WARN Utils66: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
== Physical Plan ==
BroadcastHashJoin [id#5], [id#14], Inner, BuildRight
:- *Filter isnotnull(id#5)
:  +- HiveTableScan [name#4, id#5], MetastoreRelation big_table1
+- BroadcastExchange HashedRelationBroadcastMode(List(input[6, string, false]))
   +- Filter isnotnull(id#14)
      +- GlobalLimit 10
         +- Exchange SinglePartition
            +- LocalLimit 10
               +- HiveTableScan [id#14, ... 187 more fields], MetastoreRelation big_table2
Time taken: 4.216 seconds, Fetched 1 row(s)
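
One way to see why the planner picks BroadcastHashJoin here is to estimate the size of subquery b. The sketch below assumes that Spark 2.x estimates a LIMIT's size from the row count and row width, not from the size of the underlying table. `LimitSizeEstimate` and `estimatedLimitBytes` are hypothetical names, the 188 columns come from the plan above (id plus 187 more fields), and 20 bytes per column is only an illustrative width.

```scala
object LimitSizeEstimate {
  // Assumption: the estimated size of a LIMIT is limit * columns * bytesPerColumn,
  // independent of how large the scanned table is.
  def estimatedLimitBytes(limit: Int, columns: Int, bytesPerColumn: Int): Long =
    if (limit == 0) 1L else limit.toLong * columns * bytesPerColumn

  def main(args: Array[String]): Unit = {
    // Assumed default of spark.sql.autoBroadcastJoinThreshold (10 MB).
    val threshold = 10L * 1024 * 1024
    val estimate = estimatedLimitBytes(limit = 10, columns = 188, bytesPerColumn = 20)
    // prints: estimated bytes = 37600, broadcastable = true
    println(s"estimated bytes = $estimate, broadcastable = ${estimate <= threshold}")
  }
}
```

Under these assumptions the estimate is far below the threshold, so the subquery qualifies for broadcast. Producing those 10 rows still means running the LocalLimit, Exchange SinglePartition and GlobalLimit steps on top of the scan of big_table2, as the plan shows.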

BroadcastExchange executes as follows:

org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout)
      .asInstanceOf[broadcast.Broadcast[T]]
  }

Here timeout is spark.sql.broadcastTimeout, which defaults to 300s.

  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
    // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    Future {
      // This will run in another thread. Set the execution id so that we can connect these jobs
      // with the correct execution.
      SQLExecution.withExecutionId(sparkContext, executionId) {
        try {
          val beforeCollect = System.nanoTime()
          // Note that we use .executeCollect() because we don't want to convert data to Scala types
          val input: Array[InternalRow] = child.executeCollect()
          if (input.length >= 512000000) {
            throw new SparkException(
              s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
          }
          val beforeBuild = System.nanoTime()
          longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
          val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
          longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }

          // Construct and broadcast the relation.
          val relation = mode.transform(input)
          val beforeBroadcast = System.nanoTime()
          longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000

          val broadcasted = sparkContext.broadcast(relation)
          longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000

          SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
          broadcasted
          // ... (the rest of the method is omitted from this excerpt)

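To broadcast a table, Spark first computes it and collects the result to the driver, then broadcasts it through SparkContext. All of this runs asynchronously on another thread, and the caller waits at most spark.sql.broadcastTimeout for it to finish.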
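
The interaction between the asynchronous work and the timeout can be reproduced without Spark. The following is a simplified sketch using only the Scala standard library. `BroadcastTimeoutSketch`, `buildRelation` and `awaitBroadcast` are hypothetical names, and `Thread.sleep` merely stands in for a slow `child.executeCollect()`.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BroadcastTimeoutSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for relationFuture: "collect" the rows on another
  // thread, then return the value that would be broadcast.
  def buildRelation(collectMillis: Long): Future[Seq[Int]] = Future {
    Thread.sleep(collectMillis) // simulates a slow child.executeCollect()
    Seq(1, 2, 3)
  }

  // Hypothetical stand-in for doExecuteBroadcast: block until the future
  // completes or the timeout (spark.sql.broadcastTimeout in Spark) expires.
  def awaitBroadcast(collectMillis: Long, timeout: FiniteDuration): Either[String, Seq[Int]] =
    try Right(Await.result(buildRelation(collectMillis), timeout))
    catch { case _: TimeoutException => Left(s"timed out after $timeout") }

  def main(args: Array[String]): Unit = {
    println(awaitBroadcast(collectMillis = 10, timeout = 1.second))    // finishes in time
    println(awaitBroadcast(collectMillis = 500, timeout = 100.millis)) // exceeds the timeout
  }
}
```

In the second call the simulated collect takes longer than the caller is willing to wait, so the caller gets a timeout even though the background work itself has not failed. This is the same shape as a broadcast side whose computation runs past spark.sql.broadcastTimeout.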

 

