一、分區的概念
分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被划分為多個分片,每一個分片稱為分區,分區的格式決定了並行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(准確來說是作業最后一個RDD)的分區數決定。
二、為什么要進行分區
數據分區,在分布式集群里,網絡通信的代價很大,減少網絡傳輸可以極大提升性能。mapreduce框架的性能開支主要在io和網絡傳輸,io因為要大量讀寫文件,它是不可避免的,但是網絡傳輸是可以避免的,把大文件壓縮變小文件, 從而減少網絡傳輸,但是增加了cpu的計算負載。
Spark里面io也是不可避免的,但是網絡傳輸spark里面進行了優化:
Spark把rdd進行分區(分片),放在集群上並行計算。同一個rdd分片100個,10個節點,平均一個節點10個分區,當進行sum型的計算的時候,先進行每個分區的sum,然后把sum值shuffle傳輸到主程序進行全局sum,所以進行sum型計算對網絡傳輸非常小。但對於進行join型的計算的時候,需要把數據本身進行shuffle,網絡開銷很大。
spark是如何優化這個問題的呢?
Spark把key-value rdd通過key的hashcode進行分區,而且保證相同的key存儲在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什么要進行shuffle?,就是說mapreduce里面網絡傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網絡的,它要把所有的數據混在一起走網絡,然后它才能把相同的key走到一起。要進行shuffle是存儲決定的。
Spark從這個教訓中得到啟發,spark會把key進行分區,也就是key的hashcode進行分區,相同的key,hashcode肯定是一樣的,所以它進行分區的時候100t的數據分成10分,每部分10個t,它能確保相同的key肯定在一個分區里面,而且它能保證存儲的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一分稱為每個分區,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分區。
key的分布不均決定了有的分區大有的分區小。沒法分區保證完全相等,但它會保證在一個接近的范圍。所以mapreduce里面做的某些工作里邊,spark就不需要shuffle了,spark解決網絡傳輸這塊的根本原理就是這個。
進行join的時候是兩個表,不可能把兩個表都分區好,通常情況下是把用的頻繁的大表事先進行分區,小表進行關聯它的時候小表進行shuffle過程。
大表不需要shuffle。
需要在工作節點間進行數據混洗的轉換極大地受益於分區。這樣的轉換是 cogroup,groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey 和lookup。
三、Spark分區原則及方法
RDD分區的一個分區原則:盡可能是得分區的個數等於集群核心數目
無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其默認分區個數,若沒有設置該值,則根據不同的集群環境確定該值
3.1 本地模式
(1)默認方式
以下這種默認方式就一個分區
結果
(2)手動設置
設置了幾個分區就是幾個分區
結果
(3)跟local[n] 有關
n等於幾默認就是幾個分區
如果n=* 那么分區個數就等於cpu core的個數
結果
本機電腦查看cpu core,我的電腦--》右鍵管理--》設備管理器--》處理器
(4)參數控制
結果
3.2 YARN模式
進入defaultParallelism方法
繼續進入defaultParallelism方法
這個一個trait,其實現類是(Ctrl+h)
進入TaskSchedulerImpl類找到defaultParallelism方法
繼續進入defaultParallelism方法,又是一個trait,看其實現類
Ctrl+h看SchedulerBackend類的實現類
進入CoarseGrainedSchedulerBackend找到defaultParallelism
totalCoreCount.get()是所有executor使用的core總數,和2比較去較大值
如果正常的情況下,那你設置了多少就是多少
四、分區器
(1)如果是從HDFS里面讀取出來的數據,不需要分區器。因為HDFS本來就分好區了。
分區數我們是可以控制的,但是沒必要有分區器。
(2)非key-value RDD分區,沒必要設置分區器
al testRDD = sc.textFile("C:\\Users\\Administrator\\IdeaProjects\\myspark\\src\\main\\hello.txt") .flatMap(line => line.split(",")) .map(word => (word, 1)).partitionBy(new HashPartitioner(2))
沒必要設置,但是非要設置也行。
(3)Key-value形式的時候,我們就有必要了。
HashPartitioner
val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y) //如果不設置默認也是HashPartitoiner,分區數跟spark.default.parallelism一樣 println(resultRDD.partitioner) println("resultRDD"+resultRDD.getNumPartitions)
RangePartitioner
val resultRDD = testRDD.reduceByKey((x:Int,y:Int) => x+ y) val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD)) println(newresultRDD.partitioner) println("newresultRDD"+newresultRDD.getNumPartitions)
注:按照范圍進行分區的,如果是字符串,那么就按字典順序的范圍划分。如果是數字,就按數據自的范圍划分。
自定義分區
需要實現2個方法
class MyPartitoiner(val numParts:Int) extends Partitioner{ override def numPartitions: Int = numParts override def getPartition(key: Any): Int = { val domain = new URL(key.toString).getHost val code = (domain.hashCode % numParts) if (code < 0) { code + numParts } else { code } } } object DomainNamePartitioner { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("word count").setMaster("local") val sc = new SparkContext(conf) val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2), ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4), ("http://baidu.com/test", 4))) //Array[Array[(String, Int)]] // = Array(Array(), // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4), // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3))) val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2)) hashPartitionedRDD.glom().collect() //使用spark-shell --jar的方式將這個partitioner所在的jar包引進去,然后測試下面的代碼 // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2)) val array = partitionedRDD.glom().collect() } }