Spark(九)【RDD的分區和自定義Partitioner】


spark的分區

​ Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬於哪個分區和Reduce的個數。

注意

(1)只有Key-Value類型的RDD才有分區器的,非Key-Value類型的RDD,分區器的值是None
(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬於那個分區的。

查看RDD的分區器

scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at 
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None

對RDD進行重新分區

val partitioned = pairs.partitionBy(new HashPartitioner(2))
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27

一. Hash分區

HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除以分區的個數取余,如果余數小於0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。

聚類! key相同,hashCode相同,分配到同一個區

問題:數據傾斜,每個分區中數據量的不均勻

二. Ranger分區

​ 將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的

實現過程:

​ ①抽樣產生邊界數組

​ ②將元素根據邊界數組判斷屬於哪個區

三. 自定義Partitioner

實現過程

要實現自定義的分區器,你需要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。

(1)numPartitions: Int:返回創建出來的分區數。

(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。

使用

使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。

使用自定義分區器,傳給 partitionBy() 方法
scala> val par = data.partitionBy(new MyCustomerPartitioner(2))
par: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at partitionBy at <console>:27
查看重新分區后的數據分布
scala> par.mapPartitionsWithIndex((index,items)=>items.map((index,_))).collect
res3: Array[(Int, (Int, Int))] = Array((0,(2,2)), (0,(4,4)), (0,(6,6)), (1,(1,1)), (1,(3,3)), (1,(5,5)))

案例

需求:有以下數據,希望年齡相同的進入同一個區。

User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23)
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * @description: TODO
 * @author: HaoWu
 * @create: 2020年08月03日
 */
object MyPartitionerTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val list = List(User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23))
    val result = sc.makeRDD(list).map {
      case User(name, age) => (age, name)
    }.partitionBy(new MyPartitioner(3))
    result.saveAsTextFile("output")
  }
}

/**
 * User樣例類
 */
case class User(name: String, age: Int)

/**
 * 自定義分區器
 */
class MyPartitioner(num: Int) extends Partitioner {
  //設置分區數
  override def numPartitions: Int = num

  //分區規則
  override def getPartition(key: Any): Int = {
    //判斷是否為Int類型
    if (!key.isInstanceOf[Int]) {
      0
    } else {
      //Hash分區具有聚類的作用,相同age的會被分如同一個區
      key.asInstanceOf[Int] % numPartitions
    }
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM