Spark--Shuffle


 

理解reduceByKey操作,有助於理解Shuffle

reduceByKey

reduceByKey操作將map中的有相同key的value值進行合並,但是map中的數據鍵值對,並不一定分布在相同的partition中,甚至相同的機器中。

所以需要將數據取到相同的主機進行計算-同地協作。

單一task操作在單一partition上,為了組織所有數據進行單一的redueceByKey reduce 任務執行,Spark需要完成all-to-all(多對多)操作,所以必須在所有partitions中尋找所有values為了所有keys。

然后將每一個key對應的值從不同的partitions中放到一起進行最終的計算。這就是Shuffle.

 

 

Shuffle

1、數據完整性

2、網絡IO消耗

3、磁盤IO消耗

 

回顧MapReduce的shuffle

MapReduce的shuffle操作

Shuffle階段在map函數的輸出到reduce函數的輸入,都是shuffle階段,

Split與block的對應關系可能是多對一,默認是一對一。每個map任務會處理一個split,如果block大和split相同,有多少個block就有多少個map任務,hadoop的2.*版本中一個block默認128M。

 

Map階段的shuffle操作:

得到map的輸出,然后進行分區,默認的分區規則:key值計算hash然后對應reduce個數取模;分區個數與reduce個數一致

map分區后的結果會放入到內存的環形緩沖區,它的默認大小是100M,配置信息是mapreduce.task.io.sort.mb,當緩沖區的大小使用超過一定的閥值(mapred-site.xml:mapreduce.map.sort.spill.percent,默認80%),一個后台的線程就會啟動把緩沖區中的數據溢寫(spill)到本地磁盤中(mapred-site.xml:mapreduce.cluster.local.dir),與此同時Mapper繼續向環形緩沖區中寫入數據。

環形緩沖區將數據溢寫到磁盤,在溢寫過程中對數據進行sort和Combiner,排序默認是針對key進行排序,combiner如果指定是優化的一種,類似將reduce的操作在map端進行,避免多余數據的傳輸,比如在分區中有3個("Hadoop",1),可將數據進行合並("Hadoop",3)。溢寫到磁盤小文件大小為80M。

然后將多個小文件合並成一個大文件,在這個過程中,還是會進行sort和combiner,因為即使小文件的內容是已經排序的,合並以后數據也還是需要排序的。不然數據還是無序的。

 

Reduce階段的shuffle操作:

Reduce從Task Tracker中取數據,使用http協議取數據,copy過來的數據放入到內存緩存區中,這里的內存緩沖區的大小為JVM的heap大小。

然后對數據進行merge,這里的merge也會進行sort和combiner,如果設置了combiner。merge也會進行默認的分組,將key進行分組。

 

Spark Shuffle

HashBaseShuffle

缺點:小文件過多,數量為task*reduce的數量

數據到內存buffer是進行partition操作,對key求hash然后根據reduce數量取模。buffer的大小不大32k,不是很大,buffer的數據會隨時寫到block file,這個過程沒有sort。

reduce端通過netty傳輸來取數據,然后將數據放到內存。通過hashmap存儲。

 

優化:使用spark.shuffle.consolidateFiles機制,修改值為true,默認為false,沒有啟用。

文件數量為:reduce*core

在一個core里面並行運行的task其中生成的文件數為reduce的個數。一個core里面並行運行的task,將數據都追加到一起。

 

SortBaseShuffle

現在默認的shuflle為SortBaseShuffle

自帶consolidateFiles機制

自帶sort

 

不用sort排序可以通過配置實現

1、spark.shuffle.sort.bypassMergeThreshold 默認值為200 ,如果shuffle read task的數量小於這個閥值200,則不會進行排序。

2、或者使用hashbasedshuffle + consolidateFiles 機制

 

修改shuffle方法:

spark.shuffle.manager 默認值:sort 

有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計划中的堆外內存管理機制,內存使用效率更高。tungsten-sort慎用,存在bug.

 

 

參考:http://langyu.iteye.com/blog/992916


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM