序言
本文內容是整理和收集MapReduce基本問題,材料源於各類平台文章中,本文編寫僅作為學習參看使用。
其中有CSDN博主「Dota_Data」,原文鏈接:https://blog.csdn.net/dota_data/article/details/93342876
一、MapReduce基本常識
shuffle流程概括
- 其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分。
- 因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩沖區”,在寫入的過程中進行分區(partition),也就是對於每個鍵值對來說,都增加了一個partition屬性值,然后連同鍵值對一起序列化成字節數組寫入到緩沖區(緩沖區采用的就是字節數組,默認大小為100M)。
- 當寫入的數據量達到預先設置的闕值后(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%)便會啟動溢寫出線程將緩沖區中的那部分數據溢出寫(spill)到磁盤的臨時文件中,並在寫入前根據key進行排序(sort)和合並(combine,可選操作)。
- 當整個map任務完成溢出寫后,會對磁盤中這個map任務產生的所有臨時文件(spill文件)進行歸並(merge)操作生成最終的正式輸出文件,此時的歸並是將所有spill文件中的相同partition合並到一起,並對各個partition中的數據再進行一次排序(sort),生成key和對應的value-list
- 文件歸並時,如果溢寫文件數量超過參數min.num.spills.for.combine的值(默認為3)時,可以再次進行合並。
- 對於reduce端的shuffle過程來說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然后對從不同地方拉取過來的數據不斷地做merge最后合並成一個分區相同的大文件,然后對這個文件中的鍵值對按照key進行sort排序,排好序之后緊接着進行分組,分組完成后才將整個文件交給reduce task處理。
二、MapReduce要點
1、combiner的組件需要注意什么?
因為combiner在MapReduce過程中可能調用也可能不調用,可能調用一次也可能調用多次,無法確定和控制。
所以,combiner的使用原則是:有或沒有都不能影響業務邏輯,是不是用combiner都不能影響最終reducer的結果。而且,combiner的輸出kv應該跟reducer的輸入kv對應起來。因為有時使用combiner不當的話會對統計結果造成錯誤結局,還不如不用。比如對所有數求平均數:
Mapper端使用combiner
3 5 7 ->(3+5+7)/3=5
2 6 -> ( 2+6)/3=4
Reducer
(5+4)/2=9/2≈4.5 不等於 (3+%+7+2+6)/5=23/5≈4.6
2、對於MapReduce的各個階段你覺得有什么優化空間?
-
數據輸入:默認情況下TextInputFormat對任務的切片是按文件切,無論文件大小,都會給一個單獨的切面,交給一個maptask,這時如果輸入的是大量小文件,就會產生大量的maptask,處理效率極低。最好的解決方法就是在預處理階段將小文件合並,再上傳到HDFS處理分析。但如果已經上傳到HDFS了,就可以用另一種切片方法來補救,CombineTextinputFormat,它的切片邏輯和TextInputFormat不同,可以將多個小文件從邏輯上規划到一個切片中,然后把這些小文件交給maptask。
-
運行時間:啟動一個MapReduce任務,map階段和reduce階段都會有並行的task共同處理任務,這些task都需要開jvm,然后初始化,而這些jvm很花費空間的,如果運行一個20-30s的任務需要進行開啟,初始化,停止jvm操作很是浪費,所以我們應該盡量吧數據量控制在能讓每個task運行1分鍾以上。
-
數據傾斜:可以通過對原始數據進行抽樣得到結果集來預設分區。
3、MapReduce怎么實現TopN?
可以自定義GroupingComparator,對結果進行最大值排序,然后再reduce輸出時,控制只輸出前n個數。就達到了TopN輸出的目的。
4、如何使用MapReduce實現兩表的join?
-
reduce side join:在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源key/value數據對,沒條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。
-
map side join:map side join 是針對一下場景進行的優化。兩個待連接的表中,有一個表非常大,而另一個非常小,以至於小表可以直接存放到內存中。這樣,我們可以將小表復制多份,讓每一個map task內存中存在一份(比如放在hash table中),然后只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否具有相同key的記錄,入股有,則連接后輸出即可。
5、MapReduce中是如何定義並行度的?
一個job的map階段並行度由客戶端提交的job決定。客戶端對map階段並行度的規划邏輯為:
將待處理數據執行邏輯切片。按照一個特定的切片大小,將待處理數據划分成邏輯上的多個split,然后每一個split分配一個maptask實例,並進行處理。
Reducetask並行度同樣影響整個job的執行並發度和執行效率,與maptask的並發度由切片數決定不同,Reducetask 數據的決定是可以直接手動設置:job.setNumReduceTask(4)。