MapReduce編程模型及其在Hadoop上的實現


轉自:https://www.zybuluo.com/frank-shaw/note/206604

MapReduce基本過程

關於MapReduce中數據流的傳輸過程,下圖是一個經典演示: 
mr1.png-33.9kB

關於上圖,可以做出以下逐步分析:

  1. 輸入數據(待處理)首先會被切割分片,每一個分片都會復制多份到HDFS中。上圖默認的是分片已經存在於HDFS中。
  2. Hadoop會在存儲有輸入數據分片(HDFS中的數據)的節點上運行map任務,可以獲得最佳性能(數據TaskTracker優化,節省帶寬)。
  3. 在運行完map任務之后,可以看到數據並不是存回HDFS中,而是直接存在了本地磁盤上,因為map輸出數據是中間數據,該中間數據由reduce任務處理之后才會產生最終輸出結果,reduce任務完成之后這些數據是要被刪除掉的。
  4. map的輸出結果會在本地進行分區,並進行排序,這是為之后的reduce階段做准備。分區方法常用的是對key值進行Hash轉換之后求模,這樣就可以將相同key值的數據放在同一個分區,reduce階段同一個分區的數據會被安排到同一個reduce中。
  5. 如果有必要,可以再map階段設置combine方法,combine方法與reduce方法的函數體是同一個(做的事是一樣的),只不過combine方法針對的對象只是當前map中key值相同的數據,而reduce方法處理的是所有輸入數據中相同的key對應的數據。也就是說,它只是reduce的一個小分身。這么做的目的是為了減輕從map階段傳送到reduce階段的IO傳送負擔,也是節省帶寬的一種方式(做了combine優化之后,傳送的數據量會大大減少)。
  6. 每一個reduce都會將所有map對應分區的數據通過IO復制過來,進行合並。合並的過程包含排序的過程,因為要將相同的key值對應的數據統一處理。在reduce計算階段,reduce的輸入鍵是key,而輸入值是相同的key數據對應的value所構成的一個迭代器數據結構。
  7. 經過reduce處理之后,最后的輸出結果就是我們想要的結果。該輸出會存儲在HDFS中,第一塊副本存儲在本地節點上,其他副本存儲在其他機架節點中。進一步,可以將這些輸出結果作為另一個MapRuce任務的輸入,進行更多的任務計算。

OK,大致的步驟就是這樣。這里面有很多實踐上的細節值得注意。本人經驗為0,說不出任何實際的經驗之談。下一節寫的只是嘗試解決自己在認知過程中冒出的一些疑惑,查閱網上資料之后得到的一些解惑。

 

疑問及解答

1. 如何設置分片的數量?

已知map的數量與分片的數量相同(一般情況下都是這樣)。如果分片數量太少(每個分片很大),那么mapper的數量就會太少,整個任務的執行就會慢。如果分片分得太多(每個分片很小),那么管理分片的總時間和構建map任務的總時間將決定着整個任務的執行時間。由此看來,合理設置一個分片的大小很重要。 
在《Hadoop權威指南 第四版英文版》一書中,有講到:一個合理的分片大小趨向於HDFS的一個塊的大小(最新的版本一個塊大小為128M)。當然我們可以自己設置分片的大小。

在新版的org.apache.hadoop.mapred.InputFormat抽象類中,有getSplits(JobConf job, int numSplits)方法,其中的numSplits只是一個hint,實際上的分片數量計算公式是這樣的:

minSize=max{minSplitSize,mapred.min.split.size} (minSplitSize大小默認為1B) 
maxSize=mapred.max.split.size(不在配置文件中指定時大小為Long.MAX_VALUE) 
splitSize=max{minSize,min{maxSize,blockSize}}

其中mapred.min.split.size、mapred.max.split.size、blockSize均可以再配置文件中配置,前面兩個在mapred-site.xml中,最后一個可在在hdfs-site.xml中進行配置,單位均為B。

可以看到,如果參數mapred.min.split.size、mapred.max.split.size都設置為默認值的話,那么splitSize的大小即為blockSize的大小。

實際運算過程中還會有一些小小的優化:

文件大小/splitSize>1.1,創建一個split0,文件剩余大小=文件大小-splitSize

   .....

剩余文件大小/splitSize<= 1.1 將剩余的部分作為一個split

每一個分片對應一個map任務,這樣map任務的數目也就顯而易見啦。

參考:http://www.cnblogs.com/yueliming/p/3251285.html

2.是不是一個map就一台機器呢?

這就是小白的問題了。其實並不是一個map一台機器。一個TaskTracker(具體含義下面會講到)就是一台機器,而每一個TaskTracker都有固定數量的map槽與reduce槽。也就是說,一台機器上面有可能會有多個map與reduce同時運行,當然這些map與reduce之間可能運行的任務都是不同的。

 

MapReduce在Hadoop上的具體實現

可以說以上編程模型其實就是Google提出來的最基本的MapReduce編程模型, 來源於谷歌論文《MapReduce: Simplified Data Processing on Large Clusters》。但是在Hadoop上的具體實現是怎樣的呢?這個我們同樣需要了解。來看一下這張圖: 

這個實現機制就是MapReduce1,在Hadoop2.x的時候實現機制變成了YARN。了解MapReduce1對於我們理解Hadoop非常有幫助,晚些時候會寫一篇專門關於YARN的文章。

如果細看,可以發現,MapReduce1實現圖其實與一開始的MapReduce的工作流程總體是一致的,只不過多了JobTracker、TaskTracker以及Client這幾個角色。map和reduce任務分配給了多個TaskTracker來執行。這幾個角色非常重要,有必要詳細了解。

 

關於client

客戶端(client):這個是程序員主要工作的部分,工作分別是編寫mapreduce程序,配置相應的文件信息,提交作業。如果出現錯誤了,需要找出錯誤,修改程序,直到完美運行。

 

JobTracker與TaskTracker介紹

JobTracker與TaskTracker之間服從的是主從結構。從圖中可以看到:主節點JobTracker只有一個,而從節點TaskTracker有很多個。

 

JobTracker負責:

  • 接收客戶提交的計算任務
  • 把計算任務分配給TaskTracker執行
  • 監控TaskTracker的執行情況
 

TaskTrackers負責:

  • 完成JobTracker分配的計算任務

JobTracker與TaskTrackers之間的關系就是項目經理與開發人員的關系。項目經理接到用戶的需求清單,那么將用戶的需求分配給開發人員來完成。

 

實現機制

了解了如何從MapReduce遷移到JobTracker TaskTrackers之后,我們來詳細講講其中的實現機制(下面講到的每一點都對應圖上的相應數字):

 

作業的提交

1.寫好的一個MapReduce程序就是一個job。點擊運行。此時會生成一個JobClient,它會做一系列的准備工作,當准備工作做好了之后,才會向JobTracker提交任務。

2.JobClient向JobTracker請求一個新的job ID。與此同時,JobClient會先做如下檢查:

  • 檢查作業的輸出目錄,如果未指定或已存在則不提交作業並拋錯誤給程序;
  • 檢查輸入目錄是否存在,如果不存在同樣拋出錯誤;如果存在,JobTracker會根據輸入計算輸入分片(Input Split)並生成分片,如果分片計算不出來也會拋出錯誤。

3.JobClient將運行作業所需要的資源(包括作業jar文件,配置文件和計算所得的輸入分片)復制到JobTracker的文件系統中以job ID命名的目錄下(即HDFS中)。值得注意的是,作業jar副本較多(默認mapred.submit.replication = 10)。

4.上面的准備工作做好了之后,它會給JobTracker提交任務(它會告知JobTracker:大哥,我們這邊准備好了,隨時可以戰斗。哎呀呀,逗比了)。

 

作業的初始化

5.JobTracker接收到作業提交信息后,將其放入內部隊列,交由job scheduler進行調度,並對其進行初始化(初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程)。

6.初始化完畢后,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。關於分片的數量問題(即map數量),前面已經有提及。關於reduce數量,則是由用戶在配置文件里指定的。

除了map和reduce任務,還有setupJob和cleanupJob需要建立:由每一個TaskTrackers在所有map開始前和所有reduce結束后分別執行。setupJob()創建輸出目錄和任務的臨時工作目錄,cleanupJob()刪除臨時工作目錄。

 

作業的分配

7.每個TaskTracker定期發送心跳給JobTracker,告知自己還活着,並附帶消息說明自己是否已准備好接受新任務。JobTracker以此來分配任務,並使用心跳的返回值與TaskTracker通信。JobTracker利用調度算法先選擇一個job然后再選此job的一個task分配給TaskTracker.

每個TaskTracker會有固定數量的map和reduce任務槽,數量有TaskTracker核的數量和內存大小來決定。JobTracker會先將TaskTracker的所有的map槽填滿,然后才填此TaskTracker的reduce任務槽。 
JobTracker分配map任務時會選取與輸入分片最近的TaskTracker,即數據TaskTracker優化。在分配reduce任務用不着考慮數據TaskTracker。

 

任務的執行

8.TaskTracker分配到一個任務后,首先從HDFS中把作業的jar文件及運行所需要的全部文件(DistributedCache設置的)復制到TaskTracker**本地**。接下來TaskTracker為任務新建一個本地工作目錄,並把jar文件的內容解壓到這個文件夾下(此時需要用到的就是前面提及的setupJob(),其作用是創建輸出目錄和任務的臨時工作目錄)。

9.TaskTracker新建一個taskRunner實例來運行該任務。

10.TaskRunner啟動一個新的JVM來運行每個任務。

此時圖中顯示的所有動作都已經寫下來了。只不過還有一些細節需要把握。請看下面:

 

進度和狀態的更新

Child JVM有獨立的線程每隔3秒檢查任務更新標志,如果有更新就會報告給此TaskTracker; 
TaskTracker每隔5秒給JobTracker發心跳;(當然這個時間可以設置) 
job tracker合並這些更新,產生一個表明所有運行作業及其任務狀態的全局視圖。 
JobClient.monitorAndPrintJob()每秒查詢這些信息。

 

作業的完成

當JobTracker收到最后一個任務(this will be the special job cleanup task)的完成報告后,便把job狀態設置為successful。Job得到完成信息便從waitForCompletion()返回。 
最后,JobTracker清空作業的工作狀態,並指示TaskTracker也清空作業的工作狀態(如刪除中間輸出)。

 

失敗處理機制

分布式計算過程中節點失敗是很常見的。作為一個成熟的實現機制,應該有一套完善的失敗處理機制。

在Hadoop的MapReduce1架構中常見失敗有三種:任務失敗、TaskTracker失敗、JobTracker失敗。

 

任務失敗

  • 子任務失敗。當map或者reduce子任務中的代碼拋出異常,JVM進程會在退出之前向服進程TaskTracker進程發送錯誤報告,TaskTracker會將此(任務嘗試)task attempt標記為failed狀態,釋放一個槽以便運行另外一個任務。
  • jvm失敗。JVM突然退出,即JVM錯誤,這時TaskTracker會注意到進程已經退出,標記為failed。

值得注意的是: 
1)任務失敗有重試機制,重試次數map任務設置是mapred.map.max.attempts屬性控制,reduce是mapred.reduce.max.attempts屬性控制。 
2)一些job可以完成任務總體的一部分就能夠接受,這個百分比由mapred.map.failures.precent和mapred.reduce.failures.precent參數控制。 
3)任務嘗試(task attempt)是可以中止(killed)的。

 

TaskTracker失敗

作業運行期間,TaskTracker會通過心跳機制不斷與系統JobTracker通信,如果某個TaskTracker運行緩慢、失敗或者出現故障,TaskTracker就會停止或者很少向JobTracker發送心跳,JobTracker會注意到此TaskTracker發送心跳的情況,從而將此TaskTracker從等待任務調度的TaskTracker池中移除。

由於TaskTracker中包含有一定數量的map和reduce子任務,這個時候這些子任務怎么處理呢?

1) 如果是map並且成功完成的話, JobTracker會安排此TaskTracker上一成功運行的map任務返回。 
2) 如果是reduce並且成功的話,數據直接使用,因為reduce只要執行完了的就會把輸出寫到HDFS上。 
3) 如果他們屬於未完成的作業的話,reduce階段無法獲取該TaskTracker上的本地map輸出文件,任何任務都需要重新調度。

另外,即使TaskTracker沒有失敗,如果其上的失敗子任務遠遠高於集群的平均失敗子任務數,也會被列入黑名單。可以通過重啟從JobTracker的黑名單移除。

###jobtracker失敗 
jobtracker失敗應該說是最嚴重的一種失敗方式了,而且在Hadoop中存在單點故障的情況下是相當嚴重的,因為在這種情況下作業會最終失敗,盡管這種故障的概率極小。

參考: 
http://blog.csdn.net/thomas0yang/article/details/41211101 
http://www.cnblogs.com/sharpxiajun/p/3151395.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM