spark的分區
Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬於哪個分區和Reduce的個數。
注意
(1)只有Key-Value類型的RDD才有分區器的,非Key-Value類型的RDD,分區器的值是None
(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬於那個分區的。
查看RDD的分區器
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
對RDD進行重新分區
val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
一. Hash分區
HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除以分區的個數取余,如果余數小於0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。
聚類! key相同,hashCode相同,分配到同一個區
問題:數據傾斜,每個分區中數據量的不均勻
二. Ranger分區
將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的
實現過程:
①抽樣產生邊界數組
②將元素根據邊界數組判斷屬於哪個區
三. 自定義Partitioner
實現過程
要實現自定義的分區器,你需要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。
(1)numPartitions: Int:返回創建出來的分區數。
(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。
使用
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。
使用自定義分區器,傳給 partitionBy() 方法
scala> val par = data.partitionBy(new MyCustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27
查看重新分區后的數據分布
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))
案例
需求:有以下數據,希望年齡相同的進入同一個區。
User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23)
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
/**
* @description: TODO
* @author: HaoWu
* @create: 2020年08月03日
*/
object MyPartitionerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val list = List(User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23))
val result = sc.makeRDD(list).map {
case User(name, age) => (age, name)
}.partitionBy(new MyPartitioner(3))
result.saveAsTextFile("output")
}
}
/**
* User樣例類
*/
case class User(name: String, age: Int)
/**
* 自定義分區器
*/
class MyPartitioner(num: Int) extends Partitioner {
//設置分區數
override def numPartitions: Int = num
//分區規則
override def getPartition(key: Any): Int = {
//判斷是否為Int類型
if (!key.isInstanceOf[Int]) {
0
} else {
//Hash分區具有聚類的作用,相同age的會被分如同一個區
key.asInstanceOf[Int] % numPartitions
}
}
}
