本章內容我們學習一下 MapReduce 中的 Shuffle 過程,Shuffle 發生在 map 輸出到 reduce 輸入的過程,它的中文解釋是 “洗牌”,顧名思義該過程涉及數據的重新分配,主要分為兩部分:1. map 任務輸出的數據分組、排序,寫入本地磁盤 2. reduce 任務拉取排序。由於該過程涉及排序、磁盤IO、以及網絡IO 等消耗資源和 CPU 比較大的操作,因此該過程向來是“兵家必爭”之地,即大家會重點優化的一個地方,因此也是大數據面試中經常會被重點考察的地方。本文力求通俗、簡單地將 Shuffle 過程描述清楚。
包含 Shuffle 過程的 MapReduce 任務處理流程如下圖,圖片來自《Hadoop權威指南(第四版)》
接下來,分別介紹 Shuffle 所涉及的主要操作。
map 端
map 端輸出時,先將數據寫入內存中的環形緩沖區,默認大小為 100M,可以通過 mapreduce.task.io.sort.mb 來設置。map 端輸出過程如下:
- 當緩沖區的內容大小達到閾值(默認 0.8,即緩沖區大小的 80%,可通過 mapreduce.map.sort.spill.percent 設置),便有一個后台線程會將寫入緩沖區的內容溢寫到磁盤。溢寫的過程中 map 任務仍然可以寫緩沖區,一旦緩沖區寫滿,map 任務阻塞,直到后台線程寫磁盤結束
- 后台線程寫磁盤之前會計算輸出的 key 的分區(一個分區對應一個 reduce 任務),同一個分區的 key 分在一組並按照 key 排序。最后寫到本地磁盤。如果設置 combiner 函數,會在寫磁盤之前調用 combaner 函數。我們之前沒有介紹 combiner,不理解的同學可以先忽略,只需知道它是先將數據聚合為了減少網絡IO,且不會影響 reduce 計算結果的一個操作即可
- 每一次溢寫都會產生一個溢出文件,map 輸出結束后會產生多個溢出文件。最終會被合並成一個分區的且有序的文件。這里為什么要合並成 1 個,因為如果 map 輸出的數據比較多,產生本地的小文件會太多,影響系統性能。因此需要進行合並,通過 mapreduce.task.io.sort.factor 設置一次可以合並的文件個數,默認為 10
- 輸出到磁盤的過程中可以設置壓縮, 默認不壓縮。通過設置 mapreduce.map.output.compress 為 true 開啟壓縮
以上便是 map 任務輸出過程的主要操作,輸出到磁盤后,reducer 會通過 http 服務拉取輸出文件中屬於自己分區的數據。
reduce 端
reduce 端在 Shuffle 階段主要涉及復制和排序兩個過程。 reduce 端拉取 map 輸出數據的過程是復制階段,對應上圖中的 fetch。一個 reduce 任務需要從多個 map 輸出復制。因此只要有 map 任務完成,reduce 任務就可以進行復制。復制的過程可以是多線程並發進行,並發的線程個數由 mapreduce.reduce.shuffle.parallelcopies 設置,默認是 5 。
- map 任務完成后通過心跳通知 application master,reduce 端會有一個線程定期查詢 application master,以獲取完成的 map 任務的位置,從而去對應的機器復制數據
- reduce 復制的數據先寫到 reduce 任務的 JVM 內存,通過 mapreduce.reduce.shuffle.input.buffer.percent 控制可以用的內存比例
- 如果復制的數據大小達到內存閾值(通過 mapreduce.reduce.shuffle.merge.percent 控制)或者復制的文件數達到閾值(通過 mapreduce.reduce.merge.inmem.threshold 控制,默認 1000)則將內存的數據合並溢寫到磁盤,如果設置了 combine 函數,寫磁盤前會調用 combine 函數以減少寫入磁盤的數據量
- 復制階段結束后,reduce 將進入排序階段。如果發生了上面第三步,即產生溢寫,那么磁盤可能會有多個溢寫文件,此時需要將磁盤文件合並並排序。如果溢寫的文件較多,需要多次合並,每次合並的文件數由 mapreduce.task.io.sort.factor 控制。最后一次合並排序的時候不會將數據寫到磁盤而直接作為 reduce 任務的輸入
以上便是 reduce 任務前的復制、排序階段。至此,整個 Shuffle 過程就介紹完畢。
參數調優
我們在上面介紹 Shuffle 過程時已經提到了一些參數,這里統一整理並說明一下
map 端調優參數
參數名 | 默認值 | 說明 |
mapreduce.task.io.sort.mb | 100 | map 輸出是所使用的內存緩沖區大小,單位:MB |
mapreduce.map.sort.spill.percent | 0.80 | map 輸出溢寫到磁盤的內存閾值 |
mapreduce.task.io.sort.factor | 10 | 排序文件是一次可以合並的流數 |
mapreduce.map.output.compress | false | map 輸出是否壓縮 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | map 輸出壓縮的編解碼器 |
我們希望在 map 輸出階段能夠提供更多的內存空間,以提升性能。因此 map 函數應該盡量少占用內存,以便留出內存用於輸出。我們也可以評估 map 輸出,通過增大 mapreduce.task.io.sort.mb 值來減少溢寫的文件數。
reduce 端調優參數
參數名 | 默認值 |
說明 |
mapreduce.reduce.shuffle.parallelcopies | 5 | 並發復制的線程數 |
mapreduce.task.io.sort.factor | 10 | 同 map 端 |
mapreduce.reduce.shuffle.input.buffer.percent | 0.70 | Shuffle 的復制階段,用來存放 map 輸出緩沖區占reduce 堆內存的百分比 |
mapreduce.reduce.shuffle.merge.percent | 0.66 | map 輸出緩沖區的閾值,超過該比例將進行合並和溢寫磁盤 |
mapreduce.reduce.merge.inmem.threshold | 1000 | 閾值,當累積的 map 輸出文件數超過該值,進行合並和溢寫磁盤,0或者負值意味着改參數無效,合並和溢寫只由 mapreduce.reduce.shuffle.merge.percent 控制 |
mapreduce.reduce.input.buffer.percent | 0.0 | 在 reduce 過程(開始運行 reduce 函數時),內存中保存 map 輸出的空間站整個堆空間的比例。 默認情況下,reduce 任務開始前所有的 map 輸出合並到磁盤,以便為 reducer 提供盡可能多的內存。 如果 reducer 需要的內存較少,可以增加此值以最小化磁盤訪問次數 |
在 reduce 端,進行 reduce 函數前,如果中間數據全部駐留內存可以獲得最佳性能,默認情況是不能實現的。如果 reduce 函數內存需求不大,把 mapreduce.reduce.input.buffer.percent 參數設置大一些可以提升性能。
總結
今天這章,我們詳細介紹了 Shuffle 過程,關注 Shuffle 過程的性能對整個 MR 作業的性能調優至關重要。經過這章的介紹,我們能夠掌握 Shuffle 過程的關鍵技術點,雖然還不算深入。同時,我們介紹了常見的參數以及調優方法,希望能夠在實際應用中不斷的嘗試、總結,寫出性能最佳的任務。