對於MapReduce編程,大概率的流程用過的人或多或少都清楚,但是歸結到細節上,就有的地方不清楚了,下面根據自己的疑問,加上從網上各處,找到的被人的描述,最自己的疑問做出回答。
1. MapReduce 和 HDFS有什么關系?
首先,HDFS和MapReduce是Hadoop最核心的設計;
對於HDFS,即Hadoop Distributed File System,它是Hadoop的存儲基礎,是數據層面的,提供海量的數據存儲;而MapReduce,則是一種引擎或者編程模型,可以理解為數據的上一層,我們可以通過編寫MapReduce程序,對海量的數據進行計算處理。這就類似於我們通過 檢索(MapReduce)所有文件(HDFS),找到我們想要的結果。
其次,MapReduce中JobTracker和TaskTracker分別對應於HDFS中的NameNode和DataNode
2. MapReduce處理中,數據的流程是什么?
在客戶端、JobTracker、TaskTracker的層次來分析MapReduce的工作原理的:
a. 在客戶端啟動一個作業。
b. 向JobTracker請求一個Job ID。
c. 將運行作業所需要的資源文件復制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入划分信息。這些文件都存放在JobTracker專門為該作業創建的文件夾中。文件夾名為該作業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);輸入划分信息告訴了JobTracker應該為這個作業啟動多少個map任務等信息。
d. JobTracker接收到作業后,將其放在一個作業隊列里,等待作業調度器對其進行調度(這里是不是很像微機中的進程調度呢,呵呵),當作業調度器根據自己的調度算法調度到該作業時,會根據輸入划分信息為每個划分創建一個map任務,並將map任務分配給TaskTracker執行。對於map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這里需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這里有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”。而分配reduce任務時並不考慮數據本地化。
e. TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着很多的信息,比如當前map任務完成的進度等信息。當JobTracker收到作業的最后一個任務完成信息時,便把該作業設置成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。
如果具體從map端和reduce端分析,可以參考上面的圖片,具體如下:
Map端:
a. 每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為64M)為一個分片,當然我們也可以設置塊的大小。map輸出的結果會暫且放在一個環形內存緩沖區中(該緩沖區的大小默認為100M,由io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建一個溢出文件,將該緩沖區中的數據寫入這個文件。
b. 在寫入磁盤之前,線程首先根據reduce任務的數目將數據划分為相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。然后對每個分區中的數據進行排序,如果此時設置了Combiner,將排序后的結果進行Combia操作,這樣做的目的是讓盡可能少的數據寫入到磁盤。
c. 當map任務輸出最后一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合並。合並的過程中會不斷地進行排序和combia操作,目的有兩個:1.盡量減少每次寫入磁盤的數據量;2.盡量減少下一復制階段網絡傳輸的數據量。最后合並成了一個已分區且已排序的文件。為了減少網絡傳輸的數據量,這里可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以了。
d. 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就ok了哦。
到這里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,其實Shuffle也是一個很復雜的過程,這里可以參照下面的第三個問題。
Reduce端:
a.Reduce會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果reduce端接受的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果數據量超過了該緩沖區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對數據合並后溢寫到磁盤中。
b.隨着溢寫文件的增多,后台線程會將它們合並成一個更大的有序的文件,這樣做是為了給后面的合並節省時間。其實不管在map端還是reduce端,MapReduce都是反復地執行排序,合並操作,現在終於明白了有些人為什么會說:排序是hadoop的靈魂。
c.合並的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,並且最后一次合並的結果並沒有寫入磁盤,而是直接輸入到reduce函數。
3. Map處理數據后,到Reduce得到數據之前,數據的流程是什么?
其實,將map處理的結果,傳輸到reduce上的過程,在MapReduce中,可以看做shuffle的過程。
在map端,每個map task都有一個內存緩沖區,存儲着map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合並,生成最終的正式輸出文件,然后等待reduce task來拉數據。
a. 在map task執行時,它的輸入數據來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關系可能是多對一,默認是一對一。
b. 在經過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對。到底當前的key應該交由哪個reduce去做呢,是需要現在決定的。 MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制並設置到job上。接下來,需要將數據寫入內存緩沖區中,緩沖區的作用是批量收集map結果,減少磁盤IO的影響。我們的key/value對以及Partition的結果都會被寫入緩沖區。當然寫入之前,key與value值都會被序列化成字節數組。
c. 這個內存緩沖區是有大小限制的,默認是100MB。當map task的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩沖區中的數據臨時寫入磁盤,然后重新利用這塊緩沖區。這個從內存往磁盤寫數據的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩沖區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。
當溢寫線程啟動后,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這里的排序也是對序列化的字節做的排序。 存緩沖區沒有對將發送到相同reduce端的數據做合並,那么這種合並應該是體現是磁盤文件中的。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數值做過合並。所以溢寫過程一個很重要的細節在於,如果有很多個key/value對需要發送到某個reduce端去,那么需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。
在map端的過程,可參考下圖:
至此,map端的所有工作都已結束,最終生成的這個文件也存放在TaskTracker夠得着的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,獲知某台TaskTracker上的map task執行完成,Shuffle的后半段過程開始啟動。
簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然后對從不同地方拉取過來的數據不斷地做merge,也最終形成一個文件作為reduce task的輸入文件。見下圖:
a. Copy過程,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。
b. Merge階段。這里的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩沖區中,這里的緩沖區大小要比map端的更為靈活,它基於JVM的heap size設置,因為Shuffle階段Reducer不運行,所以應該把絕大部分的內存都給Shuffle用。這里需要強調的是,merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認情況下第一種形式不啟用,讓人比較困惑,是吧。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。
c. Reducer的輸入文件。不斷地merge后,最后會生成一個“最終文件”。為什么加引號?因為這個文件可能存在於磁盤上,也可能存在於內存中。對我們來說,當然希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。然后就是Reducer執行,把結果放到HDFS上。
上面的文章,部分是自己寫的,一部分摘自別人通俗易懂的理解,具體可參考:
http://weixiaolu.iteye.com/blog/1474172
http://langyu.iteye.com/blog/992916