在上一篇文章中 Spark源碼系列:DataFrame repartition、coalesce 對比 對DataFrame的repartition、coalesce進行了對比,在這篇文章中,將會對RDD的repartition、coalesce進行對比。
RDD重新分區的手段與DataFrame類似,有repartition、coalesce兩個方法
repartition
- def repartition(numPartitions: Int): JavaRDD[T]
1 /** 2 * Return a new RDD that has exactly numPartitions partitions. 3 * 4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 5 * a shuffle to redistribute data. 6 * 7 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 8 * which can avoid performing a shuffle. 9 */ 10 def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)
返回一個新的RDD,該RDD恰好具有numPartitions分區。
repartition這個方法可以增加或減少此RDD中的並行度。在內部,這使用shuffle來重新分配數據。
如果要減少RDD中的分區數量,請考慮使用“coalesce”,這樣可以避免執行shuffle。
這個方法在org.apache.spark.api.java.JavaRDD里面
真正調用的是org.apache.spark.rdd.RDD里面的repartition
1 /** 2 * Return a new RDD that has exactly numPartitions partitions. 3 * 4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 5 * a shuffle to redistribute data. 6 * 7 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 8 * which can avoid performing a shuffle. 9 */ 10 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 11 coalesce(numPartitions, shuffle = true) 12 }
從上面可以看出,在此處還不是方法最終的,還調用了coalesce(numPartitions, shuffle = true) 這個方法,這個方法實現如下:
1 /** 2 * Return a new RDD that is reduced into `numPartitions` partitions. 3 * 4 * This results in a narrow dependency, e.g. if you go from 1000 partitions 5 * to 100 partitions, there will not be a shuffle, instead each of the 100 6 * new partitions will claim 10 of the current partitions. 7 * 8 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, 9 * this may result in your computation taking place on fewer nodes than 10 * you like (e.g. one node in the case of numPartitions = 1). To avoid this, 11 * you can pass shuffle = true. This will add a shuffle step, but means the 12 * current upstream partitions will be executed in parallel (per whatever 13 * the current partitioning is). 14 * 15 * Note: With shuffle = true, you can actually coalesce to a larger number 16 * of partitions. This is useful if you have a small number of partitions, 17 * say 100, potentially with a few partitions being abnormally large. Calling 18 * coalesce(1000, shuffle = true) will result in 1000 partitions with the 19 * data distributed using a hash partitioner. 20 */ 21 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) 22 : RDD[T] = withScope { 23 if (shuffle) { 24 /** Distributes elements evenly across output partitions, starting from a random partition. 注意,鍵的哈希代碼就是鍵本身。HashPartitioner將用分區的總數對它進行修改。*/ 25 val distributePartition = (index: Int, items: Iterator[T]) => { 26 var position = (new Random(index)).nextInt(numPartitions) 27 items.map { t => 28 // Note that the hash code of the key will just be the key itself. The HashPartitioner 29 // will mod it with the number of total partitions. 30 position = position + 1 31 (position, t) 32 } 33 } : Iterator[(Int, T)] 34 35 // include a shuffle step so that our upstream tasks are still distributed 包含一個shuffle步驟,以便我們的上游任務仍然是分布式的。 36 new CoalescedRDD( 37 new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), 38 new HashPartitioner(numPartitions)), 39 numPartitions).values 40 } else { 41 new CoalescedRDD(this, numPartitions) 42 } 43 }
這個方法返回一個新的RDD,它被簡化為"numpartition"分區。
這導致了一個狹窄的依賴關系,例如,如果從1000個分區到100個分區,將不會有一個shuffle,而是100個新分區中的每一個都會聲明10個當前分區。
然而,如果你正在做一個劇烈的合並,例如當numPartitions = 1時,這可能導致您的計算發生在比您期待的更少的節點上(例如numpartition=1的情況下只有一個節點),即可能導致並行度下降,無法充分利用分布式環境的優勢。
為了避免這種情況,可以傳遞shuffle = true。這將添加一個shuffle步驟,但意味着當前的上游分區將並行執行(無論當前分區是什么)。
注意:使用shuffle = true,您實際上可以合並到更多的分區。
如果您有少量的分區(比如100個),可能有一些分區非常大,那么這是非常有用的,調用coalesce(1000, shuffle = true)將產生1000個分區,使用散列分區器分發數據。
從上面的源碼可以看到,def repartition(numPartitions: Int): JavaRDD[T] 其實調用的是coalesce(numPartitions, shuffle = true)這個方法,而且這個方法產生shuffle操作,分區的規則采用的個是哈希分區。
coalesce
- def coalesce(numPartitions: Int): JavaRDD[T]
1 2 /** 3 * Return a new RDD that is reduced into `numPartitions` partitions. 4 */ 5 def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
而這個方法調用的是org.apache.spark.rdd.RDD里面的def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T]。
這個方法和上面repartitions的是一樣的,只不過此處的shuffle參數是默認的false。
真正調用的是new CoalescedRDD(this, numPartitions)此時不會觸發shuffle。
- def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
1 /** 2 * Return a new RDD that is reduced into `numPartitions` partitions. 3 */ 4 def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] = 5 rdd.coalesce(numPartitions, shuffle)
這個和上面的coalesce(numPartitions: Int)類似,只是此處的shuffle參數不再是默認的false,而是自己指定的了,當shuffle為true是會觸發shuffle,反之不會。
演示
1 scala> var rdd1=sc.textFile("hdfs://file.txt") 2 rdd1: org.apache.spark.rdd.RDD[String] = hdfs://file.txt MapPartitionsRDD[20] at textFile at <console>:27 3 4 //默認分區數量為177 5 scala> rdd1.partitions.size 6 res12: Int = 177 7 8 //調用coalesce(10) 減少分區數量 9 scala> var rdd2 = rdd1.coalesce(10) 10 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[21] at coalesce at <console>:29 11 12 //分區數量減少到10個 13 scala> rdd2.partitions.size 14 res13: Int = 10 15 16 //直接增加分區數量到200 17 scala> var rdd2 = rdd1.coalesce(200) 18 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[22] at coalesce at <console>:29 19 20 //方法沒有生效 21 scala> rdd2.partitions.size 22 res14: Int = 177 23 24 //將shuffle設置為true,增加分區到200 25 scala> var rdd2 = rdd1.coalesce(200,true) 26 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at coalesce at <console>:29 27 28 //重新分區生效 29 scala> rdd2.partitions.size 30 res15: Int = 200 31 32 ------------------------------------------------------------------------------------------------ 33 //對於repartition增加分區到200 34 scala> var rdd2 = rdd1.repartition 直接增加o(200) 35 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at repartition at <console>:29 36 37 //增加分區生效 38 scala> rdd2.partitions.size 39 res16: Int = 200 40 41 //對於repartition減少分區到10 42 scala> var rdd2 = rdd1.repartition(10) 43 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at repartition at <console>:29 44 45 //減少分區生效 46 scala> rdd2.partitions.size 47 res17: Int = 10
總結
- coalesce(numPartitions: Int)
當新的分區數小於原來的分區時,分區生效切並且不會觸發shuffle;
當新的分區數大於原來的分區時,分區無效還是原來的數量。
- coalesce(numPartitions: Int, shuffle: Boolean)
當shuffle為true時候,無論新的分區比原來的大還是小,分區均生效,並且觸發shuffle操作,此時等同於repartition(numPartitions: Int);
當shuffle為false時候,等同於coalesce(numPartitions: Int)。
- def repartition(numPartitions: Int)
無論新的分區比原來的大還是小,分區均生效,並且觸發shuffle操作;
很明顯repartition就是當shuffle為true時候的coalesce(numPartitions: Int, shuffle: Boolean)方法。
此為本人學習工作總結,轉載請注明出處!!!!