原理圖:
中間結果的排序與溢出(spill)流程圖
map分析:
(1)、輸入分片(input split):在進行mapreduce之前,mapreduce首先會對輸入文件進行輸入分片(input split)操作,每一個輸入分片針對一個map任務,輸入分片(input split)存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置的數組,對於輸入文件的分片大小,通常跟hdfs的塊大小有關系,例如:hdfs的塊大小為64MB,假如輸入三個文件,1MB、98MB的文件,mapreduce就會對1MB的文件當作一個input split,對98MB的文件做兩個input split(可以通過修改參數mapreduce.input.fileinputformat.split.minsize,使其大於塊容量;由於CPU的約束,有時可以減少fileinputformat.split.minsize屬性值,使它小於HDFS的塊容量,從而提高資源的利用率。)。針對這三個分片操作,就有三個map任務要執行,但是這里每一個map任務執行的數據大小並不均勻,這里也是一個調優的重點。
(2)、map階段就是通過程序員定義好的map函數輸出鍵值<k1,v1>對了。每一個map task有一個環形內存緩沖區,用於存放map task的輸出,也就是鍵值對<k2,v2>,已經被序列化,但沒有排序。環形緩沖區默認大小100MB(mapreduce.task.io.sort.mb屬性),一旦達到閥值0.8(mapreduce.map.sort.spill.percent屬性),一個后台線程就把溢出(spill)內容寫到Linux本地磁盤中的指定目錄(mapreduce.cluster.local.dir)下的新建的一個溢出寫文件,當超過閾值時,Map任務不會因為緩存溢出而被阻塞。但如果達到硬限制,Map任務會被阻塞,直到溢出行為結束。緩存的好處就是減少磁盤I/O的開銷,提高合並和排序的速度。又因為默認的內存緩沖大小是100M(當然這個是可以配置的),所以在編寫map函數的時候要盡量減少內存的使用,為shuffle過程預留更多的內存,因為該過程是最耗時的過程。
♥ 線程會將記錄基於鍵進行分區(通過 mapreduce.job.partitioner.class設置分區算法的類),在內存中將每個分區的記錄按鍵排序(通過map.sort.class指定排序算法,默認快速排序org.apache.hadoop.util.QuickSort),然后寫入一個文件。每次溢出,都有一個獨立的文件存儲。
♥ Map任務完成后,緩存溢出的各個文件會按鍵排序后合並到一個輸出文件(通過mapreduce.cluster.local.dir指定輸出目錄,值為${hadoop.tmp.dir}/mapred/local)。合並文件的流的數量通過mapreduce.task.io.sort.factor指定,默認10,即同時打開10個文件執行合並。
說白點就是:在寫磁盤前,要進行partition、sort和combine等操作。通過分區,將不同類型的數據分開處理,之后對不同分區的數據進行排序,如果有Combiner,還要對排序后的數據進行combine。等最后記錄寫完,將全部溢出文件合並為一個分區且排序的文件。(注意:在寫磁盤的時候采用壓縮的方式將map的輸出結果進行壓縮是一個減少網絡開銷很有效的方法!)
根據上面步驟,最好僅在Map任務結束的時候才能緩存寫到磁盤中。
可以采用以下方法提高排序和緩存寫入磁盤的效率:
- 調整mapreduce.task.io.sort.mb大小,從而避免或減少緩存溢出的數量。當調整這個參數時,最好同時檢測Map任務的JVM的堆大小,並必要的時候增加堆空間。
- mapreduce.task.io.sort.factor屬性的值提高100倍左右,這可以使合並處理更快,並減少磁盤的訪問。
- 為K-V提供一個更高效的自定義序列化工具,序列化后的數據占用空間越少,緩存使用率就越高。
- 提供更高效的Combiner(合並器),使Map任務的輸出結果聚合效率更高。
- 提供更高效的鍵比較器和值的分組比較器。
注:
如果指定了Combiner,可能在兩個地方被調用。
- 當為作業設置Combiner類后,緩存溢出線程將緩存存放到磁盤時,就會調用;
- 緩存溢出的數量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合並的時候會調用Combiner。
1、獲取中間輸出結果(Map側)
Reducer需要通過網絡獲取Map任務的輸出結果,然后才能執行Reduce任務,可以通過下述Map側的優化來減輕網絡負載:
- 通過壓縮輸出結果,mapreduce.map.output.compress設置為true(默認false),mapreduce.map.output.compress.codec指定壓縮方式。
- Reduce任務是通過HTTP協議獲取輸出分片的,可以使用mapreduce.tasktracker.http.threads指定執行線程數(默認40)
reduce階段:
Reduce任務是一個數據聚合的步驟。數量默認為1,而使用過多的Reduce任務則意味着復雜的shuffle,並使輸出文件的數量激增。mapreduce.job.reduces屬性設置reduce數量,也可以通過編程的方式,調用Job對象的setNumReduceTasks()方法來設置。一個節點Reduce任務數量上限由mapreduce.tasktracker.reduce.tasks.maximum設置(默認2)。
可以采用以下探試法來決定Reduce任務的合理數量:
# 每個reducer都可以在Map任務完成后立即執行 0.95 * (節點數量 * mapreduce.tasktracker.reduce.tasks.maximum)
另一個方法是
# 較快的節點在完成第一個Reduce任務后,馬上執行第二個 1.75 * (節點數量 * mapreduce.tasktracker.reduce.tasks.maximum)
2. 獲取中間輸出結果(Reduce側)
Reduce任務在結束時都會獲取Map任務相應的分區數據,這個過程叫復制階段(copy phase)。一個Reduce任務並行多少個Map任務是由mapreduce.reduce.shuffle.parallelcopies參數決定(默認5)。
由於網絡問題,Reduce任務無法獲取數據時,會以指數退讓(exponential backoff)的方式重試,超時時間由mapreduce.reduce.shuffle.connect.timeout設置(默認180000,單位毫秒),超時之后,Reduce任務標記為失敗狀態。
3. 中間輸出結果的合並與溢出
Reduce任務也需要對多個Map任務的輸出結果進行合並,過程如上圖,根據Map任務的輸出數據的大小,可能將其復制到內存或磁盤。mapreduce.reduce.shuffle.input.buffer.percent屬性配置了這個任務占用的緩存空間在堆棧空間中的占用比例(默認0.70)。
mapreduce.reduce.shuffle.merge.percent決定緩存溢出到磁盤的閾值(默認0.66),mapreduce.reduce.merge.inmem.threshold設置了Map任務在緩存溢出前能夠保留在內存中的輸出個數的閾值(默認1000),只要一個滿足,輸出數據都將會寫到磁盤。
在收到Map任務輸出數據后,Reduce任務進入合並(merge)或排序(sort)階段。同時合並的文件流的數量由mapreduce.task.io.sort.factor屬性決定(默認10)。
Map任務輸出數據的所有壓縮操作,在合並時都會在內存中進行解壓縮操作。
借鑒:https://blog.csdn.net/u013980127/article/details/52807360
鏈接:https://blog.csdn.net/u012151684/article/details/72589302