Spark的五種JOIN策略解析


JOIN操作是非常常見的數據處理操作,Spark作為一個統一的大數據處理引擎,提供了非常豐富的JOIN場景。本文分享將介紹Spark所提供的5種JOIN策略,希望對你有所幫助。本文主要包括以下內容:

  • 影響JOIN操作的因素
  • Spark中JOIN執行的5種策略
  • Spark是如何選擇JOIN策略的

影響JOIN操作的因素

數據集的大小

參與JOIN的數據集的大小會直接影響Join操作的執行效率。同樣,也會影響JOIN機制的選擇和JOIN的執行效率。

JOIN的條件

JOIN的條件會涉及字段之間的邏輯比較。根據JOIN的條件,JOIN可分為兩大類:等值連接非等值連接。等值連接會涉及一個或多個需要同時滿足的相等條件。在兩個輸入數據集的屬性之間應用每個等值條件。當使用其他運算符(運算連接符不為**=**)時,稱之為非等值連接。

JOIN的類型

在輸入數據集的記錄之間應用連接條件之后,JOIN類型會影響JOIN操作的結果。主要有以下幾種JOIN類型:

  • 內連接(Inner Join):僅從輸入數據集中輸出匹配連接條件的記錄。
  • 外連接(Outer Join):又分為左外連接、右外鏈接和全外連接。
  • 半連接(Semi Join):右表只用於過濾左表的數據而不出現在結果集中。
  • 交叉連接(Cross Join):交叉聯接返回左表中的所有行,左表中的每一行與右表中的所有行組合。交叉聯接也稱作笛卡爾積。

Spark中JOIN執行的5種策略

Spark提供了5種JOIN機制來執行具體的JOIN操作。該5種JOIN機制如下所示:

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

Shuffle Hash Join

簡介

當要JOIN的表數據量比較大時,可以選擇Shuffle Hash Join。這樣可以將大表進行按照JOIN的key進行重分區,保證每個相同的JOIN key都發送到同一個分區中。如下圖示:

如上圖所示:Shuffle Hash Join的基本步驟主要有以下兩點:

  • 首先,對於兩張參與JOIN的表,分別按照join key進行重分區,該過程會涉及Shuffle,其目的是將相同join key的數據發送到同一個分區,方便分區內進行join。
  • 其次,對於每個Shuffle之后的分區,會將小表的分區數據構建成一個Hash table,然后根據join key與大表的分區數據記錄進行匹配。

條件與特點

  • 僅支持等值連接,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類型
  • 需要對小表構建Hash map,屬於內存密集型的操作,如果構建Hash表的一側數據比較大,可能會造成OOM
  • 將參數*spark.sql.join.prefersortmergeJoin (default true)*置為false

Broadcast Hash Join

簡介

也稱之為Map端JOIN。當有一張表較小時,我們通常選擇Broadcast Hash Join,這樣可以避免Shuffle帶來的開銷,從而提高性能。比如事實表與維表進行JOIN時,由於維表的數據通常會很小,所以可以使用Broadcast Hash Join將維表進行Broadcast。這樣可以避免數據的Shuffle(在Spark中Shuffle操作是很耗時的),從而提高JOIN的效率。在進行 Broadcast Join 之前,Spark 需要把處於 Executor 端的數據先發送到 Driver 端,然后 Driver 端再把數據廣播到 Executor 端。如果我們需要廣播的數據比較多,會造成 Driver 端出現 OOM。具體如下圖示:

Broadcast Hash Join主要包括兩個階段:

  • Broadcast階段 :小表被緩存在executor中
  • Hash Join階段:在每個 executor中執行Hash Join

條件與特點

  • 僅支持等值連接,join key不需要排序
  • 支持除了全外連接(full outer joins)之外的所有join類型
  • Broadcast Hash Join相比其他的JOIN機制而言,效率更高。但是,Broadcast Hash Join屬於網絡密集型的操作(數據冗余傳輸),除此之外,需要在Driver端緩存數據,所以當小表的數據量較大時,會出現OOM的情況
  • 被廣播的小表的數據量要小於spark.sql.autoBroadcastJoinThreshold值,默認是10MB(10485760)
  • 被廣播表的大小閾值不能超過8GB,spark2.4源碼如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
  • 基表不能被broadcast,比如左連接時,只能將右表進行廣播。形如:fact_table.join(broadcast(dimension_table),可以不使用broadcast提示,當滿足條件時會自動轉為該JOIN方式。

Sort Merge Join

簡介

該JOIN機制是Spark默認的,可以通過參數spark.sql.join.preferSortMergeJoin進行配置,默認是true,即優先使用Sort Merge Join。一般在兩張大表進行JOIN時,使用該方式。Sort Merge Join可以減少集群中的數據傳輸,該方式不會先加載所有數據的到內存,然后進行hashjoin,但是在JOIN之前需要對join key進行排序。具體圖示:

Sort Merge Join主要包括三個階段:

  • Shuffle Phase : 兩張大表根據Join key進行Shuffle重分區
  • Sort Phase: 每個分區內的數據進行排序
  • Merge Phase: 對來自不同表的排序好的分區數據進行JOIN,通過遍歷元素,連接具有相同Join key值的行來合並數據集

條件與特點

  • 僅支持等值連接
  • 支持所有join類型
  • Join Keys是排序的
  • 參數**spark.sql.join.prefersortmergeJoin (默認true)**設定為true

Cartesian Join

簡介

如果 Spark 中兩張參與 Join 的表沒指定join key(ON 條件)那么會產生 Cartesian product join,這個 Join 得到的結果其實就是兩張行數的乘積。

條件

  • 僅支持內連接
  • 支持等值和不等值連接
  • 開啟參數spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

簡介

該方式是在沒有合適的JOIN機制可供選擇時,最終會選擇該種join策略。優先級為:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

在Cartesian 與Broadcast Nested Loop Join之間,如果是內連接,或者非等值連接,則優先選擇Broadcast Nested Loop策略,當時非等值連接並且一張表可以被廣播時,會選擇Cartesian Join。

條件與特點

  • 支持等值和非等值連接
  • 支持所有的JOIN類型,主要優化點如下:
    • 當右外連接時要廣播左表
    • 當左外連接時要廣播右表
    • 當內連接時,要廣播左右兩張表

Spark是如何選擇JOIN策略的

等值連接的情況

有join提示(hints)的情況,按照下面的順序

  • 1.Broadcast Hint:如果join類型支持,則選擇broadcast hash join
  • 2.Sort merge hint:如果join key是排序的,則選擇 sort-merge join
  • 3.shuffle hash hint:如果join類型支持, 選擇 shuffle hash join
  • 4.shuffle replicate NL hint: 如果是內連接,選擇笛卡爾積方式

沒有join提示(hints)的情況,則逐個對照下面的規則

  • 1.如果join類型支持,並且其中一張表能夠被廣播(spark.sql.autoBroadcastJoinThreshold值,默認是10MB),則選擇 broadcast hash join
  • 2.如果參數spark.sql.join.preferSortMergeJoin設定為false,且一張表足夠小(可以構建一個hash map) ,則選擇shuffle hash join
  • 3.如果join keys 是排序的,則選擇sort-merge join
  • 4.如果是內連接,選擇 cartesian join
  • 5.如果可能會發生OOM或者沒有可以選擇的執行策略,則最終選擇broadcast nested loop join

非等值連接情況

有join提示(hints),按照下面的順序

  • 1.broadcast hint:選擇broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是內連接,則選擇cartesian product join

沒有join提示(hints),則逐個對照下面的規則

  • 1.如果一張表足夠小(可以被廣播),則選擇 broadcast nested loop join
  • 2.如果是內連接,則選擇cartesian product join
  • 3.如果可能會發生OOM或者沒有可以選擇的執行策略,則最終選擇broadcast nested loop join

join策略選擇的源碼片段

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {
          if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {
            None
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

    
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val maybeBuildSide = if (buildLeft && buildRight) {
            Some(desiredBuildSide)
          } else if (buildLeft) {
            Some(BuildLeft)
          } else if (buildRight) {
            Some(BuildRight)
          } else {
            None
          }

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  }

總結

本文主要介紹了Spark提供的5種JOIN策略,並對三種比較重要的JOIN策略進行了圖示解析。首先對影響JOIN的因素進行了梳理,然后介紹了5種Spark的JOIN策略,並對每種JOIN策略的具體含義和觸發條件進行了闡述,最后給出了JOIN策略選擇對應的源碼片段。希望本文能夠對你有所幫助。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM