Spark Source Code Series: Comparing RDD repartition and coalesce


The previous article, Spark Source Code Series: Comparing DataFrame repartition and coalesce, compared repartition and coalesce on DataFrames. This article makes the same comparison for RDDs.

As with DataFrames, an RDD can be repartitioned through two methods: repartition and coalesce.

repartition

  • def repartition(numPartitions: Int): JavaRDD[T]
  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)

This returns a new RDD that has exactly numPartitions partitions.

repartition can increase or decrease the level of parallelism of the RDD; internally, it uses a shuffle to redistribute the data.

If you are decreasing the number of partitions in the RDD, consider using `coalesce` instead, which can avoid performing a shuffle.

This method lives in org.apache.spark.api.java.JavaRDD;

what it actually calls is repartition in org.apache.spark.rdd.RDD:

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

As the code shows, this is not yet the end of the call chain: it in turn calls coalesce(numPartitions, shuffle = true), which is implemented as follows:

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }

This method returns a new RDD that is reduced to `numPartitions` partitions.

It results in a narrow dependency: for example, if you go from 1000 partitions to 100 partitions, there will be no shuffle; instead, each of the 100 new partitions claims 10 of the current partitions.

However, if you are doing a drastic coalesce, e.g. to numPartitions = 1, the computation may take place on fewer nodes than you would like (e.g. a single node when numPartitions = 1); in other words, parallelism can drop so low that the advantages of the distributed environment go unused.

To avoid this, you can pass shuffle = true. This adds a shuffle step, but it means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Note: with shuffle = true, you can actually coalesce to a larger number of partitions.

This is useful if you have a small number of partitions, say 100, where a few of them may be abnormally large; calling coalesce(1000, shuffle = true) will produce 1000 partitions, with the data distributed by a hash partitioner. The sketch below illustrates this use case.
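For example, in a spark-shell session (a minimal sketch assuming an existing SparkContext `sc`; the skew itself is hypothetical here, since parallelize distributes evenly), spreading a few partitions out over many more looks like this:

  // Minimal spark-shell sketch: imagine the 4 input partitions are skewed,
  // with one abnormally large. shuffle = true hash-distributes the data.
  val skewed = sc.parallelize(1 to 1000000, 4)
  val spread = skewed.coalesce(16, shuffle = true)
  println(spread.partitions.length) // 16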

From the source above, def repartition(numPartitions: Int): JavaRDD[T] ultimately calls coalesce(numPartitions, shuffle = true): it always performs a shuffle, and the data is redistributed according to a hash partitioner.
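To make that hash distribution concrete, here is a plain-Scala simulation of the distributePartition logic shown above (illustrative only, not Spark API; `targets` is a hypothetical helper): each input partition starts at a pseudo-random output slot derived from its index, then assigns keys round-robin, and HashPartitioner maps each key to key mod numPartitions (for an Int key, hashCode is the key itself).

  import scala.util.Random

  // Local simulation of distributePartition: key = intended partition,
  // before HashPartitioner applies the mod.
  val numPartitions = 4
  def targets(index: Int, items: Iterator[String]): Iterator[(Int, String)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position += 1
      (position, t)
    }
  }

  // Keys here are non-negative, so HashPartitioner's result is key % numPartitions:
  targets(0, Iterator("a", "b", "c", "d", "e")).foreach { case (k, v) =>
    println(s"$v -> partition ${k % numPartitions}")
  }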

coalesce

  • def coalesce(numPartitions: Int): JavaRDD[T]
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)

This in turn calls def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] in org.apache.spark.rdd.RDD.

That is the same method repartition ends up in, except that here the shuffle parameter keeps its default value of false.

What actually runs is therefore new CoalescedRDD(this, numPartitions), which does not trigger a shuffle.
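A quick way to confirm this in spark-shell (a sketch assuming some existing RDD `rdd1`; the output shown is illustrative) is to inspect the lineage with toDebugString: without shuffle, the plan contains a CoalescedRDD but no ShuffledRDD.

  // Lineage check: no ShuffledRDD stage appears.
  println(rdd1.coalesce(10).toDebugString)
  // (10) CoalescedRDD[...] at coalesce at <console>:...
  //  |   MapPartitionsRDD[...] at textFile at <console>:...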

  • def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
    rdd.coalesce(numPartitions, shuffle)

This is similar to coalesce(numPartitions: Int) above, except that shuffle is no longer the default false but whatever the caller specifies: when shuffle is true a shuffle is triggered, otherwise it is not.
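By contrast with the earlier lineage check, passing shuffle = true makes the extra shuffle step visible (again an illustrative spark-shell sketch assuming the same `rdd1`):

  // With shuffle = true, a ShuffledRDD stage shows up in the lineage.
  println(rdd1.coalesce(10, shuffle = true).toDebugString)
  // (10) MapPartitionsRDD[...] at coalesce at <console>:...
  //  |   CoalescedRDD[...] at coalesce at <console>:...
  //  |   ShuffledRDD[...] at coalesce at <console>:...
  //  +-(177) MapPartitionsRDD[...] at coalesce at <console>:...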

Demo

scala> var rdd1 = sc.textFile("hdfs://file.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://file.txt MapPartitionsRDD[20] at textFile at <console>:27

// The default number of partitions is 177
scala> rdd1.partitions.size
res12: Int = 177

// Call coalesce(10) to reduce the number of partitions
scala> var rdd2 = rdd1.coalesce(10)
rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[21] at coalesce at <console>:29

// The number of partitions drops to 10
scala> rdd2.partitions.size
res13: Int = 10

// Try to increase the number of partitions directly to 200
scala> var rdd2 = rdd1.coalesce(200)
rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[22] at coalesce at <console>:29

// The call has no effect
scala> rdd2.partitions.size
res14: Int = 177

// With shuffle set to true, increase the number of partitions to 200
scala> var rdd2 = rdd1.coalesce(200, true)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at coalesce at <console>:29

// The repartitioning takes effect
scala> rdd2.partitions.size
res15: Int = 200

------------------------------------------------------------------------------------------------
// Use repartition to increase the number of partitions to 200
scala> var rdd2 = rdd1.repartition(200)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at repartition at <console>:29

// Increasing the partitions takes effect
scala> rdd2.partitions.size
res16: Int = 200

// Use repartition to decrease the number of partitions to 10
scala> var rdd2 = rdd1.repartition(10)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at repartition at <console>:29

// Decreasing the partitions takes effect
scala> rdd2.partitions.size
res17: Int = 10

Summary

  • coalesce(numPartitions: Int)

When the new partition count is smaller than the current one, the repartitioning takes effect and no shuffle is triggered.

When the new partition count is larger than the current one, the call has no effect and the partition count stays the same.

  • coalesce(numPartitions: Int, shuffle: Boolean)

When shuffle is true, the repartitioning takes effect whether the new count is larger or smaller than the current one, and a shuffle is triggered; this is equivalent to repartition(numPartitions: Int).

When shuffle is false, it is equivalent to coalesce(numPartitions: Int).

  • def repartition(numPartitions: Int)

The repartitioning takes effect whether the new count is larger or smaller than the current one, and a shuffle is triggered.

Clearly, repartition is just coalesce(numPartitions: Int, shuffle: Boolean) with shuffle fixed to true.
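As a minimal sketch of that equivalence (assuming an existing SparkContext `sc`):

  // Both paths shuffle and land on exactly 3 partitions.
  val rdd = sc.parallelize(1 to 100, 8)
  val viaRepartition = rdd.repartition(3)
  val viaCoalesce    = rdd.coalesce(3, shuffle = true)
  assert(viaRepartition.partitions.length == viaCoalesce.partitions.length) // both 3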

 

This is a summary from my own study and work; please credit the source when reposting!

