大數據技術 - MapReduce的Shuffle及調優


本章內容我們學習一下 MapReduce 中的 Shuffle 過程,Shuffle 發生在 map 輸出到 reduce 輸入的過程,它的中文解釋是 “洗牌”,顧名思義該過程涉及數據的重新分配,主要分為兩部分:1. map 任務輸出的數據分組、排序,寫入本地磁盤 2. reduce 任務拉取排序。由於該過程涉及排序、磁盤IO、以及網絡IO 等消耗資源和 CPU 比較大的操作,因此該過程向來是“兵家必爭”之地,即大家會重點優化的一個地方,因此也是大數據面試中經常會被重點考察的地方。本文力求通俗、簡單地將 Shuffle 過程描述清楚。

包含 Shuffle 過程的 MapReduce 任務處理流程如下圖,圖片來自《Hadoop權威指南(第四版)》

接下來,分別介紹 Shuffle 所涉及的主要操作。

map 端

map 端輸出時,先將數據寫入內存中的環形緩沖區,默認大小為 100M,可以通過 mapreduce.task.io.sort.mb 來設置。map 端輸出過程如下:

  • 當緩沖區的內容大小達到閾值(默認 0.8,即緩沖區大小的 80%,可通過 mapreduce.map.sort.spill.percent 設置),便有一個后台線程會將寫入緩沖區的內容溢寫到磁盤。溢寫的過程中 map 任務仍然可以寫緩沖區,一旦緩沖區寫滿,map 任務阻塞,直到后台線程寫磁盤結束
  • 后台線程寫磁盤之前會計算輸出的 key 的分區(一個分區對應一個 reduce 任務),同一個分區的 key 分在一組並按照 key 排序。最后寫到本地磁盤。如果設置 combiner 函數,會在寫磁盤之前調用 combaner 函數。我們之前沒有介紹 combiner,不理解的同學可以先忽略,只需知道它是先將數據聚合為了減少網絡IO,且不會影響 reduce 計算結果的一個操作即可
  • 每一次溢寫都會產生一個溢出文件,map 輸出結束后會產生多個溢出文件。最終會被合並成一個分區的且有序的文件。這里為什么要合並成 1 個,因為如果 map 輸出的數據比較多,產生本地的小文件會太多,影響系統性能。因此需要進行合並,通過 mapreduce.task.io.sort.factor 設置一次可以合並的文件個數,默認為 10 
  • 輸出到磁盤的過程中可以設置壓縮, 默認不壓縮。通過設置 mapreduce.map.output.compress 為 true 開啟壓縮

以上便是 map 任務輸出過程的主要操作,輸出到磁盤后,reducer 會通過 http 服務拉取輸出文件中屬於自己分區的數據。

reduce 端

reduce 端在 Shuffle 階段主要涉及復制排序兩個過程。 reduce 端拉取 map 輸出數據的過程是復制階段,對應上圖中的 fetch。一個 reduce 任務需要從多個 map 輸出復制。因此只要有 map 任務完成,reduce 任務就可以進行復制。復制的過程可以是多線程並發進行,並發的線程個數由 mapreduce.reduce.shuffle.parallelcopies 設置,默認是 5 。

  • map 任務完成后通過心跳通知 application master,reduce 端會有一個線程定期查詢 application master,以獲取完成的 map 任務的位置,從而去對應的機器復制數據
  • reduce 復制的數據先寫到 reduce 任務的 JVM 內存,通過 mapreduce.reduce.shuffle.input.buffer.percent 控制可以用的內存比例
  • 如果復制的數據大小達到內存閾值(通過 mapreduce.reduce.shuffle.merge.percent 控制)或者復制的文件數達到閾值(通過 mapreduce.reduce.merge.inmem.threshold 控制,默認 1000)則將內存的數據合並溢寫到磁盤,如果設置了 combine 函數,寫磁盤前會調用 combine 函數以減少寫入磁盤的數據量
  • 復制階段結束后,reduce 將進入排序階段。如果發生了上面第三步,即產生溢寫,那么磁盤可能會有多個溢寫文件,此時需要將磁盤文件合並並排序。如果溢寫的文件較多,需要多次合並,每次合並的文件數由 mapreduce.task.io.sort.factor 控制。最后一次合並排序的時候不會將數據寫到磁盤而直接作為 reduce 任務的輸入

以上便是 reduce 任務前的復制、排序階段。至此,整個 Shuffle 過程就介紹完畢。

參數調優

我們在上面介紹 Shuffle 過程時已經提到了一些參數,這里統一整理並說明一下

map 端調優參數

參數名 默認值 說明
mapreduce.task.io.sort.mb 100 map 輸出是所使用的內存緩沖區大小,單位:MB
mapreduce.map.sort.spill.percent 0.80 map 輸出溢寫到磁盤的內存閾值
mapreduce.task.io.sort.factor 10 排序文件是一次可以合並的流數
mapreduce.map.output.compress false  map 輸出是否壓縮
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec map 輸出壓縮的編解碼器

我們希望在 map 輸出階段能夠提供更多的內存空間,以提升性能。因此 map 函數應該盡量少占用內存,以便留出內存用於輸出。我們也可以評估 map 輸出,通過增大 mapreduce.task.io.sort.mb 值來減少溢寫的文件數。

reduce 端調優參數

參數名

默認值

說明
mapreduce.reduce.shuffle.parallelcopies 5 並發復制的線程數
mapreduce.task.io.sort.factor 10 同 map 端
mapreduce.reduce.shuffle.input.buffer.percent 0.70 Shuffle 的復制階段,用來存放 map 輸出緩沖區占reduce 堆內存的百分比
mapreduce.reduce.shuffle.merge.percent 0.66 map 輸出緩沖區的閾值,超過該比例將進行合並和溢寫磁盤
mapreduce.reduce.merge.inmem.threshold 1000 閾值,當累積的 map 輸出文件數超過該值,進行合並和溢寫磁盤,0或者負值意味着改參數無效,合並和溢寫只由 mapreduce.reduce.shuffle.merge.percent 控制
mapreduce.reduce.input.buffer.percent 0.0

在 reduce 過程(開始運行 reduce 函數時),內存中保存 map 輸出的空間站整個堆空間的比例。

默認情況下,reduce 任務開始前所有的 map 輸出合並到磁盤,以便為 reducer 提供盡可能多的內存。

如果 reducer 需要的內存較少,可以增加此值以最小化磁盤訪問次數

在 reduce 端,進行 reduce 函數前,如果中間數據全部駐留內存可以獲得最佳性能,默認情況是不能實現的。如果 reduce 函數內存需求不大,把 mapreduce.reduce.input.buffer.percent 參數設置大一些可以提升性能。

總結

今天這章,我們詳細介紹了 Shuffle 過程,關注 Shuffle 過程的性能對整個 MR 作業的性能調優至關重要。經過這章的介紹,我們能夠掌握 Shuffle 過程的關鍵技術點,雖然還不算深入。同時,我們介紹了常見的參數以及調優方法,希望能夠在實際應用中不斷的嘗試、總結,寫出性能最佳的任務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM