1. MapReduce - 映射、化簡編程模型
1.1 MapReduce 的概念
1.1.1 map 和 reduce
1.1.2 shufftle 和 排序
MapReduce 保證每個 reducer 的輸入都已經按鍵排序。
1.1.3 MapReduce 類型和輸入輸出
MapReduce 中的 map 和 reduce 函數遵循以下形式:
map: (K1, V1) ----> list(K2, V2)
reduce: (K2, list(V2)) ----> list (K3, V3)
通常來說,map 函數輸入的鍵值對的類型與用於輸出的不同,然而,reduce 函數的輸入類型必須與 map 函數的輸出類型相一致。
1.1.4 一個示例
下面是一個示例。該實例從很多行的氣象數據中獲取每年的最高氣溫值。
(1)原始數據是氣象局的氣象數據:
我們以每一行在文件中的偏移量作為鍵,構建 map 函數需要的鍵/值對:
黑體部分說明的是年份和該年氣溫。
(2)map 函數的功能正是提取每一行的年份和氣溫,得到下面的list:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(3)map 函數的輸出先由 MapReduce 框架處理,然后再發給 reduce 函數。這一處理過程會根據鍵來對鍵/值對進行排序和分組,其結果是:
(1949, [111, 78])
(1950, [0, 22, -11])
每年的年份后都有一系列氣溫數據。
(4)所有reduce 函數限制必須重復這個列表並從中找出做大的讀數:
(1949, 111)
(1950, 22)
這是最后的輸出:全球氣溫記錄中每年的最高氣溫。 下面是總體過程:
1.1.5 該示例的 Java 代碼實現
Mapper 的實現:
可見:
(1)map 方法的輸入是 <LongWritable key, Text value>,即長整形的 key 和文本類型的 value,對應的是氣象文件中的每一行的行偏移量和行的內容。
(2)map 方法的輸出是 <Text, IntWritable>,即年分和氣溫。
(3)該方法的功能就是對每一個鍵值對,取其年份值和氣溫值,然后寫入 output。
reduce 的實現:
可見:
(1)方法的輸入是 <Text key, Iterator<IntWritable> values>,即 年份值 和 該年份的氣溫值列表。
(2)方法的輸出是 <Text, IntWritable>,即 年分 和 計算得出的該年最該氣溫。
(3)該方法的功能就是求 values 中的最大值,然后寫入 output。
運行該 Job 的代碼:
可見:
(1)創建 JobConf 對象,它包括該 JOb 的各種參數。
(2)它指定輸出和輸出路徑。
(3)它指定 map 和 reduce 方法。
(4)它指定輸出的鍵值對類型。
(5)最后提交作業。
當我們在 Hadoop 集群運行該 Job 時,我們把代碼打包成一個 JAR 文件,然后 Hadoop 會在集群內分發這個包。
注意:以上代碼使用的是舊的 MR API。 Hadoop 在0.20.0 中包含了 最新版 API,它 包括一個全新的 MR Java API。該示例摘自《Hadoop 權威指南》一書。
1.2 分布式 MapReduce
前面的例子展示了 MR 針對小量輸入的工作原理,它使用的是本地文件系統中的文件。為了分不華,我們需要把數據存儲在分布式文件系統中,典型的如 HDFS,以允許 Hadoop 把 MR 的計算移到承載部分數據的各個機器。
1.2.1 基本概念
作業 Job:客戶端的執行單位,它包括輸入數據、MR 程序和配置信息。Hadoop 把 Job 分成若干小任務 task 來工作,其包括兩種類型的任務:map 任務和 reduce 任務。
控制節點:兩種類型的節點控制着作業執行過程:jobtracker 和 多個 tasktracker。jobtracker 通過調度任務在 tasktracker 上運行,來協調所有運行在系統上的作業。tasktracker 運行任務的同時,把進度報告給 jobtracker,jobtracker 記錄每項任務的總體運行情況。
輸入數據:Hadoop 把輸入數據划分成等長的小數據發給 MR,成為輸入分片 split。Hadoop 為每個 split 創建一個 map 任務,該任務會運行用戶自定義的 map 函數。map 任務的執行節點和輸入數據的存儲節點是同一節點時,Hadoop 的性能達到最佳。
map 的輸出:map 任務把輸出寫入本地硬盤,而不是 HDFS,這是因為 map 的輸出只是中間輸出,在 reduce 任務完成后作業完成時,map 的輸出會被刪除。
reduce 的過程:Map 任務結束后進入 Reduce 過程。每個 Reduce 任務包括 組合(shuffle)、排序(sort)和聚合數據(Reduce)三個階段。MR 框架根據中間結果中的鍵 key,將多個 mapper 產生的同一個 key 的中間結果通過 HTTP 協議傳給處理這個key 的 reducer。在組合和排序階段,將來自不同 mapper 具有相同 key 值得 <key,value>對合並到一起,最后將通過組合和排序后得到的 <key,(list of value)> 做為輸入送到 Reduce 方法中處理,將得到的結果寫入由 Hadoop 的分布式文件系統 HDFS 管理的輸出文件中。
comniner:為了減少 map 和 reduce 任務之間數據傳輸量,Hadoop 允許聲明一個 combiner。它運行在 map 的輸出之上,它的輸出作為 reduce 的輸入。
假設 1950、1951 年的氣象數據都由倆個 mappper 進行處理,它們的輸出分別是:
mapper1 的輸出:(1950, 0) (1950, 20) (1950, 10) (1951, 25) (1951, 15)
mapper2 的輸出:(1950, 25) (1950, 11) (1951, 8)
假設有兩個 reducer,分別處理 1950 年和 1951 年的 數據。
reducer1 的輸入: (1950, [0, 20, 10, 25, 11]),其輸出是 (1950, 25)。
reducer2 的輸入: (1951, [25, 15,8]),其輸出是 (1951, 25)。
有 combiner 的情況下,
mapper1 的輸出:(1950, 20)(1951, 25)
mapper2 的輸出:(1950, 25) (1951, 8)
兩個reducer 的輸入將是 (1950, [20, 25]) 和 (1951, [25, 8]) ,其輸出任然是 (1950, 25)和(1951, 25)。
1.2.2 運行過程
組件:
- JobClient : 每一個 job 都會在用戶端通過 JobClient 類將應用程序以及配置參數打包成 jar 文件存儲到 HDFS 中,並把路徑提交給 JobTracker,然后由 JobTracker 創建每個 task,並將它們分發到各個 tasktracker 服務中去執行。
- jobtracker:它是一個 Master 服務,負責調度 job 的每個 task 運行於 tasktracker 之上,並監控它們,如果發現有失敗的 task,就重新啟動它。一般應將 tasktracker 部署在單獨的機器上。它包含一個 job 隊列來跟蹤和調度所有 job。
- tasktracker:它是運行在多個節點上的 slaver 服務,負責直接執行每個 task。tasktracker 都需要運行在 HDFS 的 datanode 上。
簡要流程:
(1)JobTracker 一直在等待 JobClient 通過 RPC 提交作業。
(2) MR 自定義程序通過 JobClient.runJob(job)向 master 節點上的 JobTracker 提交 job。JobTracker 接到請求后將其放入到作業隊列中。
(3)TaskTracker 一直通過 RPC 向 JobTracker 發心跳 heartbeat 詢問有沒有任務可做。如果有,讓其派發任務給它執行。
(4)如果 JobTracker 的作業隊列不為空,JobTracker 會將任務派發給詢問它的 tasktracker。
(5)slave 節點上的 tasktracker 接到任務后在其本地發起 task 來執行任務。
詳細過程:
(2)向 jobtracker 請求一個新的作業 ID (通過調用 JobTracker 的 getNewJobID())(步驟2)。它會檢查作業的輸出說明,比如,如果沒有執行輸出目錄或者它已經存在,作業就不會被提交,並有錯誤返回給 MapReduce 程序;以及檢查作業的輸入划分,如果划分無法計算,或者輸入路徑不存在,作業就不會被提交,並有錯誤返回給 MapReduce 程序。
(3)將運行作業所需要的資源 - 包括作業 JAR 文件、配置文件和計算所得輸入划分 - 復制到一個以作業ID號命名的目錄中。(步驟3)
(4)告訴 jobtracker 作業准備執行(通過調用 JobTracker 的 submitJob()方法)(步驟4)
(5)JobTracker 接收到對其 submitJob 方法的調用后,會把此調用放入一個內部隊列中,交由作業調度器進行調度,並對其進行初始化。初始化包括創建一個代表該正在運行的作業的對象,它封裝任務和記錄信息,以便跟蹤任務的狀態和進程(步驟5)
(6)要創建運行任務列表,作業調度去首先從共享文件系統中獲取 JobClient 已計算好的輸入划分信息。(步驟6)然后為某個划分創建一個 map 任務。創建的 reduce 任務的數量由 JobConf 的 mapped.reduce.tasks 屬性決定。
(7)TaskTracker 執行一個簡單的循環,定期發送心跳方法調用 JobTracker。心跳方法告訴 jobtracker,tasktracker 是否還存活,同時也充當兩者之間的消息通道。做為心跳方法調用的一部分,tasktracker 會指明它是否已經准備運行新的任務,如果是,jobtracker 會為它分配一個任務,並使用心跳方法的返回值與 tasktracker 進行通信。
(8)現在 tasktracker 已經被分配了任務,下一步是運行任務。首先,它本地化作業的 JAR 文件,將它從共享文件系統復制到 tasktracker 所在的文件系統。同時,將應用程序所需要的全部文件從分布式緩存復制到本地磁盤。然后,為任務新建一個本地工作目錄,並把 JAR 文件中的內容解壓到這個文件夾下。第三步,新建一個 taskrunner 實例來運行任務。
(9)TaskRunner 啟動一個新的 Java 虛擬機。
(10)開始運行任務。新的虛擬機使得用戶定義的 map 和 rudue 函數的任何缺陷都不會影響 tasktracker。但是不同任務之間重用 JVM 還是可能的。
jobtracker 收到作業最后一個任務已經完成的通知后,便把作業的狀態設置為”成功“。然后,當 JobClient 查詢狀態時,它將得知任務已經成功完成,所以便顯示一條消息告知客戶,然后從 runJob 方法返回。
1.3 失敗處理
子任務失敗:最常見的是map 或者 reduce 任務中的代碼拋出運行時異常。如果發生這種情況,子任務 JVM 進程會再退出之前向其 tasktracker 父進程發送錯誤報告。除了寫日志外,tasktracker 會將此次任務嘗試標記為 failed,釋放一個slot以便運行另一個任務。
tasktracker 失敗:如果 tasktracker 停止向 jobtracker 發送心跳,jobtracker 會注意到此停止,它會將該 tasktracker 從等待任務調度的 tasktracker 池中刪除。任何進行中的任務都會被重新調度。
jobtracker 失敗:這是所有失敗中最嚴重的一種,目前該失敗是 HDFS 一個單點故障。
1.4 作業調度
早起版本的 Hadoop 使用一種非常簡單的方法來調度用戶作業:FIFO。現在,已經有多種調度器可選,比如一種叫做公平調度器(Fair Scheduler)的多用戶調度器。
1.5 MR 程序設計過程
(1)<key,value>對
這是MR 編程框架中基本的數據單元,其中 key 實現了 WritableComparable 接口, value 實現了 Writable 接口,這使得框架可對其序列號並可對 key 執行排序。
(2)數據輸入
InputFormat、InputSplit、RecordReader 是數據輸入的主要編程接口。InputFormat 主要實現的功能是將輸入數據分切成多塊,每個塊是 InputSplit 類型;而 RecordReader 負責將每個 InputSplit 塊分解成對個 <key 1, value 1>對傳給 Map。
(3)Mapper 階段
此階段涉及的主要編程接口有 Mapper,Reducer 和 Partioner。實現 Mapper 接口主要是其 map 方法,該方法主要用來處理輸入 <key 1,value 1>對並對其產生輸出 <key 2,value 2>對。在 map 處理過 <key 1,value 1>對之后,可實現一個 Combiner 類對 map 的輸出進行初步的規約操作,此類實現了 Reducer 接口。而 Partioner 主要根據 map 輸出的 <key 2,value 2> 對的值,將其分發給不同的 reduce 任務。
(4)Reducer 階段
此階段主要實現 Reduce 接口,主要是實現 reduce 方法。框架將map 輸出的中間結果根據相同的 key2組合成 <key 2,list(value 2)>對作為 reduce 方法的輸入數據並對其處理,同時產生 <key 3, value 3>對。
(5)數據輸出
此階段主要實現兩個編程接口,其中 FileOutputFormat 接口涌來將輸出數據輸出到文件;RecordWriter 接口負責輸出到一個<key,value>對。
2. Hadoop V1 中的 MapReduce 的實現
2.1 Hadoop V1 中 MapReduce 的實現
- NameNode 中記錄了文件是如何被拆分成 block 以及這些 block 都存儲到了那些DateNode節點。NameNode 同時保存了文件系統運行的狀態信息.
- DataNode中存儲的是被拆分的blocks.
- Secondary NameNode 幫助 NameNode收集文件系統運行的狀態信息.
- JobTracker 當有任務提交到 Hadoop 集群的時候負責 Job 的運行,負責調度多個TaskTracker.
- TaskTracker 負責某一個 map 或者 reduce 任務.
2.2 MapReduce 的資源管理
Hadoop V1 中,MapReduce 除了數據處理外,還兼具資源管理功能。
Hadoop 1.0 資源管理由兩部分組成:資源表示模型和資源分配模型,其中,資源表示模型用於描述資源的組織方式,Hadoop 1.0采用“槽位”(slot)組織各節點上的資源,而資源分配模型則決定如何將資源分配給各個作業/任務,在Hadoop中,這一部分由一個插拔式的調度器完成。
Hadoop引入了“slot”概念表示各個節點上的計算資源。為了簡化資源管理,Hadoop將各個節點上的資源(CPU、內存和磁盤等)等量切分成若干份,每一份用一個slot表示,同時規定一個task可根據實際需要占用多個slot 。通過引入“slot“這一概念,Hadoop將多維度資源抽象簡化成一種資源(即slot),從而大大簡化了資源管理問題。
更進一步說,slot相當於任務運行“許可證”,一個任務只有得到該“許可證”后,才能夠獲得運行的機會,這也意味着,每個節點上的slot數目決定了該節點上的最大允許的任務並發度。為了區分Map Task和Reduce Task所用資源量的差異,slot又被分為Map slot和Reduce slot兩種,它們分別只能被Map Task和Reduce Task使用。Hadoop集群管理員可根據各個節點硬件配置和應用特點為它們分配不同的map slot數(由參數mapred.tasktracker.map.tasks.maximum指定)和reduce slot數(由參數mapred.tasktrackerreduce.tasks.maximum指定)。
Hadoop 1.0中的資源管理存在以下幾個缺點:
(1)靜態資源配置。采用了靜態資源設置策略,即每個節點實現配置好可用的slot總數,這些slot數目一旦啟動后無法再動態修改。
(2)資源無法共享。Hadoop 1.0將slot分為Map slot和Reduce slot兩種,且不允許共享。對於一個作業,剛開始運行時,Map slot資源緊缺而Reduce slot空閑,當Map Task全部運行完成后,Reduce slot緊缺而Map slot空閑。很明顯,這種區分slot類別的資源管理方案在一定程度上降低了slot的利用率。
(3) 資源划分粒度過大。這種基於無類別slot的資源划分方法的划分粒度仍過於粗糙,往往會造成節點資源利用率過高或者過低 ,比如,管理員事先規划好一個slot代表2GB內存和1個CPU,如果一個應用程序的任務只需要1GB內存,則會產生“資源碎片”,從而降低集群資源的利用率,同樣,如果一個應用程序的任務需要3GB內存,則會隱式地搶占其他任務的資源,從而產生資源搶占現象,可能導致集群利用率過高。
(4) 沒引入有效的資源隔離機制。Hadoop 1.0僅采用了基於jvm的資源隔離機制,這種方式仍過於粗糙,很多資源,比如CPU,無法進行隔離,這會造成同一個節點上的任務之間干擾嚴重。
2.3 MapReduce 架構的局限
1. JobTracker 是 Map-reduce 的集中處理點,存在單點故障。
2. JobTracker 完成了太多的任務,造成了過多的資源消耗,當 map-reduce job 非常多的時候,會造成很大的內存開銷,潛在來說,也增加了 JobTracker fail 的風險,這也是業界普遍總結出老 Hadoop 的 Map-Reduce 只能支持 4000 節點主機的上限。
3. 在 TaskTracker 端,以 map/reduce task 的數目作為資源的表示過於簡單,沒有考慮到 cpu/ 內存的占用情況,如果兩個大內存消耗的 task 被調度到了一塊,很容易出現 OOM。
4. 在 TaskTracker 端,把資源強制划分為 map task slot 和 reduce task slot, 如果當系統中只有 map task 或者只有 reduce task 的時候,會造成資源的浪費,也就是前面提過的集群資源利用的問題。
5. 源代碼層面分析的時候,會發現代碼非常的難讀,常常因為一個 class 做了太多的事情,代碼量達 3000 多行,,造成 class 的任務不清晰,增加 bug 修復和版本維護的難度。
6. 從操作的角度來看,現在的 Hadoop MapReduce 框架在有任何重要的或者不重要的變化 ( 例如 bug 修復,性能提升和特性化 ) 時,都會強制進行系統級別的升級更新。更糟的是,它不管用戶的喜好,強制讓分布式集群系統的每一個用戶端同時更新。這些更新會讓用戶為了驗證他們之前的應用程序是不是適用新的 Hadoop 版本而浪費大量時間。

3. Hadoop 2.0 中的 MapReduce
- 受限的擴展性;
- JobTracker 單點故障;
- 難以支持MR之外的計算;
- 多計算框架各自為戰,數據共享困難,比如MR(離線計算框架),Storm實時計算框架,Spark內存計算框架很難部署在同一個集群上,導致數據共享困難等
- 新的資源管理器 YARN 全局管理所有應用程序計算資源的分配,每一個應用的 ApplicationMaster 負責相應的調度和協調。一個應用程序無非是一個單獨的傳統的 MapReduce 任務或者是一個 DAG( 有向無環圖 ) 任務。ResourceManager 和每一台機器的節點管理服務器能夠管理用戶在那台機器上的進程並能對計算進行組織。
- 任務的調度和監控依然由 MapReduce 負責。
