本節內容
------------------
· Spark為什么要分區
· Spark分區原則及方法
· Spark分區案例
· 參考資料
------------------
一、Spark為什么要分區
分區概念:分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被划分為多個分片,每一個分片稱為分區,分區的格式決定了並行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(准確來說是作業最后一個RDD)的分區數決定。
為什么要分區,這個借用別人的一段話來闡述。
數據分區,在分布式集群里,網絡通信的代價很大,減少網絡傳輸可以極大提升性能。mapreduce框架的性能開支主要在io和網絡傳輸,io因為要大量讀寫文件,它是不可避免的,但是網絡傳輸是可以避免的,把大文件壓縮變小文件, 從而減少網絡傳輸,但是增加了cpu的計算負載。
Spark里面io也是不可避免的,但是網絡傳輸spark里面進行了優化:
spark把rdd進行分區(分片),放在集群上並行計算。同一個rdd分片100個,10個節點,平均一個節點10個分區,當進行sum型的計算的時候,先進行每個分區的sum,然后把sum值shuffle傳輸到主程序進行全局sum,所以進行sum型計算對網絡傳輸非常小。但對於進行join型的計算的時候,需要把數據本身進行shuffle,網絡開銷很大。
spark是如何優化這個問題的呢?
spark把key-value rdd通過key的hashcode進行分區,而且保證相同的key存儲在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什么要進行shuffle?,就是說mapreduce里面網絡傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網絡的,它要把所有的數據混在一起走網絡,然后它才能把相同的key走到一起。要進行shuffle是存儲決定的。
spark從這個教訓中得到啟發,spark會把key進行分區,也就是key的hashcode進行分區,相同的key,hashcode肯定是一樣的,所以它進行分區的時候100t的數據分成10分,每部分10個t,它能確保相同的key肯定在一個分區里面,而且它能保證存儲的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一分稱為每個分區,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分區。
key的分布不均決定了有的分區大有的分區小。沒法分區保證完全相等,但它會保證在一個接近的范圍。所以mapreduce里面做的某些工作里邊,spark就不需要shuffle了,spark解決網絡傳輸這塊的根本原理就是這個。
進行join的時候是兩個表,不可能把兩個表都分區好,通常情況下是把用的頻繁的大表事先進行分區,小表進行關聯它的時候小表進行shuffle過程。
大表不需要shuffle。
RDD 內部的數據集合在邏輯上(以及物理上)被划分成多個小集合,這樣的每一個小集合被稱為分區。像是下面這圖中,三個 RDD,每個 RDD 內部都有兩個分區。
在源碼級別,RDD 類內存儲一個 Partition 列表。每個 Partition 對象都包含一個 index 成員,通過 RDD 編號 + index 就能從唯一確定分區的 Block 編號,持久化的 RDD 就能通過這個 Block 編號從存儲介質中獲得對應的分區數據。
二、Spark分區原則及方法
RDD分區的一個分區原則:盡可能是得分區的個數等於集群核心數目
下面我們僅討論Spark默認的分區個數,這里分別就parallelize和textFile具體分析其默認的分區數
無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其默認分區個數,若沒有設置該值,則根據不同的集群環境確定該值
本地模式:默認為本地機器的CPU數目,若設置了local[N],則默認為N
Apache Mesos:默認的分區數為8
Standalone或YARN:默認取集群中所有核心數目的總和,或者2,取二者的較大值。對於parallelize來說,沒有在方法中的指定分區數,則默認為spark.default.parallelism,對於textFile來說,沒有在方法中的指定分區數,則默認為min(defaultParallelism,2),而defaultParallelism對應的就是spark.default.parallelism。如果是從hdfs上面讀取文件,其分區數為文件分片數(128MB/片)
如何創建分區,有兩種情況,創建 RDD 時和通過轉換操作得到新 RDD 時。
對於前者,在調用 textFile 和 parallelize 方法時候手動指定分區個數即可。例如 sc.parallelize(Array(1, 2, 3, 5, 6), 2) 指定創建得到的 RDD 分區個數為 2。
對於后者,直接調用 repartition 方法即可。實際上分區的個數是根據轉換操作對應多個 RDD 之間的依賴關系來確定,窄依賴子 RDD 由父 RDD 分區個數決定,例如 map 操作,父 RDD 和子 RDD 分區個數一致;Shuffle 依賴則由分區器(Partitioner)決定,例如 groupByKey(new HashPartitioner(2)) 或者直接 groupByKey(2) 得到的新 RDD 分區個數等於 2。
三、Spark分區案例
下次再寫......早點睡覺去
四、參考資料
1.http://blog.csdn.net/jiangpeng59/article/details/52754928,Spark基礎隨筆:分區詳解
2.http://blog.csdn.net/zengxiaosen/article/details/52637001-spark的優化-控制數據分區和分布
3.http://blog.csdn.net/jiangpeng59/article/details/52754928