在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark作為MapReduce框架的一種實現,自然也實現了shuffle的邏輯。
Shuffle
Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,並且分發到每一個Reducer上去,這個過程就是shuffle。由於shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。
下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間。
概念上shuffle就是一個溝通數據連接的橋梁,那么實際上shuffle(partition)這一部分是如何實現的的呢,下面我們就以Spark為例講一下shuffle在Spark中的實現。
Spark Shuffle進化史
先以圖為例簡單描述一下Spark中shuffle的整一個流程:
- 首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×RM×R,其中MM是Map的個數,RR是Reduce的個數。
- 其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。
- 當Reducer啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。
這里的bucket是一個抽象概念,在實現中每個bucket可以對應一個文件,可以對應文件的一部分或是其他等。
Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有着諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供數據的一端,被稱作 Map 端,Map 端每個生成數據的任務稱為 Mapper,對應的,接收數據的一端,被稱作 Reduce 端,Reduce 端每個拉取數據的任務稱為 Reducer,Shuffle 過程本質上都是將 Map 端獲得的數據使用分區器進行划分,並將數據發送給對應的 Reducer 的過程。
參考:
http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
https://ihainan.gitbooks.io/spark-source-code/content/section3/index.html