Spark學習之路 (十七)Spark分區


一、分區的概念

  分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被划分為多個分片,每一個分片稱為分區,分區的格式決定了並行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(准確來說是作業最后一個RDD)的分區數決定。

二、為什么要進行分區

  數據分區,在分布式集群里,網絡通信的代價很大,減少網絡傳輸可以極大提升性能。mapreduce框架的性能開支主要在io和網絡傳輸,io因為要大量讀寫文件,它是不可避免的,但是網絡傳輸是可以避免的,把大文件壓縮變小文件,   從而減少網絡傳輸,但是增加了cpu的計算負載。

  Spark里面io也是不可避免的,但是網絡傳輸spark里面進行了優化:

  Spark把rdd進行分區(分片),放在集群上並行計算。同一個rdd分片100個,10個節點,平均一個節點10個分區,當進行sum型的計算的時候,先進行每個分區的sum,然后把sum值shuffle傳輸到主程序進行全局sum,所以進行sum型計算對網絡傳輸非常小。但對於進行join型的計算的時候,需要把數據本身進行shuffle,網絡開銷很大。

spark是如何優化這個問題的呢?

  Spark把key-value rdd通過key的hashcode進行分區,而且保證相同的key存儲在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什么要進行shuffle?,就是說mapreduce里面網絡傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網絡的,它要把所有的數據混在一起走網絡,然后它才能把相同的key走到一起。進行shuffle是存儲決定的。

  Spark從這個教訓中得到啟發,spark會把key進行分區,也就是key的hashcode進行分區,相同的key,hashcode肯定是一樣的,所以它進行分區的時候100t的數據分成10分,每部分10個t,它能確保相同的key肯定在一個分區里面,而且它能保證存儲的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一分稱為每個分區,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分區。

  key的分布不均決定了有的分區大有的分區小。沒法分區保證完全相等,但它會保證在一個接近的范圍。所以mapreduce里面做的某些工作里邊,spark就不需要shuffle了,spark解決網絡傳輸這塊的根本原理就是這個。

  進行join的時候是兩個表,不可能把兩個表都分區好,通常情況下是把用的頻繁的大表事先進行分區,小表進行關聯它的時候小表進行shuffle過程。

  大表不需要shuffle。
  

  需要在工作節點間進行數據混洗的轉換極大地受益於分區。這樣的轉換是  cogroup,groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey 和lookup。

   分區是可配置的,只要RDD是基於鍵值對的即可

三、Spark分區原則及方法

RDD分區的一個分區原則:盡可能是得分區的個數等於集群核心數目

無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其默認分區個數,若沒有設置該值,則根據不同的集群環境確定該值

3.1 本地模式

(1)默認方式

以下這種默認方式就一個分區

結果

(2)手動設置

設置了幾個分區就是幾個分區

結果

(3)跟local[n] 有關

n等於幾默認就是幾個分區

如果n=* 那么分區個數就等於cpu core的個數

結果

本機電腦查看cpu core,我的電腦--》右鍵管理--》設備管理器--》處理器

(4)參數控制

結果

3.2 YARN模式

 進入defaultParallelism方法

繼續進入defaultParallelism方法

這個一個trait,其實現類是(Ctrl+h)

進入TaskSchedulerImpl類找到defaultParallelism方法

繼續進入defaultParallelism方法,又是一個trait,看其實現類

Ctrl+h看SchedulerBackend類的實現類

進入CoarseGrainedSchedulerBackend找到defaultParallelism

totalCoreCount.get()是所有executor使用的core總數,和2比較去較大值

如果正常的情況下,那你設置了多少就是多少

四、分區器

(1)如果是從HDFS里面讀取出來的數據,不需要分區器。因為HDFS本來就分好區了。

    分區數我們是可以控制的,但是沒必要有分區器。

(2)非key-value RDD分區,沒必要設置分區器

al testRDD = sc.textFile("C:\\Users\\Administrator\\IdeaProjects\\myspark\\src\\main\\hello.txt")
  .flatMap(line => line.split(","))
  .map(word => (word, 1)).partitionBy(new HashPartitioner(2))
  沒必要設置,但是非要設置也行。

(3)Key-value形式的時候,我們就有必要了。

HashPartitioner

val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y)
//如果不設置默認也是HashPartitoiner,分區數跟spark.default.parallelism一樣
println(resultRDD.partitioner)
println("resultRDD"+resultRDD.getNumPartitions)

RangePartitioner

val resultRDD = testRDD.reduceByKey((x:Int,y:Int) => x+ y)
val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD))
println(newresultRDD.partitioner)
println("newresultRDD"+newresultRDD.getNumPartitions)
注:按照范圍進行分區的,如果是字符串,那么就按字典順序的范圍划分。如果是數字,就按數據自的范圍划分

 

自定義分區

需要實現2個方法

class MyPartitoiner(val numParts:Int) extends  Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new URL(key.toString).getHost
    val code = (domain.hashCode % numParts)
    if (code < 0) {
      code + numParts
    } else {
      code
    }
  }
}

object DomainNamePartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word count").setMaster("local")

    val sc = new SparkContext(conf)

    val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2),
      ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4),
      ("http://baidu.com/test", 4)))
    //Array[Array[(String, Int)]]
    // = Array(Array(),
    // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4),
    // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3)))
    val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))
    hashPartitionedRDD.glom().collect()

    //使用spark-shell --jar的方式將這個partitioner所在的jar包引進去,然后測試下面的代碼
    // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar
    val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2))
    val array = partitionedRDD.glom().collect()

  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM