1、Map任務處理
1.1 讀取HDFS中的文件。每一行解析成一個<k,v>。每一個鍵值對調用一次map函數。
<0,hello you> <10,hello me>
1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。
<hello,1> <you,1> <hello,1> <me,1>
1.3 對1.2輸出的<k,v>進行分區。默認用hash分區。
1.4 對不同分區中的數據進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。
排序后:<hello,{1,1}><me,{1}><you,{1}>
1.5 (可選)對分組后的數據進行歸約。
2、Reduce任務處理
2.1 多個map任務的輸出,按照不同的分區,通過網絡copy到不同的reduce節點上。(可以進行shuffle)
2.2 對多個map的輸出進行合並、排序。覆蓋reduce函數,接收的是分組后的數據,實現自己的業務邏輯,
<hello,2> <me,1> <you,1>
處理后,產生新的<k,v>輸出。
2.3 對reduce輸出的<k,v>寫到HDFS中。
shuffle可分為map端和reduce端
Map端:
1、在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數據,每一個InputSplit都會分配一個Mapper任務,Mapper任務結束后產生<K2,V2>的輸出,這些輸出先存放在緩存中,每個map有一個環形內存緩沖區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個后台線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。(注意:map過程的輸出是寫入本地磁盤而不是HDFS,但是一開始數據並不是直接寫入磁盤而是緩沖在內存中,緩存的好處就是減少磁盤I/O的開銷,提高合並和排序的速度。又因為默認的內存緩沖大小是100M(當然這個是可以配置的),所以在編寫map函數的時候要盡量減少內存的使用,為shuffle過程預留更多的內存,因為該過程是最耗時的過程。)
2、寫磁盤前,要進行partition、sort和combine等操作。通過分區,將不同類型的數據分開處理,之后對不同分區的數據進行排序,如果有Combiner,還要對排序后的數據進行combine。等最后記錄寫完,將全部溢出文件合並為一個分區且排序的文件。(注意:在寫磁盤的時候采用壓縮的方式將map的輸出結果進行壓縮是一個減少網絡開銷很有效的方法!)
3、最后將磁盤中的數據送到Reduce中,從圖中可以看出Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其他Reducer任務中。而圖示的Reducer任務的其他的三個輸入則來自其他節點的Map輸出。
Reduce端:
1、Copy階段:Reducer通過Http方式得到輸出文件的分區。
reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束后TaskTracker會得到消息,進而將消息匯報給 JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據復制線程從map端復制數據。
2、Merge階段:如果形成多個磁盤文件會進行合並
從map端復制來的數據首先寫到reduce端的緩存中,同樣緩存占用到達一定閾值后會將數據寫到磁盤中,同樣會進行partition、combine、排序等過程。如果形成了多個磁盤文件還會進行合並,最后一次合並的結果作為reduce的輸入而不是寫入到磁盤中。
3、Reducer的參數:最后將合並后的結果作為輸入傳入Reduce任務中。(注意:當Reducer的輸入文件確定后,整個Shuffle操作才最終結束。之后就是Reducer的執行了,最后Reducer會把結果存到HDFS上。)
