概述
Shuffle,翻譯成中文就是洗牌。之所以需要Shuffle,還是因為具有某種共同特征的一類數據需要最終匯聚(aggregate)到一個計算節點上進行計算。這些數據分布在各個存儲節點上並且由不同節點的計算單元處理。以最簡單的Word Count為例,其中數據保存在Node1、Node2和Node3;
經過處理后,這些數據最終會匯聚到Nodea、Nodeb處理,如下圖所示。
這個數據重新打亂然后匯聚到不同節點的過程就是Shuffle。但是實際上,Shuffle過程可能會非常復雜:
1)數據量會很大,比如單位為TB或PB的數據分散到幾百甚至數千、數萬台機器上。
2)為了將這個數據匯聚到正確的節點,需要將這些數據放入正確的Partition,因為數據大小已經大於節點的內存,因此這個過程中可能會發生多次硬盤續寫。
3)為了節省帶寬,這個數據可能需要壓縮,如何在壓縮率和壓縮解壓時間中間
做一個比較好的選擇?
4)數據需要通過網絡傳輸,因此數據的序列化和反序列化也變得相對復雜。
一般來說,每個Task處理的數據可以完全載入內存(如果不能,可以減小每個Partition的大小),因此Task可以做到在內存中計算。但是對於Shuffle來說,如果不持久化這個中間結果,一旦數據丟失,就需要重新計算依賴的全部RDD,因此有必要持久化這個中間結果。所以這就是為什么Shuffle過程會產生文件的原因。
如果Shuffle過程不落地,①可能會造成內存溢出 ②當某分區丟失時,會重新計算所有父分區數據
Shuffle Write
Shuffle Write,即數據是如何持久化到文件中,以使得下游的Task可以獲取到其需要處理的數據的(即Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到緩存的,但后來發現實際應用中,shuffle過程帶來的數據通常是巨量的,所以經常會發生內存溢出的情況,所以在Spark 0.8以后,Shuffle Write會將數據持久化到硬盤,再之后Shuffle Write不斷進行演進優化,但是數據落地到本地文件系統的實現並沒有改變。
1)Hash Based Shuffle Write
在Spark 1.0以前,Spark只支持Hash Based Shuffle。因為在很多運算場景中並不需要排序,因此多余的排序只能使性能變差,比如Hadoop的Map Reduce就是這么實現的,也就是Reducer拿到的數據都是已經排好序的。實際上Spark的實現很簡單:每個Shuffle Map Task根據key的哈希值,計算出每個key需要寫入的Partition然后將數據單獨寫入一個文件,這個Partition實際上就對應了下游的一個Shuffle Map Task或者Result Task。因此下游的Task在計算時會通過網絡(如果該Task與上游的Shuffle Map Task運行在同一個節點上,那么此時就是一個本地的硬盤讀寫)讀取這個文件並進行計算。
Hash Based Shuffle Write存在的問題
由於每個Shuffle Map Task需要為每個下游的Task創建一個單獨的文件,因此文件的數量就是:
number(shuffle_map_task)*number(result_task)。
如果Shuffle Map Task是1000,下游的Task是500,那么理論上會產生500000個文件(對於size為0的文件Spark有特殊的處理)。生產環境中Task的數量實際上會更多,因此這個簡單的實現會帶來以下問題:
1)每個節點可能會同時打開多個文件,每次打開文件都會占用一定內存。假設每個Write Handler的默認需要100KB的內存,那么同時打開這些文件需要50GB的內存,對於一個集群來說,還是有一定的壓力的。尤其是如果Shuffle Map Task和下游的Task同時增大10倍,那么整體的內存就增長到5TB。
2)從整體的角度來看,打開多個文件對於系統來說意味着隨機讀,尤其是每個文件比較小但是數量非常多的情況。而現在機械硬盤在隨機讀方面的性能特別差,非常容易成為性能的瓶頸。如果集群依賴的是固態硬盤,也許情況會改善很多,但是隨機寫的性能肯定不如順序寫的。
2)Sort Based Shuffle Write
在Spark 1.2.0中,Spark Core的一個重要的升級就是將默認的Hash Based Shuffle換成了Sort Based Shuffle,即spark.shuffle.manager從Hash換成了Sort,對應的實現類分別是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager。
那么Sort Based Shuffle"取代"Hash Based Shuffle作為默認選項的原因是什么?
正如前面提到的,Hash Based Shuffle的每個Mapper都需要為每個Reducer寫一個文件,供Reducer讀取,即需要產生M*R個數量的文件,如果Mapper和Reducer的數量比較大,產生的文件數會非常多。
而Sort Based Shuffle的模式是:每個Shuffle Map Task不會為每個Reducer生成一個單獨的文件;相反,它會將所有的結果寫到一個文件里,同時會生成一個Index文件,
Reducer可以通過這個Index文件取得它需要處理的數據。避免產生大量文件的直接收益就是節省了內存的使用和順序Disk IO帶來的低延時。節省內存的使用可以減少GC的風險和頻率。而減少文件的數量可以避免同時寫多個文件給系統帶來的壓力。
Sort Based Write實現詳解
Shuffle Map Task會按照key相對應的Partition ID進行Sort,其中屬於同一個Partition的key不會Sort。因為對於不需要Sort的操作來說,這個Sort是負收益的;要知道之前Spark剛開始使用Hash Based的Shuffle而不是Sort Based就是為了避免Hadoop Map Reduce對於所有計算都會Sort的性能損耗。對於那些需要Sort的運算,比如sortByKey,這個Sort在Spark 1.2.0里還是由Reducer完成的。
①答出shuffle的定義
②spark shuffle的特點
③spark shuffle的目的
④spark shuffel的實現類,即對應優缺點
.