Hadoop MapReduce 一文詳解MapReduce及工作機制


@

前言-MR概述

MapReduce是一個分布式計算框架,是用戶開發“基於Hadoop的數據分析應用”的核心框架。主要由兩部分組成:編程模型和運行時環 境。其中,編程模型為用戶提供了非常易用的編程接口,用戶只需要像編寫串行程序 一樣實現幾個簡單的函數即可實現一個分布式程序,而其他比較復雜的工作,如節點 間的通信、節點失效、數據切分等,全部由MapReduce運行時環境完成,用戶無須 關心這些細節。

1.Hadoop MapReduce設計思想及優缺點

設計思想

Hadoop MapReduce誕生於搜索領域,主要解決搜索引擎面臨的海量數據處理擴展性差的問 題。它的實現很大程度上借鑒了谷歌MapReduce的設計思想,包括簡化編程接口、 提高系統容錯性等。

優點:

易於編程

​ 傳統的分布式程序設計(如MPI)非常復雜,用戶需要關注的細節 非常多,比如數據分片、數據傳輸、節點間通信等,因而設計分布式程序的門檻非常 高。Hadoop的一個重要設計目標便是簡化分布式程序設計,將所有並行程序均需要關注的設計細節抽象成公共模塊並交由系統實現,而用戶只需專注於自己的應用程序邏輯實現,這樣簡化了分布式程序設計且提高了開發效率。

良好的擴展性

​ 隨着公司業務的發展,積累的數據量(如搜索公司的網頁量) 會越來越大,當數據量增加到一定程度后,現有的集群可能已經無法滿足其計算能力和存儲能力,這時候管理員可能期望通過添加機器以達到線性擴展集群能力的目的。

高容錯性

​ 在分布式環境下,隨着集群規模的增加,集群中的故障率(這里 的“故障”包括磁盤損壞、機器宕機、節點間通信失敗等硬件故障和壞數據或者用戶 程序bug產生的軟件故障)會顯著增加,進而導致任務失敗和數據丟失的可能性增 加。為此,Hadoop通過計算遷移或者數據遷移等策略提高集群的可用性與容錯性。

適合PB級以上海量數據的離線處理

​ 可以實現上千台服務器集群並發工作,提供數據處理能力。

缺點:

不擅長實時計算

​ MapReduce無法像MySQL一樣,在毫秒或者秒級內返回結果。

不擅長流式計算

​ 流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的。

不擅長DAG(有向圖)計算

​ 多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce並不是不能做,而是使用后,每個MapReduce作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。

2. Hadoop MapReduce核心思想

從MapReduce自身的命名特點可以看出,MapReduce由兩個階段組成:Map階段 和Reduce階段。

(1)分布式的運算程序往往需要分成至少2個階段。

(2)第一個階段的MapTask並發實例,完全並行運行,互不相干。

(3)第二個階段的ReduceTask並發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask並發實例的輸出。

(4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行運行。

每一個Map階段和Reduce階段都可以由多個Map Task和Reduce Task

實際應用中我們只需編寫map()和reduce()兩個函數,即可完成簡單的分布式程序的 設計。

map()函數以key/value對作為輸入,產生另外一系列key/value對作為中間輸出 寫入本地磁盤。

MapReduce框架會自動將這些中間數據按照key值進行聚集,且key 值相同(用戶可設定聚集策略,默認情況下是對key值進行哈希取模)的數據被統一 交給reduce()函數處理。

reduce()函數以key及對應的value列表作為輸入,經合並key相同的value值后, 產生另外一系列key/value對作為最終輸出寫入HDFS。

hadoop MapReduce對外提供了5個可編程組件,分別是 InputFormat、Mapper、Partitioner、Reducer和OutputFormat

3.MapReduce工作機制

剖析MapReduce運行機制

過程描述

  • 客戶端:提交MapReduce作業
  • YARN資源管理器,負責協調集群上計算機資源的分配
  • YARN節點管理器,負責啟動和監視集群中機器上的計算容器(container)
  • MapReduce的application master,負責協調運行MapReduce作業的任務。他和MapReduce任務在容器中運行,這些容器有資源管理器分配並由節點管理器進行管理
  • 分布式文件系統(一般為HDFS),用來與其他實體間共享作業文件

在這里插入圖片描述

第一階段:作業提交(圖1-4步)

步驟:

Job的submit()方法創建一個內部的JobSummiter實例,並且調用其submitJobInternal()方法。提交作業后,waitForCompletion()每秒輪詢作業的進度,如果發現自上次報告后有改變,便把進度報告到控制台。作業完成后,如果成功,就顯示作業計數器;如果失敗,則導致作業失敗的錯誤被記錄到控制台。

1.客戶端提交作業Job,並輪詢監控作業進度和狀態;

2.Job tracker向RM申請一個新應用ID,用作MR作業的ID;(圖中整個流程的步驟2)

3.Job 計算作業的輸入分片。如果無法進行分片計算,比如:輸入路徑不存在,作業就不提交,將錯誤返回給MR程序;

4.如果上一步OK,Job將運行作業所需要的資源(包括作業的JAR文件、配置文件和計算所得的輸入分片)復制到一個以作業ID命名的目錄下的共享文件系統中(一般都是HDFS)。作業JAR的復本較多(復本的數量有作業提交的時候MR的參數控制:mapreduce.client.submit.file.replication屬性,默認為10),因此運行作業的任務時,集群中有很多個復本可供節點管理器(Node Manager )訪問讀取;

5.上傳完畢后,通過調用資源管理器的submitApplication()方法提交作業。

第二階段:作業初始化(圖5-7步)

步驟:

1.資源管理器收到調用它的submitApplication()消息后,便將請求傳遞給YARN調度器。調度器分配一個容器(container),並讓容器運行MRAppMaster程序,本質是RM(資源管理器)在NM(節點管理器)的管理下在容器中啟動application master的進程;(5a、5b)

2.MR作業的application master是一個Java應用程序,其主類是MRAppMaster。由於MRAppMaster要能相應客戶端對於應用程序運行狀態的查詢和狀態,因此application master對作業的初始化是通過創建多個簿記對象(就像一個多重賬單,形象的命名為簿記對象)以保持對作業進度的跟蹤來完成的。接下來,它接受來自共享文件系統的、在客戶端計算的輸入分片。然后對每一個分片創建一個map任務對象以及有mapreduce.job.reduces屬性(通過作業的setNumReducetasks()方法設置)確定的多個reduce任務對象。任務ID在此時分配。

application master必須決定如何運行構成MapReduce作業的各個任務。如果作業很小,就選擇和自己在同一個JVM上運行任務。與在一個節點上順序運行這些任務相比,當application master判斷在新的容器中分配和運行任務的開銷大於並行運行它們的開銷時,就會發生這一情況。這樣的作業稱為uberized。或者作為Uber任務運行。

小作業是如何定義的?

默認情況下,小作業就是小於10個mapper且只有1個reducer且輸入大小小於一個HDFS塊的作業。其參數可以通過如下屬性進行設置:

  • mapreduce.job.ubertask.maxmaps :最大map任務數量
  • mapreduce.job.ubertask.maxreduces:最大reduce任務數量
  • mapreduce.job.ubertask.maxbytes:處理文件的最大容量,byte為單位
  • mapreduce.job.ubertask.enable:true表示為啟用Uber任務

最后,在任何任務運行之前,application master調用setupJob()方法設置OutputCommitter。FileOutputCommitter為默認值,表示將建立作業的最終輸出目錄及任務輸出的臨時工作空間。

第三階段:任務的分配(圖8)

當作業不適合作業Uber任務運行,那么application master就會為該作業中的所有map任務和reduce任務向資源管理器(RM)請求容器。

步驟:

  1. 首先Map任務發出請求,該請求優先級要高於reduce任務的請求,這是因為所有的map任務必須在reduce排序階段能夠啟動前完成。直到有5% 的map任務已經完成時,為reduce任務申請容器的請求才會發出。
  2. reduce任務能夠在集群中任意位置運行,但是map任務的請求有着數據本地化局限,這也是調度器所關注的。在理想情況下,任務時數據本地化的。也就是說任務在分片駐留的同一節點上運行。次選的情況是,任務是可以機架本地化(rack local),即和分片在同一機架上而非同一節點上運行。有一些任務既不是數據本地化也不是機架本地化,他們會從別的機架,而不是運行所在的機器上獲取自己的數據。對於一個特定的作業運行,可以通過查看做的計數器來確定在每個本地化層次上運行的任務的數量。
  3. 請求為任務指定了內存需求和CPU數。默認情況下,每個map任務和reduce任務都分配到1G內存和一個虛擬內核。這些值也是可以在每個作業的基礎上進行配置。配置屬性如下:
    • mapreduce.map.memory.mb:map任務內存大小
    • mapreduce.reduce.memory.mb:reduce任務內存大小
    • mapreduce.map.cpu.vcores:map任務cpu核數
    • mapreduce.reduce.cpu.vcores:reduce任務CPU核數

第四階段:任務的執行(圖9-11)

步驟:

  1. 一旦資源管理器的調度器為任務分配了一個特點節點上的容器,application master就通過與節點管理器通信來啟動容器.
  2. 該任務有主類為YarnChild的一個Java應用程序執行。在它運行任務之前,首先將任務需要的資源本地化,包括任務的配置、JAR文件和所有來自分布式緩存的文件;
  3. 最后運行map任務或reduce任務。

YarnChild 在指定的JVM中運行,因此用戶定義的map和reduce函數(甚至是YarnChild)中的任何缺陷不會影響到節點管理器,例如導致其崩潰或掛起。

每個任務都能夠運行搭建(setup)和提交(commit)動作,他們和任務本身在同一個JVM中運行,並有作業的OutputCommitter確定。對於基於文件的作業,提交動作將任務輸出由臨時位置搬移到最終位置。提交協議確保當推測執行被啟用時,只有一個任務副本被提交,其他都取消。

推測執行

MapReduce模型將作業分解成任務,然后並行地運行任務以使作業的整體執行時間少於各個任務順序執行的時間。這就使得作業執行時間對運行緩慢的任務很敏感,因為只運行一個緩慢的任務會使整個作業所用的時間遠遠長於執行其他任務的時間,當一個作業由幾百或幾千個任務組成時,可能出現少數“拖后腿”的任務,這是很常見的。

任務執行緩慢的可能有很多種,但是檢測具體原因是比較困哪的(比如,硬件方面當前節點的性能低於其他節點或者軟件應用配置如內存,JVM,reduce個數等等的問題),盡管執行時間比預期長,但是任務最終是成功執行的。hadoop 不會嘗試診斷或者修復執行慢的任務,相反,在下一個任務運行比預期慢的時候,它會盡量檢測,並啟動另一個相同的任務作為備份,這就是所謂的推測執行

我們很容易想到的是,如果同時啟動了兩個重復的任務,對於資源的開銷時比較大,且相同任務之間會產生互相競爭(比如資源,當同任務在同一節點啟動時的內存、cpu等),這都不是我們想要的,在什么情況下對某個任務啟動推測執行的時機是沒有絕對的,我們可以相對於各自類型任務(map和reduce)的平均執行進度作為一個基准【比如,map階段的任務平均耗時10s,當一個map任務執行了20S還沒有執行完成,那我們有理由相信這個任務可能是出問題,我們應該啟動一個副本任務】。在此基准上將執行速度明顯低於平均水平的那一部分任務進行推測執行副本。當一個推測執行的任務完成之后,其他正在運行的重復任務都將被中止執行,這個很容易理解,我們已經得到了想要的結果,就不需要副本任務了。

但是推測執行開啟就一定是好事么?小可愛們可以思考下,什么情況下推測執行會產生負面的影響。在這里插入圖片描述

我們都清楚推測執行是需要開啟相同的任務,那就是一個任務需要的資源是更多的,那對於一個繁忙的集群,執行推測執行就會消耗更多的資源,減少了集群整體的吞吐量,因而此時推測執行對於整體而言是不利的,就是因為一個任務慢導致整個集群都慢。

對於reduce任務,我們知道Reduce任務是需要將Map任務的輸出結果匯集之后,如果reduce任務有大量的任務推測執行,對於集群的網絡IO是會產生較大的影響,對於集群整體也會產生影響。另外,reduce可能會產生數據傾斜,對於此,一個reduce任務因為數據散列問題本身執行就慢,開啟推測執行反而不會有積極的影響。因此,對於reduce任務,關閉推測執行是相對好的選擇。

第五階段:作業完成

當application master收到作業最后一個任務已完成的通知后,便把作業的狀態設置為成功。然后,在Job輪詢狀態時,便知道任務已成功完成。於是Job打印一條消息告知用戶,然后從waitForCompletion()方法返回。Job的統計信息和計數值也在這個時候輸出到控制台。

如果我們希望當任務執行完成之后,application master可以主動通知我們,而不是等待Job輪詢才能獲取到任務的完成狀態。可以在application master進行相應的設置,這時application master會發送一個HTTP作業通知。客戶端通過設置屬性【mapreduce.job.end-notification.url】進行相應的設置。

最后,作業完成時,各個分配的container和application master會清理其工作狀態,任務運行期間的中間輸出將被刪除,OutputCommitter的commitJob()方法會被調用。作業信息有作業歷史服務器存檔,以便日后需要時查看。

在這我們就已經走完了MR程序整個運行過程,對於其中的部分細節我們在下邊在來介紹一下。


Tips 知識點:進度和狀態更新

MapReduce作業是長時間運行的批量作業,運行時間范圍從數秒到數小時。在這個很長的時間內,用戶想要去獲取關於作業的一些運行反饋是很重要的。一個作業和它的每個任務都有一個狀態(status),包括:作業或任務的狀態(比如:運行中,成功完成,失敗)、map和reduce的進度、作業計數器的值、狀態消息或描述(可以由用戶代碼設置)。之前我們也有提到用戶是可以隨時查看作業的狀態信息的,那這些狀態是怎么通過客戶端進行通信得到的呢?

任務在運行時,對其進度(progress,即任務完成百分比)保持追蹤。對Map任務,任務進度是已處理輸入所占的比例。對reduce任務,情況會復雜一些,但系統仍然會估計已處理reduce輸入的比例。整個過程分為三部分,與shuffle的三個階段相對應。比如,如果任務已經執行reducer一半的輸入,那么任務的進度便是5/6,這是因為已經完成復制和排序階段(每個占1/3),並且完成reduce階段的一半(1/6)。

任務也有一組計數器,負責對任務運行過程中各個時間進行計數,這些計數器要么內置框架中,比如寫入的map輸出記錄數,要么是用戶自定義的。

當map任務或reduce任務運行時,子進程和自己的父application master通過umbilical接口通信。默認每隔3秒,任務通過這個umbilical接口向自己的application master報告進度和狀態(包括計數器),application master會形成一個作業的匯聚視圖。注意:application master不會主動獲取任務的進度,是被動接受task任務的上報。

在作業期間,客戶端每秒鍾輪訓一次application master以接收最新狀態(輪詢間隔通過mapreduce.client,progressmonitor.ploointerval設置)。客戶端也可以通過使用job的getStatus()方法得到一個JobStatus實例,其內包含作業的所有狀態信息。


4.MR各組成部分工作機制原理

4.1概覽:

在這里插入圖片描述

4.2 MapTask工作機制

在這里插入圖片描述
MapTask的整體計算如上圖所示,共分為5個階段,如下:

其中最重要的部分是輸出結果在內存和磁盤中的組織方式,具體涉及Collect、Spill和Combine三個階段,對於這三個階段我們介紹時會深入介紹。

(1)Read階段:MapTask通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。

(2)Map階段:該階段主要是將解析出的key/value交給用戶編寫map()函數處理,並產生一系列新的key/value。

(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成后,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩沖區中。
輸出深入講解
當map函數處理完一對key/value產生新的key/value后,會調用collect()函數輸出結果。在輸出結果時,OutputCollecter對象會根據作業是否有Reduce Task進行不同的處理,如果沒有Reduce Task階段,則把結果直接輸出到HDFS。如果后續有對應的Reduce Task,則開始組織封裝結果:

  • 1.獲取對應記錄的分區號partition,然后寫到環形緩沖區;
  • 2.環形緩沖區中,當數據寫入到一定閾值后,會有專屬的寫出線程(SpillThread)將數據寫到一個臨時文件中此操作稱之為落盤,當所有數據處理完畢后,對所有臨時文件進行一次合並以生成一個最終文件。環形緩存區使得Collect階段和Spill階段可以並行進行。
  • 3.數據(新的key/value)寫入是由兩部分組成,索引和真實key/value。通過讓索引和數據共享環形緩沖區,提升整個緩沖區的效率:存放如下

在這里插入圖片描述
指針equator,該指針界定了索引和數據的共同起始存放位置,從該位置開始,索引和數據分別沿相反的方向增長內存使用空間。當內存使用達到80%(默認情況下) 的時候,落盤也是從該指針開始讀取數據到指定的位置,數據和索引的落盤是分開進行的,索引落盤一般是當索引大小超過1MB才開始進行落盤。
在這里插入圖片描述

(4)Spill階段:即“溢寫”,當環形緩沖區滿后,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,並在必要時對數據進行合並、壓縮等操作。

溢寫階段詳情

步驟1:利用快速排序算法對緩存區區間內的數據進行排序,排序方式是,先按照分區編號Partition進行排序,然后按照key進行排序。這樣,經過排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。

步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置了Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。

步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。

環形緩沖區認知

(5)Combine階段:當所有數據處理完成后,MapTask對所有臨時文件進行一次合並,以確保最終只會生成一個數據文件。

​ 當所有數據處理完后,MapTask會將所有臨時文件合並成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。

​ 在進行文件合並過程中,MapTask以分區為單位進行合並。對於某個分區,它將采用多輪遞歸合並的方式。每輪合並io.sort.factor(默認10)個文件,並將產生的文件重新加入待合並列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。

​ 讓每個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。

4.3 ReduceTask工作機制

在這里插入圖片描述

(1)Copy階段/Shuffle階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。

(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后台線程對內存和磁盤上的文件進行合並,以防止內存使用過多或磁盤上文件過多。

總體上看,Shuffle&Merge階段可進一步划分為三個子階段。
(1)准備運行完成的Map Task列表:
GetMapEventsThread線程周期性通過RPC從TaskTracker獲取已完成Map Task列表,並保存到映射表mapLocations(保存了TaskTracker Host與已完成任務列表的映射關系)中。為防止出現網絡熱點,Reduce Task通過對所有TaskTracker Host進行“混洗”操作以打亂數據拷貝順序,並將調整后的Map Task輸出數據位置保存到scheduledCopies列表中。
(2)遠程拷貝數據
Reduce Task同時啟動多個MapOutputCopier線程,這些線程從scheduledCopies列表中獲取Map Task輸出位置,並通過HTTP Get遠程拷貝數據。對於獲取的數據分片,如果大小超過一定閾值,則存放到磁盤上,否則直接放到內存中。
(3)合並內存文件和磁盤文件
為了防止內存或者磁盤上的文件數據過多,Reduce Task啟動了LocalFSMerger和InMemFSMergeThread兩個線程分別對內存和磁盤上的文件進行合並。

(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop采用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數據進行一次歸並排序即可。

(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。

前面提到,各個Map Task已經事先對自己的輸出分片進行了局部排序,因此,Reduce Task只需進行一次歸並排序即可保證數據整體有序。為了提高效率,Hadoop將Sort階段和Reduce階段並行化。在Sort階段,Reduce Task為內存和磁盤中的文件建立了小頂堆,保存了指向該小頂堆根節點的迭代器,且該迭代器保證了以下兩個約束條件:

1.磁盤上文件數目小於io.sort.factor(默認是10)。

2.當Reduce階段開始時,內存中數據量小於最大可用內存(JVM Max HeapSize)的mapred.job.reduce.input.buffer.percent(默認是0)。

在Reduce階段,Reduce Task不斷地移動迭代器,以將key相同的數據順次交給reduce()函數處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程,這樣,Sort和Reduce可並行進行。

4.4shuffle 階段

4.4.1 定義:

我們將MR過程中,將Map輸出作為輸入傳給reducer的過程成為shuffle,但一般情況下,我們把從map端產生輸出到reduce消化輸入的整個過程都稱之為shuffle。

大致流程如下:

在這里插入圖片描述

也就是說shuffle階段包括了從Map端Collect階段開始一直到Reduce端Sort階段的整個過程,也可以看出這是整個MR的核心過程,在生產中優化MR更多的是在shuffle階段的各個過程做文章,提高整個MR的處理效率。

小結:

通過這篇文章我們了解了什么是MapReduce。在Hadoop中MapReduce是計算處理的邏輯關鍵,而Shuffle階段又是整個MR的核心關鍵,Shuffle是部分Map task階段和Reduce Task階段的處理過程誠摯為Shuffle階段。對於我們進行MR調優的大部分操作都是在Shuffle階段的機制中去優化各個節點的處理,進而提升MR的處理效率。針對於如何調優MR,我們會在下個篇章進行一些生產中常見的調優的策略,而對於MR的理解能幫助我們更好的去進行調優處理。

好了,今天的文章就到這里結束了。路漫漫其修遠兮,吾將上下而求索。希望這篇文章對於大家有所幫助。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM