Spark源碼分析 -- PairRDD


和一般RDD最大的不同就是有兩個泛型參數, [K, V]表示pair的概念
關鍵的function是, combineByKey, 所有pair相關操作的抽象

combine是這樣的操作, Turns an RDD[(K, V)] into a result of type RDD[(K, C)]
其中C有可能只是簡單類型, 但經常是seq, 比如(Int, Int) to (Int, Seq[Int])

下面來看看combineByKey的參數,
首先需要用戶自定義一些操作,
createCombiner: V => C, C不存在的情況下, 比如通過V創建seq C
mergeValue: (C, V) => C, 當C已經存在的情況下, 需要merge, 比如把item V加到seq C中, 或者疊加 
mergeCombiners: (C, C) => C,  合並兩個C
partitioner: Partitioner, Shuffle時需要的Partitioner
mapSideCombine: Boolean = true, 為了減小傳輸量, 很多combine可以在map端先做, 比如疊加, 可以先在一個partition中把所有相同的key的value疊加, 再shuffle
serializerClass: String = null, 傳輸需要序列化, 用戶可以自定義序列化類

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
 */
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) //1.Aggregator
    //RDD本身的partitioner和傳入的partitioner相等時, 即不需要重新shuffle, 直接map即可
    if (self.partitioner == Some(partitioner)) {  
      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) //2. mapPartitions, map端直接調用combineValuesByKey
    } else if (mapSideCombine) { //如果需要mapSideCombine
      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) //先在partition內部做mapSideCombine
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializerClass) //3. ShuffledRDD, 進行shuffle
      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true) //Shuffle完后, 在reduce端再做一次combine, 使用combineCombinersByKey
    } else {
      // Don't apply map-side combiner.和上面的區別就是不做mapSideCombine
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
    }
  }
}

 

1. Aggregator

在combineByKey中, 首先創建Aggregator, 其實在Aggregator中封裝了兩個函數,
combineValuesByKey, 用於處理將V加入到C的case, 比如加入一個item到一個seq里面, 用於map端
combineCombinersByKey, 用於處理兩個C合並, 比如兩個seq合並, 用於reduce端

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
    val combiners = new JHashMap[K, C]
    for (kv <- iter) {
      val oldC = combiners.get(kv._1)
      if (oldC == null) {
        combiners.put(kv._1, createCombiner(kv._2))
      } else {
        combiners.put(kv._1, mergeValue(oldC, kv._2))
      }
    }
    combiners.iterator
  }

  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
    val combiners = new JHashMap[K, C]
    iter.foreach { case(k, c) =>
      val oldC = combiners.get(k)
      if (oldC == null) {
        combiners.put(k, c)
      } else {
        combiners.put(k, mergeCombiners(oldC, c))
      }
    }
    combiners.iterator
  }
}

 

2. mapPartitions

mapPartitions其實就是使用MapPartitionsRDD
做的事情就是對當前partition執行map函數f, Iterator[T] => Iterator[U]
比如, 執行combineValuesByKey: Iterator[_ <: Product2[K, V]] to Iterator[(K, C)]

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] =
    new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
 
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T],
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext) =
    f(firstParent[T].iterator(split, context)) // 對於map,就是調用f

 

3. ShuffledRDD

Shuffle實際上是由系統的shuffleFetcher完成的, Spark的抽象封裝非常的好
所以在這里看不到Shuffle具體是怎么樣做的, 這個需要分析到shuffleFetcher時候才能看到 
因為每個shuffle是有一個全局的shuffleid的
所以在compute里面, 你只是看到用BlockStoreShuffleFetcher根據shuffleid和partitionid直接fetch到shuffle過后的數據

/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
* @tparam K the key class.
* @tparam V the value class.
*/
class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
    @transient var prev: RDD[P],
    part: Partitioner)
  extends RDD[P](prev.context, Nil) {
  override val partitioner = Some(part)  
  //ShuffleRDD會進行repartition, 所以從Partitioner中取出新的part數目  
  //並用Array.tabulate動態創建相應個數的ShuffledRDDPartition
  override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i)) 
  }
  
  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
    val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
    SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
      SparkEnv.get.serializerManager.get(serializerClass))
  }
}

ShuffledRDDPartition沒啥區別, 一樣只是記錄id

private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index = idx
  override def hashCode(): Int = idx
}

 

下面再來看一下, 如果使用combineByKey來實現其他的操作的,

group

group是比較典型的例子, (Int, Int) to (Int, Seq[Int])
由於groupByKey不使用map side combine, 因為這樣也無法減少傳輸空間, 所以不需要實現mergeCombiners

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    def createCombiner(v: V) = ArrayBuffer(v) //創建seq
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v //添加item到seq
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
    bufs.asInstanceOf[RDD[(K, Seq[V])]]
  }

 

reduce

reduce是更簡單的一種情況, 只是兩個值合並成一個值, (Int, Int V) to (Int, Int C), 比如疊加
所以createCombiner很簡單, 就是直接返回v
而mergeValue和mergeCombiners邏輯是相同的, 沒有區別

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM