1. 從輸入到輸出
一個MapReducer作業經過了input,map,combine,reduce,output五個階段,其中combine階段並不一定發生,map輸出的中間結果被分到reduce的過程成為shuffle(數據清洗)。


在shuffle階段還會發生copy(復制)和sort(排序)。
在MapReduce的過程中,一個作業被分成Map和Reducer兩個計算階段,它們由一個或者多個Map任務和Reduce任務組成。如下圖所示,一個MapReduce作業從數據的流向可以分為Map任務和Reduce任務。當用戶向Hadoop提交一個MapReduce作業時,JobTracker則會根據各個TaskTracker周期性發送過來的心跳信息綜合考慮TaskTracker的資源剩余量,作業優先級,作業提交時間等因素,為TaskTracker分配合適的任務。Reduce任務默認會在Map任務數量完成5%后才開始啟動。

Map任務的執行過程可以概括為:首先通過用戶指定的InputFormat類中的getSplits方法和next方法將輸入文件切片並解析成鍵值對作為map函數的輸入。然后map函數經過處理之后將中間結果交給指定的Partitioner處理,確保中間結果分發到指定的Reduce任務處理,此時如果用戶指定了Combiner,將執行combine操作。最后map函數將中間結果保存到本地。
Reduce任務的執行過程可以概括為:首先需要將已經完成Map任務的中間結果復制到Reduce任務所在的節點,待數據復制完成后,再以key進行排序,通過排序,將所有key相同的數據交給reduce函數處理,處理完成后,結果直接輸出到HDFS上。
2. input
如果使用HDFS上的文件作為MapReduce的輸入,MapReduce計算框架首先會用org.apache.hadoop.mapreduce.InputFomat類的子類FileInputFormat類將作為輸入HDFS上的文件切分形成輸入分片(InputSplit),每個InputSplit將作為一個Map任務的輸入,再將InputSplit解析為鍵值對。InputSplit的大小和數量對於MaoReduce作業的性能有非常大的影響。
InputSplit只是邏輯上對輸入數據進行分片,並不會將文件在磁盤上分成分片進行存儲。InputSplit只是記錄了分片的元數據節點信息,例如起始位置,長度以及所在的節點列表等。數據切分的算法需要確定InputSplit的個數,對於HDFS上的文件,FileInputFormat類使用computeSplitSize方法計算出InputSplit的大小,代碼如下:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}
其中 minSize 由mapred-site.xml文件中的配置項mapred.min.split.size決定,默認為1;maxSize 由mapred-site.xml文件中的配置項mapred.max.split.size決定,默認為9223 372 036 854 775 807;而blockSize是由hdfs-site.xml文件中的配置項dfs.block.size決定,默認為67 108 864字節(64M)。所以InputSplit的大小確定公式為:
max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size));
一般來說,dfs.block.size的大小是確定不變的,所以得到目標InputSplit大小,只需改變mapred.min.split.size 和 mapred.max.split.size 的大小即可。InputSplit的數量為文件大小除以InputSplitSize。InputSplit的原數據信息會通過一下代碼取得:
splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
從上面的代碼可以發現,元數據的信息由四部分組成:文件路徑,文件開始位置,文件結束位置,數據塊所在的host。
對於Map任務來說,處理的單位為一個InputSplit。而InputSplit是一個邏輯概念,InputSplit所包含的數據是仍然存儲在HDFS的塊里面,它們之間的關系如下圖所示:

當輸入文件切分為InputSplit后,由FileInputFormat的子類(如TextInputFormat)的createRecordReader方法將InputSplit解析為鍵值對,代碼如下:
public RecordReader<LongWritable, Text>createRecordReader(InputSplit split,TaskAttemptContext context) {String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");byte[] recordDelimiterBytes = null;if (null != delimiter)recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);return new LineRecordReader(recordDelimiterBytes);}
此處默認是將行號作為鍵。解析出來的鍵值對將被用來作為map函數的輸入。至此input階段結束。
3. map及中間結果的輸出
InputSplit將解析好的鍵值對交給用戶編寫的map函數處理,處理后的中間結果會寫到本地磁盤上,在刷寫磁盤的過程中,還做了partition(分區)和 sort(排序)的操作。
map函數產生輸出時,並不是簡單的刷寫磁盤。為了保證I/O效率,采取了先寫到內存的環形內存緩沖區,並做一次預排序,如下圖所示:


每個Map任務都有一個環形內存緩沖區,用於存儲map函數的輸出。默認情況下,緩沖區大小是100M,該值可以通過mapred-site.xml文件中的io.sort.mb的配置項配置。一旦緩沖區內容達到閾值(由mapred-site.xml文件的io.sort.spill.percent的值決定,默認為0.80 或者 80%),一個后台線程便會將緩沖區的內容溢寫到磁盤中。再寫磁盤的過程中,map函數的輸出繼續被寫到緩沖區,但如果在此期間緩沖區被填滿,map會阻塞直到寫磁盤過程完成。寫磁盤會以輪詢的方式寫到mapred.local.dir(mapred-site.xml文件的配置項)配置的作業特定目錄下。
在寫磁盤之前,線程會根據數據最終要傳入到的Reducer把緩沖區的數據划分成(默認是按照鍵)相應的分區。在每個分區中,后台線程按照建進行內排序,此時如果有一個Combiner,它會在排序后的輸出上運行。
一旦內存緩沖區達到溢出的閾值,就會新建一個溢出寫文件,因此在Map任務完成最后一個輸出記錄之后,會有若干個溢出寫文件。在Map任務完成之前,溢出寫文件被合並成一個已分區且已排序的輸出文件作為map輸出的中間結果,這也是Map任務的輸出結果。
如果已經指定Combiner且溢出寫次數至少為3時,Combiner就會在輸出文件寫到磁盤之前運行。如前文所述,Combiner可以多次運行,並不影響輸出結果。運行Combiner的意義在於使map輸出的中間結果更緊湊,使得寫到本地磁盤和傳給Reducer的數據更少。
為了提高磁盤IO性能,可以考慮壓縮map的輸出,這樣會寫磁盤的速度更快,節約磁盤空間,從而使傳送給Reducer的數據量減少。默認情況下,map的輸出是不壓縮的,但只要將mapred-site.xml文件的配置項mapred.compress.map.output設為true即可開啟壓縮功能。使用的壓縮庫由mapred-site.xml文件的配置項mapred.map.output.compression.codec
指定,如下列出了目前hadoop支持的壓縮格式:
| 壓縮格式 | 工具 | 算法 | 文件擴展名 | 是否包含多個文件 | 是否可切分 |
|---|---|---|---|---|---|
| DEFLATE* | N/A | DEFLATE | .deflate | 否 | 否 |
| Gzip | gzip | DEFLATE | .gz | 否 | 否 |
| bzip2 | bzip2 | bzip2 | .bz2 | 否 | 是 |
| LZO | Lzop | LZO | .lzo | 否 | 否 |
map輸出的中間結果存儲的格式為IFile,IFile是一種支持航壓縮的存儲格式,支持上述壓縮算法。
Reducer通過Http方式得到輸出文件的分區。將map輸出的中間結果發送到Reducer的工作線程的數量由mapred-site.xml文件的tasktracker.http.threds配置項決定,此配置針對每個節點,而不是每個Map任務,默認是40,可以根據作業大小,集群規模以及節點的計算能力而增大。
4. shuffle
shuffle,也叫數據清洗。在某些語境下,代表map函數產生輸出到reduce的消化輸入的整個過程。
4.1 copy階段
Map任務輸出的結果位於Map任務的TaskTracker所在的節點的本地磁盤上。TaskTracker需要為這些分區文件(map輸出)運行Reduce任務。但是,Reduce任務可能需要多個Map任務的輸出作為其特殊的分區文件。每個Map任務的完成時間可能不同,當只要有一個任務完成,Reduce任務就開始復制其輸出。這就是shuffle的copy階段。如下圖所示,Reduce任務有少量復制線程,可以並行取得Map任務的輸出,默認值為5個線程,該值可以通過設置mapred-site.xml的mapred.reduce.parallel.copies的配置項來改變。

如果map輸出相當小,則會被復制到Reduce所在TaskTracker的內存的緩沖區中,緩沖區的大小由mapred-site.xml文件中的mapred.job.shuffle.input.buffer.percent配置項指定。否則,map輸出將會被復制到磁盤。一旦內存緩沖區達到閾值大小(由mapred-site.xml文件mapred.job.shuffle.merge.percent配置項決定)或緩沖區的文件數達到閾值大小(由mapred-site.xml文件mapred.inmem.merge.threshold配置項決定),則合並后溢寫到磁盤中。
4.2 sort階段
隨着溢寫到磁盤的文件增多,shuffle進行sort階段。這個階段將合並map的輸出文件,並維持其順序排序,其實做的是歸並排序。排序的過程是循環進行,如果有50個map的輸出文件,而合並因子(由mapred-site.xml文件的io.sort.factor配置項決定,默認為10)為10,合並操作將進行5次,每次將10個文件合並成一個文件,最后有5個文件,這5個文件由於不滿足合並條件(文件數小於合並因子),則不會進行合並,將會直接把5個文件交給Reduce函數處理。到此shuffle階段完成。
從shuffle的過程可以看出,Map任務處理的是一個InputSplit,而Reduce任務處理的是所有Map任務同一個分區的中間結果。
5. reduce及最后結果的輸出
reduce階段操作的實質就是對經過shuffle處理后的文件調用reduce函數處理。由於經過了shuffle的處理,文件都是按鍵分區且有序,對相同分區的文件調用一次reduce函數處理。
與map的中間結果不同的是,reduce的輸出一般為HDFS。
6. sort
排序貫穿於Map任務和Reduce任務,排序操作屬於MapReduce計算框架的默認行為,不管流程是否需要,都會進行排序。在MapReduce計算框架中,主要用到了兩種排序算法:快速排序和歸並排序。
在Map任務和Reduce任務的過程中,一共發生了3次排序操作。
(1)當map函數產生輸出時,會首先寫入內存的環形緩沖區,當達到設定的閾值,在刷寫磁盤之前,后台線程會將緩沖區的數據划分相應的分區。在每個分區中,后台線程按鍵進行內排序。如下圖所示。
(2)在Map任務完成之前,磁盤上存在多個已經分好區,並排好序,大小和緩沖區一樣的溢寫文件,這時溢寫文件將被合並成一個已分區且已排序的輸出文件。由於溢寫文件已經經過一次排序,所以合並文件時只需再做一次排序就可使輸出文件整體有序。如下圖所示。


(3)在shuffle階段,需要將多個Map任務的輸出文件合並,由於經過第二次排序,所以合並文件時只需在做一次排序就可以使輸出文件整體有序。


在這3次排序中第一次是在內存緩沖區做的內排序,使用的算法是快速排序;第二次排序和第三次排序都是在文件合並階段發生的,使用的是歸並排序。
7. 作業的進度組成
一個MapReduce作業在Hadoop上運行時,客戶端的屏幕通常會打印作業日志,如下:


對於一個大型的MapReduce作業來說,執行時間可能會比較比較長,通過日志了解作業的運行狀態和作業進度是非常重要的。對於Map來說,進度代表實際處理輸入所占比例,例如 map 60% reduce 0% 表示Map任務已經處理了作業輸入文件的60%,而Reduce任務還沒有開始。而對於Reduce的進度來說,情況比較復雜,從前面得知,reduce階段分為copy,sort 和 reduce,這三個步驟共同組成了reduce的進度,各占1/3。如果reduce已經處理了2/3的輸入,那么整個reduce的進度應該為1/3 + 1/3 + 1/3 * (2/3) = 5/9 ,因為reduce開始處理時,copy和sort已經完成。
來源於:《Hadoop 海量數據處理》
轉: http://blog.csdn.net/sunnyyoona/article/details/53939546
