In Spark development, repartitioning the data can improve performance, especially around join operations: the gain from a well-placed repartition is often far larger than the cost of the repartition itself, which makes it well worth doing.
In Spark SQL there are two main methods for repartitioning data, repartition and coalesce. The two are compared below.
repartition
repartition has three overloads:
- def repartition(numPartitions: Int): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * @group dfops
 * @since 1.3.0
 */
def repartition(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}
```
This method returns a new [[DataFrame]] with exactly `numPartitions` partitions.
- def repartition(partitionExprs: Column*): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
 * the existing number of partitions. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}
```
This method returns a new [[DataFrame]] partitioned by the given partitioning expressions, preserving the existing number of partitions. The resulting DataFrame is hash partitioned.
This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
- def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
 * `numPartitions`. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}
```
This method returns a new [[DataFrame]] partitioned by the given partitioning expressions into `numPartitions` partitions. The resulting DataFrame is hash partitioned.
This, too, is the same operation as "DISTRIBUTE BY" in SQL (Hive QL); the sketch below illustrates that equivalence.
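A minimal sketch of the DISTRIBUTE BY equivalence, assuming Spark 1.x with a HiveContext (so the HiveQL syntax is available); the input file `people.json` and the `user` column are hypothetical names for illustration:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("distribute-by-demo").setMaster("local[*]"))
// A HiveContext so that the HiveQL "DISTRIBUTE BY" syntax is available.
val sqlContext = new HiveContext(sc)

// `people.json` and the `user` column are hypothetical, for illustration only.
val df: DataFrame = sqlContext.read.json("people.json")
df.registerTempTable("people")

// Hash-partition by the `user` column through the DataFrame API ...
val byApi: DataFrame = df.repartition(df("user"))

// ... which is the same operation as DISTRIBUTE BY in SQL (Hive QL).
val bySql: DataFrame = sqlContext.sql("SELECT * FROM people DISTRIBUTE BY user")
```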
coalesce
- def coalesce(numPartitions: Int): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
 * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
 * the 100 new partitions will claim 10 of the current partitions.
 * @group rdd
 * @since 1.4.0
 */
def coalesce(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = false, logicalPlan)
}
```
Returns a new [[DataFrame]] with exactly `numPartitions` partitions. Like coalesce defined on an [[RDD]], this operation results in a narrow dependency: going from 1000 partitions down to 100 triggers no shuffle; instead, each of the 100 new partitions claims 10 of the current partitions.
Note that because shuffle is hard-coded to false here, coalesce can only reduce the partition count: asking to go from 100 partitions up to 1000 does not shuffle either, it simply leaves the DataFrame at 100 partitions. The sketch below verifies both behaviours.
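A quick way to check this is to compare partition counts through the underlying RDD; a minimal sketch, assuming `df` is any existing DataFrame:
```scala
// Assuming `df` is an existing DataFrame.
val wide = df.repartition(1000)           // full shuffle, 1000 partitions
println(wide.rdd.partitions.length)       // 1000

val narrow = wide.coalesce(100)           // narrow dependency, no shuffle
println(narrow.rdd.partitions.length)     // 100

// coalesce never shuffles, so it cannot increase the partition count:
val unchanged = narrow.coalesce(1000)
println(unchanged.rdd.partitions.length)  // still 100
```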
Note: both coalesce(numPartitions: Int): DataFrame and repartition(numPartitions: Int): DataFrame are backed by the same logical plan node, case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan):
```scala
/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
```
Returns a new RDD with exactly `numPartitions` partitions. It differs from [[RepartitionByExpression]] in that it is invoked directly by a DataFrame because the user asked for `coalesce` or `repartition`; [[RepartitionByExpression]] is used when the consumer of the output requires some specific ordering or distribution of the data. (The source comment says RDD while the return type is DataFrame, but the distinction hardly matters here.)
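Which of the two logical nodes a given call builds can be observed with explain; a minimal sketch, again assuming an existing DataFrame `df`:
```scala
// Both calls build the same Repartition logical node; only the shuffle
// flag differs, which shows up in the extended (logical) plan.
df.repartition(10).explain(true)   // Repartition with shuffle = true
df.coalesce(10).explain(true)      // Repartition with shuffle = false

// The expression-based overloads build RepartitionByExpression instead.
df.repartition(df("user")).explain(true)
```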
The expression-based overloads repartition(partitionExprs: Column*): DataFrame and repartition(numPartitions: Int, partitionExprs: Column*): DataFrame, on the other hand, are backed by
case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData:
```scala
/**
 * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`.
 * If `numPartitions` is not specified, the number of partitions will be the number set by
 * `spark.sql.shuffle.partitions`.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    numPartitions: Option[Int] = None) extends RedistributeData {
  numPartitions match {
    case Some(n) => require(n > 0, "numPartitions must be greater than 0.")
    case None => // Ok
  }
}
```
This node repartitions the data into `numPartitions` using the given [[Expression]]s, receiving the actual partition count during execution. It is used when the consumer of the query result expects a specific ordering or distribution; [[Repartition]] serves the RDD-like `coalesce` and `repartition`.
If `numPartitions` is not specified, the number of partitions falls back to the value of `spark.sql.shuffle.partitions`.
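That fallback is easy to observe; a minimal sketch, assuming an existing SQLContext named `sqlContext` and a DataFrame `df` with a `user` column:
```scala
// No explicit numPartitions, so the expression overload falls back to
// spark.sql.shuffle.partitions (which defaults to 200).
sqlContext.setConf("spark.sql.shuffle.partitions", "50")
val byUser = df.repartition(df("user"))
println(byUser.rdd.partitions.length)  // 50
```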
Usage examples
- def repartition(numPartitions: Int): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Get a DataFrame with 10 partitions
testDataFrame.repartition(10)
```
- def repartition(partitionExprs: Column*): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the `user` column; the partition count is decided by spark.sql.shuffle.partitions
testDataFrame.repartition($"user")
```
- def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the `user` column into 10 partitions. Repartitioning like this
// can speed up a subsequent join considerably (see the join sketch after these
// examples), but watch out for data skew.
testDataFrame.repartition(10, $"user")
```
- def coalesce(numPartitions: Int): DataFrame
```scala
val testDataFrame1: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
val testDataFrame2 = testDataFrame1.repartition(10)
// Does not trigger a shuffle; returns a DataFrame with 5 partitions
testDataFrame2.coalesce(5)
// Does not trigger a shuffle either: coalesce cannot increase the partition
// count, so this still returns a DataFrame with 10 partitions
testDataFrame2.coalesce(100)
```
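As mentioned in the third example, pre-partitioning both sides of a join on the join key is where repartition tends to pay off. A hedged sketch of that pattern, reusing readMysqlTable from the examples above; "OTHERTABLE" is a hypothetical second table:
```scala
val orders: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// "OTHERTABLE" is a hypothetical second table sharing the `user` column.
val users: DataFrame = readMysqlTable(sqlContext, "OTHERTABLE", proPath)

// Hash-partition both sides on the join key so that matching keys end up
// in the same partitions before the join; beware of skew on hot keys.
val joined = orders.repartition(10, orders("user"))
  .join(users.repartition(10, users("user")), "user")
```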
As for how many partitions to use, that depends on your actual data and cluster: too many wastes resources, and too few can make things slower than before.
This is only a first look at the topic; I will dig into the underlying implementation in a later post.