spark shuffle:分區原理及相關的疑問


一、分區原理

1.為什么要分區?(這個借用別人的一段話來闡述。)

  為了減少網絡傳輸,需要增加cpu計算負載。數據分區,在分布式集群里,網絡通信的代價很大,減少網絡傳輸可以極大提升性能。mapreduce框架的性能開支主要在io和網絡傳輸,io因為要大量讀寫文件,它是不可避免的,但是網絡傳輸是可以避免的,把大文件壓縮變小文件, 從而減少網絡傳輸,但是增加了cpu的計算負載。

Spark里面io也是不可避免的,但是網絡傳輸spark里面進行了優化。spark把rdd進行分區(分片),放在集群上並行計算。同一個rdd分片100個,10個節點,平均一個節點10個分區,當進行sum型的計算的時候,先進行每個分區的sum然后把sum值shuffle傳輸到主程序進行全局sum,所以進行sum型計算對網絡傳輸非常小。但對於進行join型的計算的時候,需要把數據本身進行shuffle,網絡開銷很大。

2.spark是如何優化這個問題的呢?

  • mapreduce分區:就是說mapreduce的網絡傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網絡的,它要把所有的數據混在一起走網絡,然后它才能把相同的key的數據拉到一起。要進行shuffle是存儲決定的
  • spark分區:spark會基於key進行分區,也就是key的hashcode進行分區(相同的key,它的hashcode相同)。所以,它進行分區的時候100t的數據分成10份,每部分10個t,它能確保相同的key肯定在一個分區里面,而且它能保證存儲的時候相同的key能夠存在同一個節點上。例如,一個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一份稱為一個分區,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分區。
  • key的分布不均決定了有的分區大有的分區小。沒法保證分區數據量完全相等,但它會保證在一個接近的范圍。所以對於mapreduce做的某些工作里邊,spark就不需要shuffle了,spark解決網絡傳輸這塊的根本原理就是這個。

    進行join的時候是兩個表,不可能把兩個表都分區好,通常情況下是把用的頻繁的大表事先進行分區,小表進行關聯它的時候小表進行shuffle過程。

大表不需要shuffle。

   RDD 內部的數據集合在邏輯上(以及物理上)被划分成多個小集合,這樣的每一個小集合被稱為分區。像是下面這圖中,三個 RDD,每個 RDD 內部都有兩個分區。

  

    在源碼級別,RDD 類內存儲一個 Partition 列表。每個 Partition 對象都包含一個 index 成員,通過 RDD 編號 + index 就能從唯一確定分區的 Block 編號,持久化的 RDD 就能通過這個 Block 編號從存儲介質中獲得對應的分區數據。(RDD + index-> Block 編號->分區數據)

二、Spark分區原則及方法

1.RDD分區的一個分區原則:盡可能是得分區的個數等於集群核心數目。

下面我們僅討論Spark默認的分區個數,這里分別就parallelize和textFile具體分析其默認的分區數:

   無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其默認分區個數,若沒有設置該值,則根據不同的集群環境確定該值。

  • 本地模式:默認為本地機器的CPU數目,若設置了local[N],則默認為N
  • Apache Mesos:默認的分區數為8
  • Standalone或YARN:默認取集群中所有核心數目的總和,或者2,取二者的較大值。對於parallelize來說,沒有在方法中的指定分區數,則默認為spark.default.parallelism,對於textFile來說,沒有在方法中的指定分區數,則默認為min(defaultParallelism,2),而defaultParallelism對應的就是spark.default.parallelism。如果是從hdfs上面讀取文件,其分區數為文件分片數(128MB/片)

2.如何創建分區?

有兩種情況,創建 RDD 時和通過轉換操作得到新 RDD 時。其實就是創建RDD的2種方法。

  • 對於前者,在調用 textFile 和 parallelize 方法時候手動指定分區個數即可。例如 sc.parallelize(Array(1, 2, 3, 5, 6), 2) 指定創建得到的 RDD 分區個數為 2。
  • 對於后者,直接調用 repartition 方法即可。實際上,分區的個數是根據轉換操作對應多個 RDD 之間的依賴關系來確定的。

           1)窄依賴,子 RDD 由父 RDD 分區個數決定,例如 map 操作,父 RDD 和子 RDD 分區個數一致;

           2)Shuffle 依賴,則由分區器(Partitioner)決定,例如 groupByKey(new HashPartitioner(2)) 或者直接 groupByKey(2) 得到的新 RDD 分區個數等於 2。

3.spark shuffle中的partitioner的執行時機具體是哪里?

  •  partitioner在worker節點上執行,每個stage(除了最后一個)的最后一步就是將數據分區后,寫入磁盤,然后把分區信息上報給master。
  • master在啟動新的stage是會將上一個stage的分區信息攜帶給新的task,這樣新的task就知道去哪讀取數據了。

  分區是spark並行中的最小粒度,也就是說一個分區的數據必定需要一個線程來處理,不可拆分。幾個分區就是並行幾個task。

  例如,假定一個RDD的數據來源於2個hdfs文件,那么加載數據集時一開始默認是2個分區,可以並行處理2個文件。如果你有100個節點,每個節點有一個core,那么最多只能利用2個節點,這時我想提高並行度,就可以對這些數據進行重分區,把這兩個文件分成100個分區,這時候就會使用hashpartitioner把這些數據散列到100個分區。

怎么做呢?

  因為一開始是2個分區,所以產生2個任務,分散到2個節點,每個任務各自利用hashpartitioner開始分區,分區完成的數據寫入磁盤,這個時候2個節點的本地各自都會有100個分區的數據,編號0-99。也就是說一個分區的數據實際位於2個節點。然后它們把這些分區信息上報給driver,這樣driver就知道這些分區的位置了。這個過程就是shuffle。

  接下來假設,我們需要對這個重分區后的RDD計數,此時就有100個分區,可以利用集群100個節點,。對每個分區來看,其實就是從driver上獲取分區信息,然后從兩個節點把該分區的數據通過網絡撈出來,做累加計算(聚合reduce)。

 參考:

1.http://blog.csdn.net/jiangpeng59/article/details/52754928,Spark基礎隨筆:分區詳解

2.http://blog.csdn.net/zengxiaosen/article/details/52637001-spark的優化-控制數據分區和分布

3.http://blog.csdn.net/jiangpeng59/article/details/52754928


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM