避免使用GroupByKey
我們看一下兩種計算word counts 的方法,一個使用reduceByKey,另一個使用 groupByKey:
val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordPairsRDD .reduceByKey(_ + _) .collect() val wordCountsWithGroup = wordPairsRDD .groupByKey() .map(t => (t._1, t._2.sum)) .collect()
以上兩個函數都會產生正確的結果,reduceByKey的例子在大型數據集上工作的效率會更高。因為Spark知道:在shuffle data之前,它可以根據key, 在每個partition上,對輸出數據在本地做combine。
下圖描述了reduceByKey的執行過程。值得注意的是,在shuffle 數據之前,同一個機器上具有相同key的item會先在本地combine(使用的combine函數是傳遞給reduceByKey的lambda 函數)。然后這個lambda 函數會再次在執行shuffle后的每個分區上被調用,以產生最終的結果。
而在groupByKey中,所有的key-value對被先shuffle到下游RDD分區中。這會導致很多不必要的網絡數據傳輸。
在決定將一個key-value對shuffle到哪個機器上時,spark會key-value對中的key調用一個partitioning 函數,以決定分到的目標機器。在shuffle時,若是shuffle的數據(由於內存大小限制)無法全部放入到一個executor中,則Spark會將數據spill到disk。但是,在flush數據到disk時,一次只flush一個key(對應的key-value pairs 數據):所以如果單個key對應的key-value pairs 數據超出了executor可用的memory,則會拋出OOM異常。在較新版的Spark中會處理此異常並讓job可以繼續執行,但是仍需要盡量避免此類現象:當spark需要spill到磁盤時,spark性能會受到顯著影響。
所以在非常大的數據集上計算時,對於reduceByKey與groupByKey來說,它們所需要傳輸的shuffle數據是有顯著不同的。
而在小型數據集上進行測試時(仍使用word count的例子),從測試結果來看,groupByKey的表現要優於reduceByKey。拋開shuffle階段來看,reduceByKey對內存率會更高於groupByKey,所以相對會報出更多內存不足的情況。若是需要使用reduceByKey,則需要給executor 更多內存在本地做計算。
相對於groupByKey,除了reduceByKey,下面的函數也會是更好的選擇:
- combineByKey:可以用於combine元素,用於返回與輸入類型不同類型的值
- foldByKey:初始化一個“zero value”,然后對每個Key的值做聚合操作
接下來詳細介紹一下這兩個函數。
combineByKey
我們先看一下combineByKey的定義:
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) }
可以看到此方法調用的是 combineByKeyWithClassTag:
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) }
繼續查看下一層調用:
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]
查看reduceByKey代碼,可以發現它最終調用的也是combineByKeyWithClassTag 方法:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
從combineByKeyWithClassTag方法定義來看,第一個參數是提供用戶自定義的類型,用於將輸入的<K,V> 中的 V 轉化為用戶指定類型,第二個參數用於merge V 的值到 C(用戶定義類型),第三個參數用於將 C 的值 combine 為一個單值。這里可以看到默認是會在map端做combine,所以默認combineByKey與reduceByKey都是會在map端先做combine操作。
但是對於 groupByKey來說:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKeyWithClassTag[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }
可以看到,groupByKey雖然最終調用的也是combineByKeyWithClassTag 方法,但是並不會在map端執行Combine操作(mapSideCombine為false)。
下面我們寫一個combineByKey求解平均數的例子:
type ScoreCollector = (Int, Double) type PersonScores = (String, (Int, Double))
val initialScores = Array(("Alice", 90.0), ("Bob", 100.0), ("Tom", 93.0), ("Alice", 95.0), ("Bob", 70.0), ("Jack", 98.0)) val scoreData = sc.parallelize(initialScores).cache() val createScoreCombiner = (score: Double) => (1, score)
val scoreMerge = (scorecollector: ScoreCollector, score: Double) => (scorecollector._1 +1, scorecollector._2 + score) val scoreCombine = (scorecollector1: ScoreCollector, scorecollector2: ScoreCollector) => (scorecollector1._1 + scorecollector2._1, scorecollector1._2 + scorecollector2._2) scoreData.combineByKey( createScoreCombiner, scoreMerge, scoreCombine ).map( {pscore: PersonScores => (pscore._1, pscore._2._2 / pscore._2._1)}).collect
輸出為: Array[(String, Double)] = Array((Tom,93.0), (Alice,92.5), (Bob,85.0), (Jack,98.0))
可以看到,首先原類型為(String, Double),然后我們通過combineByKey的第一個參數,將其轉化為(Int, Double) 形式,用於統計次數與分數。接下來第二個參數用於merge,將同樣key條目出現的次數、以及分數相加。最后第三個參數用於做combine,對每個key,求得的分數求總和,然后除以次數,求得平均值。
這里可以看出 combineByKey與reduceByKey的區別是:combineByKey的可以返回與輸入數據類型不一樣的輸出。
foldByKey
foldByKey 是初始化一個“zero value“,然后對key的value值做聚合操作。例如:
val initialScores = Array(("Alice", 90.0), ("Bob", 100.0), ("Tom", 93.0), ("Alice", 95.0), ("Bob", 70.0), ("Jack", 98.0)) val scoreData = sc.parallelize(initialScores).cache() scoreData.foldByKey(0)(_+_).collect
輸出為: Array[(String, Double)] = Array((Tom,93.0), (Alice,185.0), (Bob,170.0), (Jack,98.0))
可以看到,這里給出的“zero value“為0,在執行計算時,會先將所有key的value值與”zero value“做一次加法操作(由_+_定義),然后再對所有key-pair做加法操作。所以若是此時使用:
scoreData.foldByKey(1)(_+_).collect
則輸出為:Array[(String, Double)] = Array((Tom,94.0), (Alice,187.0), (Bob,172.0), (Jack,99.0))
下面是 foldByKey的源碼:
def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) // When deserializing, use a lazy val to create just one instance of the serializer per task lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) val cleanedFunc = self.context.clean(func) combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) }
可以看到它與reduceByKey和combineByKye類似,最終調用的也是combineByKeyWithClassTag 方法,且未覆蓋mapSideCombine 的值,所以默認也會在map端進行combine操作。
所以在大型數據集中,為了減少shuffle的數據量,相對於groupByKey來說,使用reduceByKey、combineByKey以及foldByKey 會是更好的選擇。
References:
[2] http://codingjunkie.net/spark-combine-by-key/