[Spark] - HashPartitioner & RangePartitioner 區別


Spark RDD的寬依賴中存在Shuffle過程,Spark的Shuffle過程同MapReduce,也依賴於Partitioner數據分區器,Partitioner類的代碼依賴結構主要如下所示:

主要是HashPartitioner和RangePartitioner兩個類,分別用於根據RDD中key的hashcode值進行分區以及根據范圍進行數據分區

一、Partitioner

  Spark中數據分區的主要工具類(數據分區類),主要用於Spark底層RDD的數據重分布的情況中,主要方法兩個,如下:

 

二、HashPartitioner

  Spark中非常重要的一個分區器,也是默認分區器,默認用於90%以上的RDD相關API上;功能:依據RDD中key值的hashCode的值將數據取模后得到該key值對應的下一個RDD的分區id值,支持key值為null的情況,當key為null的時候,返回0;該分區器基本上適合所有RDD數據類型的數據進行分區操作;但是需要注意的是,由於JAVA中數組的hashCode是基於數組對象本身的,不是基於數組內容的,所以如果RDD的key是數組類型,那么可能導致數據內容一致的數據key沒法分配到同一個RDD分區中,這個時候最好自定義數據分區器,采用數組內容進行分區或者將數組的內容轉換為集合。HashPartitioner代碼說明如下:

 

三、RangePartitioner

  SparkCore中除了HashPartitioner分區器外,另外一個比較重要的已經實現的分區器,主要用於RDD的數據排序相關API中,比如sortByKey底層使用的數據分區器就是RangePartitioner分區器;該分區器的實現方式主要是通過兩個步驟來實現的,第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每個分區的最大key值,形成一個Array[KEY]類型的數組變量rangeBounds;第二步:判斷key在rangeBounds中所處的范圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是可以排序的,代碼說明如下:

class RangePartitioner[K: Ordering : ClassTag, V](
                                                   partitions: Int,
                                                   rdd: RDD[_ <: Product2[K, V]],
                                                   private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  // 獲取RDD中key類型數據的排序器
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // 如果給定的分區數是一個的情況下,直接返回一個空的集合,表示數據不進行分區
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // 給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每個RDD分區至少抽取20條數據
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // 計算每個分區抽取的數據量大小, 假設輸入數據每個分區分布的比較均勻
      // 對於超大數據集(分區數超過5萬的)乘以3會讓數據稍微增大一點,對於分區數低於5萬的數據集,每個分區抽取數據量為60條也不算多
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // 從rdd中抽取數據,返回值:(總rdd數據量, Array[分區id,當前分區的數據量,當前分區抽取的數據])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // 如果總的數據量為0(RDD為空),那么直接返回一個空的數組
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 計算總樣本數量和總記錄數的占比,占比最大為1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存樣本數據的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存數據分布不均衡的分區id(數據量超過fraction比率的分區)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 計算抽取出來的樣本數據
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以當前分區中的數據量大於之前計算的每個分區的抽象數據大小,那么表示當前分區抽取的數據太少了,該分區數據分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 當前分區不屬於數據分布不均衡的分區,計算占比權重,並添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 對於數據分布不均衡的RDD分區,重新進行數據抽樣
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 獲取數據分布不均衡的RDD分區,並構成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 隨機種子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽樣函數API進行數據抽樣
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 將最終的抽樣數據計算出rangeBounds出來
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  // 下一個RDD的分區數量是rangeBounds數組中元素數量+ 1個
  def numPartitions: Int = rangeBounds.length + 1

  // 二分查找器,內部使用java中的Arrays類提供的二分查找方法
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // 根據RDD的key值返回對應的分區id。從0開始
  def getPartition(key: Any): Int = {
    // 強制轉換key類型為RDD中原本的數據類型
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // 如果分區數據小於等於128個,那么直接本地循環尋找當前k所屬的分區下標
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // 如果分區數量大於128個,那么使用二分查找方法尋找對應k所屬的下標;
      // 但是如果k在rangeBounds中沒有出現,實質上返回的是一個負數(范圍)或者是一個超過rangeBounds大小的數(最后一個分區,比所有數據都大)
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }

    // 根據數據排序是升序還是降序進行數據的排列,默認為升序
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

 

  其實RangePartitioner的重點是在於構建rangeBounds數組對象,主要步驟是:

  1. 如果分區數量小於2或者rdd中不存在數據的情況下,直接返回一個空的數組,不需要計算range的邊界;如果分區數據大於1的情況下,而且rdd中有數據的情況下,才需要計算數組對象

  2. 計算總體的數據抽樣大小sampleSize,計算規則是:至少每個分區抽取20個數據或者最多1M的數據量

  3. 根據sampleSize和分區數量計算每個分區的數據抽樣樣本數量sampleSizePrePartition

  4. 調用RangePartitioner的sketch函數進行數據抽樣,計算出每個分區的樣本

  5. 計算樣本的整體占比以及數據量過多的數據分區,防止數據傾斜

  6. 對於數據量比較多的RDD分區調用RDD的sample函數API重新進行數據抽取

  7. 將最終的樣本數據通過RangePartitoner的determineBounds函數進行數據排序分配,計算出rangeBounds

  RangePartitioner的sketch函數的作用是對RDD中的數據按照需要的樣本數據量進行數據抽取,主要調用SamplingUtils類的reservoirSampleAndCount方法對每個分區進行數據抽取,抽取后計算出整體所有分區的數據量大小;reservoirSampleAndCount方法的抽取方式是先從迭代器中獲取樣本數量個數據(順序獲取), 然后對剩余的數據進行判斷,替換之前的樣本數據,最終達到數據抽樣的效果

  RangePartitioner的determineBounds函數的作用是根據樣本數據記憶權重大小確定數據邊界, 代碼注釋講解如下:

def determineBounds[K: Ordering : ClassTag](
                                               candidates: ArrayBuffer[(K, Float)],
                                               partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 按照數據進行數據排序,默認升序排列
    val ordered = candidates.sortBy(_._1)
    // 獲取總的樣本數量大小
    val numCandidates = ordered.size
    // 計算總的權重大小
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 計算步長
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // 獲取排序后的第i個數據及權重
      val (key, weight) = ordered(i)
      // 累計權重
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // 權重已經達到一個步長的范圍,計算出一個分區id的值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          // 上一個邊界值為空,或者當前邊界key數據大於上一個邊界的值,那么當前key有效,進行計算
          // 添加當前key到邊界集合中
          bounds += key
          // 累計target步長界限
          target += step
          // 分區數量加1
          j += 1
          // 上一個邊界的值重置為當前邊界的值
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // 返回結果
    bounds.toArray
  }

 

四、總結

  一般而已,使用默認的HashPartitioner即可,RangePartitioner的使用有一定的局限性

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM