spark RDD,reduceByKey vs groupByKey


Spark中有兩個類似的api,分別是reduceByKey和groupByKey。這兩個的功能類似,但底層實現卻有些不同,那么為什么要這樣設計呢?我們來從源碼的角度分析一下。

先看兩者的調用順序(都是使用默認的Partitioner,即defaultPartitioner)

所用spark版本:spark2.1.0

先看reduceByKey

Step1

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

Setp2

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

Setp3

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

姑且不去看方法里面的細節,我們會只要知道最后調用的是combineByKeyWithClassTag這個方法。這個方法有兩個參數我們來重點看一下,

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

首先是partitioner參數,這個即是RDD的分區設置。除了默認的defaultPartitioner,Spark還提供了RangePartitioner和HashPartitioner外,此外用戶也可以自定義partitioner。通過源碼可以發現如果是HashPartitioner的話,那么是會拋出一個錯誤的。

然后是mapSideCombine參數,這個參數正是reduceByKey和groupByKey最大不同的地方,它決定是是否會先在節點上進行一次Combine操作,下面會有更具體的例子來介紹。

然后是groupByKey

Step1

  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }

Step2

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

Setp3

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

結合上面reduceByKey的調用鏈,可以發現最終其實都是調用combineByKeyWithClassTag這個方法的,但調用的參數不同。
reduceByKey的調用

combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

groupByKey的調用

combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

正是兩者不同的調用方式導致了兩個方法的差別,我們分別來看

  • reduceByKey的泛型參數直接是[V],而groupByKey的泛型參數是[CompactBuffer[V]]。這直接導致了reduceByKey和groupByKey的返回值不同,前者是RDD[(K, V)],而后者是RDD[(K, Iterable[V])]

  • 然后就是mapSideCombine=false了,這個mapSideCombine參數的默認是true的。這個值有什么用呢,上面也說了,這個參數的作用是控制要不要在map端進行初步合並(Combine)。可以看看下面具體的例子。

從功能上來說,可以發現ReduceByKey其實就是會在每個節點先進行一次合並的操作,而groupByKey沒有。

這么來看ReduceByKey的性能會比groupByKey好很多,因為有些工作在節點已經處理了。那么groupByKey為什么存在,它的應用場景是什么呢?我也不清楚,如果觀看這篇文章的讀者知道的話不妨在評論里說出來吧。非常感謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM