兩個概念:
- 分區partition
- 分區器partitioner
partition
RDD有個partitions方法:
final def partitions: Array[Partition]
,
能夠返回一個數組,數組元素是RDD的partition。
partition是RDD的最小數據處理單元,可以看作是一個數據塊,每個partition有個編號index。
一個partition被一個map task處理。
partitioner
MR任務的map階段的處理結果會進行分片(也可以叫分區,這個分區不同於上面的分區),分片的數量就是reduce task的數量。
具體怎么分片由分區器partitioner決定,spark中默認定義了兩種partitioner:
- 哈希分區器(Hash Partitioner)
- 范圍分區器(Range Partitioner)
hash分區器會根據key-value的鍵值key的hashcode進行分區,速度快,但是可能產生數據偏移,造成每個分區中數據量不均衡。
range分區器會對現有rdd中的key-value數據進行抽樣,盡量找出均衡分割點,一定程度上解決了數據偏移問題,力求分區后的每個分區內數據量均衡,但是速度相對慢。
partitioner分區詳情
在對父RDD執行完Map階段任務后和在執行Reduce階段任務前,會對Map階段中間結果進行分區。
分區由父RDD的partitioner確定,主要包括兩部分工作:
- 確定分區數量(也就是reduce task數量),也是子RDD的partition數量。
- 決定將Map階段中間結果的每個key-value對分到哪個分區上。
假設一個父RDD要執行reduceByKey任務,我們可以顯式的指定分區器:
val rdd_child = rdd_parent.reduceByKey(new HashPartitioner(3), _+_)
HashPartitioner
構造參數3就是分區數量,也是啟動的reduce task數量,也是reduceByKey
結果返回的子RDD的partitions
方法返回的數組的長度。
如果沒有顯式指定分區器,則會調用org.apache.spark
包下伴生對象Partitioner
的defaultPartitioner
靜態方法返回的分區器作為默認分區器。
defaultPartitioner
返回默認分區器的過程如下:
嘗試利用父RDD的partitioner,如果父RDD沒有partitioner,則會查看sparkConf中是否定義了 spark.default.parallelism
配置參數,如果定義了就返回new HashPartitioner(sc.defaultParallelism)
作為默認分區器,如果沒定義就返回new HashPartitioner(rdd_parent.partitions.length)
作為默認分區器——
以下是源碼:
//org.apache.spark包下伴生對象object Partitioner的方法
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
更具體的,無論是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式來運行 Apache Spark,分區的默認個數等於對spark.default.parallelism
的指定值,若該值未設置,則 Apache Spark 會根據不同集群模式的特征,來確定這個值。