【spark】分區


RDD是彈性分布式數據集,通常RDD很大,會被分成多個分區,保存在不同節點上。

那么分區有什么好處呢?

分區能減少節點之間的通信開銷,正確的分區能大大加快程序的執行速度。

我們看個例子

首先我們要了解一個概念,分區並不等同於分塊。

分塊是我們把全部數據切分成好多塊來存儲叫做分塊。

如上圖b,產生的分塊,每個分塊都可能含有同樣范圍的數據。

而分區,則是把同樣范圍的數據分開,如圖a

我們通過這個圖片可以清楚的看到,我們通過把相同主鍵的數據連接。

經過有序分區的數據,只需要按照相同的主鍵分區 join 即可。

未通過分區的分塊執行 join ,額外進行多次連接操作,把同樣的數據連接到不同節點上,大大增大了通信開銷。

在一些操作上,join groupby,filer等等都能從分區上獲得很大的收益。

分區原則

RDD分區的一個分區原則是使得分區個數盡量等於集群中的CPU核心(core)數量。分區過多並不會增加執行速度。

例如,我們集群有10個core,我們分5個區,每個core執行一個分區操作,剩下5個core浪費。

如果,我們分20分區,一個core執行一個分區,剩下的10分區將會排隊等待。

默認分區數目

對於不同的Spark部署模式而言(本地模式,standalone模式,YARN模式,Mesos模式)

都可以數值spark.default.parallelism這個參數值,來配置默認分區。

當然針對不同的部署模式,默認分區的數目肯定也是不相同的。

本地模式,默認為本地機器的CPU數目,若設置了local[N],則默認為N。一般使用local[*]來使用所有CPU數。

YARN模式,在集群中所有CPU核心數目總和和 2 二者中取較大值作為默認值。

Mesos模式,默認分區為8.

如何手動設置分區

1.創建RDD時:在調用 textFile 和 parallelize 方法的時候手動指定分區個數即可。

  語法格式  sc.parallelize(path,partitionNum)       sc.textFile(path,partitionNum)

//sc.parallelize(path,partitionNum)
val list = List("Hadoop","Spark","Hive");
val rdd1 = sc.parallelize(list,2);//設置兩個分區
val rdd2 = sc.parallelize(list);//未指定分區,默認為spark.default.parallelism

//sc.textFile(path,partitionNum)
val rdd3 = sc.textFile("file://+本地文件地址",2);//設置兩個分區
val rdd4 = sc.textFile("file://+本地文件地址");//未指定分區,默認為min(2,spark.default.parallelism)
val rdd5 = sc.textFile("file://+HDFS文件地址");//未指定分區,默認為HDFS文件分片數

2.通過轉化操作得到新的RDD時:調用 repartition 方法即可。

  語法格式  val newRdd = oldRdd.repartition(1)  

val list = List("Hadoop","Spark","Hive");
val rdd1 = sc.parallelize(list,2);//設置兩個分區
val newRdd1 = rdd1.repartition(3);//重新分區
println(newRdd1.partitions.size);//查看分區數

分區函數

我們在使用分區的時候要了解兩條規則

(1)只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None

(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬於那個分區的

spark內部提供了 HashPartitioner 和 RangePartitioner 兩種分區策略。

1.HashPartitioner

原理:

  對於給定的key,計算其hashCode,並除於分區的個數取余,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。

語法:

  rdd.partitionBy(new spark.HashPartitioner(n)) 

示例:

object Main{
  def main(args:Array[String]): Unit ={
    val conf = new SparkConf();
    val sc = new SparkContext(conf);
    val list = List((1,1),(1,2),(2,1),(2,2),(3,1),(3,2))//注意這里必須是(k,v)形式
    val rdd = sc.parallelize(list);
    rdd.partitionBy(new spark.HashPartitioner(3));//使用HashPartitioner
  }
}

我們再看一個例子說明HashPartitioner如何分區的。

注意:實際我們使用的默認分區方式實際是 HashPartitioner 分區方式

2.RangePartitioner 

原理:

  根據key值范圍和分區數確定分區范圍,將范圍內的鍵分配給相應的分區。

語法:

  rdd.partitionBy(new RangePartitioner(n,rdd));

示例:

object Main{
  def main(args:Array[String]): Unit ={
    val conf = new SparkConf();
    val sc = new SparkContext(conf);
    val list = List((1,1),(1,2),(2,1),(2,2),(3,1),(3,2))
    val rdd = sc.parallelize(list);
    val pairRdd = rdd.partitionBy(new RangePartitioner(3,rdd));//根據key分成三個區
  }
}

3.用戶自定義分區

如果上面兩種分區都滿足不了你的要求的時候,我們可以自己定義分區類。

  Spark提供了相應的接口,我們只需要擴展Partitioner抽象類。

abstract class Partitioner extends Serializable { 
  def numPartitions: Int  //這個方法需要返回你想要創建分區的個數
  def getPartition(key: Any): Int //這個函數需要對輸入的key做計算,然后返回該key的分區ID,范圍一定是0到 numPartitions-1
}   

  定義完畢后,通過parttitionBy()方法調用。

示例:

 

我們看這樣一個實例,需要按照最后一位數來分區,我們用普通的分區並不能滿足要求,所以這個時候需要自己定義分區類。

class UDPartitioner (numParts:Int) extends Partitioner {
  //覆蓋分區數
  override def numPartitions = numParts;
  //覆蓋分區獲取函數,返回分區所用的key
  override def getPartition(key: Any) : Int= {
    key.toString.toInt % 10;//通過key除10取余來獲取最后一位數並返回。
  }
}
object Main{
  def main(args:Array[String]): Unit ={
    val conf = new SparkConf();
    val sc = new SparkContext(conf);
    //模擬5個分區的數據
    val data1 = sc.parallelize(1 to 10,5);
    //注意,RDD一定要是key-value,才能使用用戶自定義的分區類,通過key來確定分區
    val data2 = data1.map((_,1));//占位符用法,等同於data.map(x => (x,1))
    //根據尾號轉變為10個分區,分別寫到10個文件中
    data2.partitionBy(new UDPartitioner(10)).saveAsTextFile("file:///usr/local/output");
  }
}

另外,我們也可以通過在函數中額外定義 hashcode()方法 和 equal()方法來保證分區的正確分配。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM