RDD是彈性分布式數據集,通常RDD很大,會被分成多個分區,保存在不同節點上。
那么分區有什么好處呢?
分區能減少節點之間的通信開銷,正確的分區能大大加快程序的執行速度。
我們看個例子
首先我們要了解一個概念,分區並不等同於分塊。
分塊是我們把全部數據切分成好多塊來存儲叫做分塊。
如上圖b,產生的分塊,每個分塊都可能含有同樣范圍的數據。
而分區,則是把同樣范圍的數據分開,如圖a
我們通過這個圖片可以清楚的看到,我們通過把相同主鍵的數據連接。
經過有序分區的數據,只需要按照相同的主鍵分區 join 即可。
未通過分區的分塊執行 join ,額外進行多次連接操作,把同樣的數據連接到不同節點上,大大增大了通信開銷。
在一些操作上,join groupby,filer等等都能從分區上獲得很大的收益。
分區原則
RDD分區的一個分區原則是使得分區個數盡量等於集群中的CPU核心(core)數量。分區過多並不會增加執行速度。
例如,我們集群有10個core,我們分5個區,每個core執行一個分區操作,剩下5個core浪費。
如果,我們分20分區,一個core執行一個分區,剩下的10分區將會排隊等待。
默認分區數目
對於不同的Spark部署模式而言(本地模式,standalone模式,YARN模式,Mesos模式)
都可以數值spark.default.parallelism這個參數值,來配置默認分區。
當然針對不同的部署模式,默認分區的數目肯定也是不相同的。
本地模式,默認為本地機器的CPU數目,若設置了local[N],則默認為N。一般使用local[*]來使用所有CPU數。
YARN模式,在集群中所有CPU核心數目總和和 2 二者中取較大值作為默認值。
Mesos模式,默認分區為8.
如何手動設置分區
1.創建RDD時:在調用 textFile 和 parallelize 方法的時候手動指定分區個數即可。
語法格式 sc.parallelize(path,partitionNum) sc.textFile(path,partitionNum)
//sc.parallelize(path,partitionNum) val list = List("Hadoop","Spark","Hive"); val rdd1 = sc.parallelize(list,2);//設置兩個分區 val rdd2 = sc.parallelize(list);//未指定分區,默認為spark.default.parallelism //sc.textFile(path,partitionNum) val rdd3 = sc.textFile("file://+本地文件地址",2);//設置兩個分區 val rdd4 = sc.textFile("file://+本地文件地址");//未指定分區,默認為min(2,spark.default.parallelism) val rdd5 = sc.textFile("file://+HDFS文件地址");//未指定分區,默認為HDFS文件分片數
2.通過轉化操作得到新的RDD時:調用 repartition 方法即可。
語法格式 val newRdd = oldRdd.repartition(1)
val list = List("Hadoop","Spark","Hive"); val rdd1 = sc.parallelize(list,2);//設置兩個分區 val newRdd1 = rdd1.repartition(3);//重新分區 println(newRdd1.partitions.size);//查看分區數
分區函數
我們在使用分區的時候要了解兩條規則
(1)只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None
(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬於那個分區的
spark內部提供了 HashPartitioner
和 RangePartitioner
兩種分區策略。
1.HashPartitioner
原理:
對於給定的key,計算其hashCode,並除於分區的個數取余,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。
語法:
rdd.partitionBy(new spark.HashPartitioner(n))
示例:
object Main{
def main(args:Array[String]): Unit ={
val conf = new SparkConf();
val sc = new SparkContext(conf);
val list = List((1,1),(1,2),(2,1),(2,2),(3,1),(3,2))//注意這里必須是(k,v)形式
val rdd = sc.parallelize(list);
rdd.partitionBy(new spark.HashPartitioner(3));//使用HashPartitioner
}
}
我們再看一個例子說明HashPartitioner如何分區的。
注意:實際我們使用的默認分區方式實際是 HashPartitioner 分區方式
2.RangePartitioner
原理:
根據key值范圍和分區數確定分區范圍,將范圍內的鍵分配給相應的分區。
語法:
rdd.partitionBy(new RangePartitioner(n,rdd));
示例:
object Main{ def main(args:Array[String]): Unit ={ val conf = new SparkConf(); val sc = new SparkContext(conf); val list = List((1,1),(1,2),(2,1),(2,2),(3,1),(3,2)) val rdd = sc.parallelize(list); val pairRdd = rdd.partitionBy(new RangePartitioner(3,rdd));//根據key分成三個區 } }
3.用戶自定義分區
如果上面兩種分區都滿足不了你的要求的時候,我們可以自己定義分區類。
Spark提供了相應的接口,我們只需要擴展Partitioner
抽象類。
abstract class Partitioner extends Serializable {
def numPartitions: Int //這個方法需要返回你想要創建分區的個數
def getPartition(key: Any): Int //這個函數需要對輸入的key做計算,然后返回該key的分區ID,范圍一定是0到 numPartitions-1
}
定義完畢后,通過parttitionBy()方法調用。
示例:
我們看這樣一個實例,需要按照最后一位數來分區,我們用普通的分區並不能滿足要求,所以這個時候需要自己定義分區類。
class UDPartitioner (numParts:Int) extends Partitioner { //覆蓋分區數 override def numPartitions = numParts; //覆蓋分區獲取函數,返回分區所用的key override def getPartition(key: Any) : Int= { key.toString.toInt % 10;//通過key除10取余來獲取最后一位數並返回。 } } object Main{ def main(args:Array[String]): Unit ={ val conf = new SparkConf(); val sc = new SparkContext(conf); //模擬5個分區的數據 val data1 = sc.parallelize(1 to 10,5); //注意,RDD一定要是key-value,才能使用用戶自定義的分區類,通過key來確定分區 val data2 = data1.map((_,1));//占位符用法,等同於data.map(x => (x,1)) //根據尾號轉變為10個分區,分別寫到10個文件中 data2.partitionBy(new UDPartitioner(10)).saveAsTextFile("file:///usr/local/output"); } }
另外,我們也可以通過在函數中額外定義 hashcode()方法 和 equal()方法來保證分區的正確分配。