Spark Source Code Series: DataFrame repartition vs. coalesce


In Spark development, repartitioning data can improve performance, especially around join operations (the gain often far outweighs the cost of the repartition itself, so it is usually worthwhile).
In Spark SQL there are two main methods for repartitioning data: repartition and coalesce. This post compares the two.

repartition

repartition has three overloads:

  • def repartition(numPartitions: Int): DataFrame 
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * @group dfops
 * @since 1.3.0
 */
def repartition(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}

  This method returns a new [[DataFrame]] with exactly `numPartitions` partitions.
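To make the behaviour of `repartition(n)` concrete: a full shuffle redistributes rows roughly evenly across the `n` target partitions. Below is a minimal stand-alone sketch in plain Scala (no Spark required; round-robin assignment is a stand-in for Spark's actual shuffle distribution, and `RepartitionSketch` is a made-up name for illustration):

```scala
object RepartitionSketch {
  // Assign every row to one of n partitions round-robin, so the resulting
  // partition sizes differ by at most one row.
  def repartition[T](rows: Seq[T], n: Int): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => i % n }              // partition id = row index mod n
      .map { case (p, group) => p -> group.map(_._1) }

  def main(args: Array[String]): Unit = {
    val parts = repartition(1 to 10, 3)
    // Three partitions whose sizes are 4, 3 and 3 — as even as possible.
    parts.toSeq.sortBy(_._1).foreach { case (p, rows) =>
      println(s"partition $p -> $rows")
    }
  }
}
```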

  •  def repartition(partitionExprs: Column*): DataFrame 
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
 * the existing number of partitions. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}

 This method returns a new [[DataFrame]] partitioned by the given partitioning expressions, preserving the existing number of partitions. The resulting DataFrame is hash partitioned.

This is the same operation as DISTRIBUTE BY in SQL (Hive QL).

  • def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
 * `numPartitions`. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}

 This method returns a new [[DataFrame]] partitioned by the given partitioning expressions into `numPartitions` partitions. The resulting DataFrame is hash partitioned.

This is the same operation as DISTRIBUTE BY in SQL (Hive QL).

coalesce

  • coalesce(numPartitions: Int): DataFrame
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
 * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
 * the 100 new partitions will claim 10 of the current partitions.
 * @group rdd
 * @since 1.4.0
 */
def coalesce(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = false, logicalPlan)
}

 This returns a new [[DataFrame]] with exactly `numPartitions` partitions. Similar to coalesce defined on an [[RDD]], the operation results in a narrow dependency: going from 1000 partitions to 100, there is no shuffle; each of the 100 new partitions simply claims 10 of the current partitions.

Going the other way, from 100 partitions to 1000, cannot be expressed as a narrow dependency. Note that DataFrame's coalesce always passes shuffle = false, so requesting more partitions than currently exist leaves the partition count unchanged; RDD.coalesce, by contrast, takes a shuffle flag and can grow the count when shuffle = true. Use repartition to increase a DataFrame's partition count.
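The "each new partition claims 10 of the current partitions" behaviour can be sketched without Spark. The `parentsOf` function below reproduces the even-grouping idea behind the narrow dependency (a simplification: Spark's real partition coalescer also takes data locality into account, and the object and method names here are invented for illustration):

```scala
object CoalesceSketch {
  // Which parent partitions does child partition `child` claim when
  // coalescing `numParents` partitions down to `numChildren`?
  def parentsOf(child: Int, numChildren: Int, numParents: Int): Seq[Int] = {
    val start = (child.toLong * numParents / numChildren).toInt
    val end   = ((child + 1).toLong * numParents / numChildren).toInt
    start until end
  }

  def main(args: Array[String]): Unit = {
    // 1000 partitions -> 100 partitions: each child claims exactly 10 parents,
    // so no data has to move between executors (a narrow dependency).
    println(parentsOf(0, 100, 1000))
    println((0 until 100).forall(c => parentsOf(c, 100, 1000).size == 10))
  }
}
```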

 Note: coalesce(numPartitions: Int): DataFrame and repartition(numPartitions: Int): DataFrame both build the same logical plan node, case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan); they differ only in the shuffle flag.

/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

This returns a new RDD that has exactly `numPartitions` partitions. It differs from [[RepartitionByExpression]] in that this node is created directly by DataFrame calls, because the user asked for coalesce or repartition; [[RepartitionByExpression]] is used when the consumer of the output requires some specific ordering or distribution of the data.

(The scaladoc says "RDD" while the declared return type is DataFrame; the intent is the same either way.)

repartition(partitionExprs: Column*): DataFrame and repartition(numPartitions: Int, partitionExprs: Column*): DataFrame are both backed by

case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData

/**
 * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`.
 * If `numPartitions` is not specified, the number of partitions will be the number set by
 * `spark.sql.shuffle.partitions`.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    numPartitions: Option[Int] = None) extends RedistributeData {
  numPartitions match {
    case Some(n) => require(n > 0, "numPartitions must be greater than 0.")
    case None => // Ok
  }
}

This node repartitions data using [[Expression]]s into `numPartitions` partitions, and receives information about the number of partitions during execution. It is used when a specific ordering or distribution is expected by the consumer of the query result; [[Repartition]] is used for the RDD-like coalesce and repartition.

If `numPartitions` is not specified, the number of partitions falls back to the value of spark.sql.shuffle.partitions.
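Hash partitioning itself is simple to picture: each row's partition index is derived from the hash of its partitioning expressions modulo the partition count, so equal keys always land in the same partition. A minimal sketch (plain `hashCode` stands in for the Murmur3-based hashing Spark actually applies to the expression values; `HashPartitionSketch` is an illustrative name):

```scala
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw  // hashCode can be negative
  }

  def main(args: Array[String]): Unit = {
    // Equal keys always map to the same partition; this co-location is what
    // lets a join on pre-partitioned data avoid a second shuffle.
    println(partitionFor("user_42", 10) == partitionFor("user_42", 10))
  }
}
```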

Usage examples

  • def repartition(numPartitions: Int): DataFrame  
// Get a test DataFrame that contains a user column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Get a DataFrame with 10 partitions
testDataFrame.repartition(10)
  •  def repartition(partitionExprs: Column*): DataFrame  
// Get a test DataFrame that contains a user column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the user column; the partition count is determined by spark.sql.shuffle.partitions
testDataFrame.repartition($"user")
  • def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame 
// Get a test DataFrame that contains a user column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the user column into 10 partitions. This can greatly speed up
// a subsequent join, but watch out for data skew.
testDataFrame.repartition(10, $"user")
  • coalesce(numPartitions: Int): DataFrame 
val testDataFrame1: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
val testDataFrame2 = testDataFrame1.repartition(10)
// No shuffle is triggered; the result has 5 partitions
testDataFrame2.coalesce(5)
// coalesce on a DataFrame always runs with shuffle = false, so asking for more
// partitions than currently exist is a no-op (the result still has 10 partitions);
// use repartition(100) if you actually need to scale up
testDataFrame2.coalesce(100)

How many partitions to use depends on your actual workload: too many wastes resources, too few hurts performance.

This is only a first look; I will dig into the underlying implementation in a follow-up post.

 

This post is compiled from my own work and study notes; please credit the source when reposting.

 

 

