MapReduce on Yarn運行原理


一、概念綜述

  MapReduce是一種可用於數據處理的編程模型(或計算模型),該模型可以比較簡單,但想寫出有用的程序卻不太容易。MapReduce能將大型數據處理任務分解成很多單個的、可以在服務器集群中並行執行的任務,而這些任務的計算結果可以合並在一起計算最終的結果。最重要的是,MapReduce的優勢在於易於編程且能在大型集群(上千節點)並行處理大規模數據集,以可靠,容錯的方式部署在商用機器上。

  從MapReduce的所有長處來看,它基本上是一個批處理系統,並不適合交互式分析。不可能執行一條查詢並在幾秒內或更短的時間內得到結果。典型情況下,執行查詢需要幾分鍾或更多時間。因此,MapReduce更適合那種沒有用戶在現場等待查詢結果的離線使用場景。

  在MapReduce整個過程可以概括為以下過程:

  input split --> map --> shuffle --> reduce --> output

  下圖是《Hadoop權威指南》給出的MapReduce運行過程:

             MapReduce運行過程圖   

  MapReduce作業是客戶端需要執行的一個工作單元:它包括輸入數據、MapReduce程序和配置信息。Hadoop將作業分成若干個任務(task)來執行,其中包括兩類任務:map任務和reduce任務。這些任務運行在集群的節點上,並通過YARN進行調度。如果一個任務失敗,它將在另一個不同的節點上自動重新調度運行。

Input Split:

  Hadoop將MapReduce的輸入數據划分成等長的小數據塊,稱為輸入分片(input split)或簡稱“分片”。Hadoop為每一個分片構建一個map任務,並由該任務來運行用戶自定義的map函數從而處理分片中的每條記錄。

  擁有許多分片,意味着處理每個分片所需要的時間少於處理整個輸入數據所花的時間。因此,如果我們並行處理每個分片,且每個分片數據比較小,那么整個處理過程將獲得更好的負載平衡,因為一台較快的計算機能夠處理的數據分片比一台較慢的計算機更多,且成一定的比例。即使使用相同的機器,失敗的進程或其他並發運行的作業能夠實現滿意的負載平衡,並且隨着分片被切分得更細,負載平衡的質量會更高。另一方面,如果分片切分得太小,那么管理分片得總時間和構建map任務得總時間將決定作業的整個執行時間。對於大多數作業來說,一個合理的分片大小趨向於HDFS的一個塊的大小,這樣可以確保存儲在單個節點上的最大輸入塊的大小。數據塊默認是128MB,不過可以針對集群調整這個默認值,或在每個文件創建時指定。

Map:

  map任務會將集合中的元素從一種形式轉化成另一種形式,在這種情況下,輸入的鍵值對會被轉換成零到多個鍵值對輸出。其中輸入和輸出的鍵必須完全不同,輸入和輸出的值則可能完全不同。

  Hadoop在存儲有輸入數據(HDFS中的數據)的節點上運行map任務,可以獲得最佳性能,因為它無需使用寶貴的集群帶寬資源。這就是所謂的“數據本地化優化”。但是,有時對於一個map任務的輸入分片來說,存儲該分片的HDFS數據塊復本的所有節點可能正在運行其他map任務,此時作業調度需要從某一數據塊所在的機架中的一個節點上尋找一個空閑的map槽(slot)來運行該map任務分片。僅僅在非常偶然的情況下(該情況基本上不會發生),會使用其他機架中的節點運行該map任務,這將導致機架與機架之間的網絡傳輸。下圖顯示了這三種可能性。

map任務的網絡傳輸的三種可能性圖

  map任務的輸出被稱為中間鍵和中間值,會被發送到reducer做后續處理。但輸出結果只寫入本地硬盤,而非HDFS。這是為什么?因為map的輸出是中間結果:該中間結果由reduce任務處理后才產生最終輸出結果,而且一旦作業完成,map的輸出結果就可以刪除。因此,如果把它存儲在HDFS中並實現備份,難免有些小題大做。如果運行map任務的節點在將map中間結果傳送給reduce任務之前失敗,Hadoop將在另一個節點上重新運行這個map任務以再次構建map中間結果。

Shuffle和排序:

  shuffle和排序在MapReduce流程圖中的執行過程

  MapReduce確保每個reducer的輸入都是按鍵排序的。系統執行排序、將map輸出作為輸入傳給reduce的過程稱為shuffle。在此,我們將學習shuffle是如何工作的,因為它有助於我們理解工作機制(如果需要優化MapReduce程序)。shuffle屬於不斷被優化和改進的代碼庫的一部分,因此下面的描述有必要隱藏一些細節。從許多方面來看,shuffle是MapReduce的“心臟”,是奇跡發生的地方。

Map端shuffle過程:

map端shuffle過程

     1. 讀取HDFS上的輸入分片input split,每一行解析成一個<key, value>。每一個鍵值對調用一次map函數。輸入<0,helloyou>,<10,hello me>。

  2. 覆蓋map(),接收1中產生的<key, value>,然后進行處理,轉換為新的<key, value>輸出。每個map任務都有一個環形內存緩存區,輸出結果會暫且放在環形內存緩沖區中(該緩沖區的大小默認為100MB,由mapreduce.task.io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由mapreduce.map.sort.spill.percent屬性控制),會由單獨線程本地文件系統中創建一個臨時溢出文件(spill file),將該緩沖區中的數據寫入這個文件,但如果再此期間緩沖區被填滿,map會被堵塞直到寫入過程完成。溢出寫過程按輪詢方式將緩沖區中的內容寫到mapreduce.cluster.local.dir屬性在作業特定子目錄下指定的目錄中。輸出:<hello, 1>,<you, 1>,<hello, 1>,<me, 1>。

  注:當緩沖區的數據值達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢出寫線程啟動,鎖定這80MB的內存,執行溢出寫過程。map任務的輸出結果還可以往剩下的20MB內存中寫,互不影響。

  3. 對2輸出的<key, value>進行分區,默認分為一個區,MapReduce提供Partitioner接口,作用就是根據key或value及reduce的數量來決定當前的輸出數據最終應該交由哪個reduce任務處理。默認對key hash后再以reduce任務數據取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以定制並設置到job上。

  在寫入磁盤之前,線程首先根據reduce任務的數目將數據划分為相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。接下來對每個分區內又調用job.setSortComparatorClass()設置的key比較函數類排序(如果沒有通過job.setSortComparatorClass()設置key比較函數類,則使用key的實現的compareTo方法)也就是對這80MB空間內的key做排序(sort),這里的排序是對序列化的字節做的排序。如果此時設置了Combiner,將排序后的結果進行Combiner操作,如果至少存在3個溢出文件(通過mapreduce.map.combine.minspills屬性設置)時,則combiner就會在輸出文件寫到磁盤之前再次運行,這樣做的目的是讓盡可能減少數據寫入到磁盤和傳遞給reduce的數據。排序后:<hello, 1>,<hello, 1>,<me, 1>,<you, 1>,Combiner后:<hello, {1, 1}>,<me, {1}>,<you, {1}>。

  注:combiner可以在輸入上反復運行,但並不影響最終結果。如果只有1或2個溢出文件,那么由於map輸出規模減少,因而不值得調用combiner帶來的開銷,因此不會為該map輸出再次運行combiner。

  4 當map任務輸出最后一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合並(merge)。合並的過程中會不斷地進行排序和combiner操作,目的有兩個:

    ① 盡量減少每次寫入磁盤的數據量;

    ② 盡量減少下一復制階段網絡傳輸的數據量。

  最后合並成了一個已分區且已排序的輸出文件, 其中排序步驟是再次調用job.setSortComparatorClass()設置的key比較函數類對所有數據對排序(因為一個reducer接受多個mappers,需要重新排序),二次排序使用到了jobjob.setGroupingComparatorClass()設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器。配置屬性mapreduce.task.io.sort.factor控制着一次最多能合並多少流,默認值是10。為了減少網絡傳輸的數據量,節約磁盤空間和寫磁盤的速度更快,這里可以將數據壓縮,只要將mapreduce.map.output.compress設置為true就可以。數據壓縮算法有DEFLATE、gzip、bzip2、LZO、LZ4、Snappy等,可以通過mapreduce.map.output.compress.codec配置壓縮類型即可。

  5. 將分區中的數據拷貝給相對應的reduce任務(可選)。reducer通過HTTP得到輸出文件的分區。用於文件分區的工作線程的數量由任務的mapreduce.shuffle.max.threads屬性控制,此設置針對的是每一個節點管理器,而不是針對每個map任務。默認值0將最大線程數設置為機器中處理器數量的兩倍。

  有人可能會問:分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其節點上的Application Master保持聯系,而Application Master又一直和Application Manager保持心跳。所以Application Manager中保存了整個集群中的宏觀信息。只要reduce任務向ApplicationManager獲取對應的map輸出位置就OK了。

   至此,map端的所有工作已經結束了,最終生成的這個文件也存放在運行map任務的tasktracker的本地磁盤上(但reduce輸出並不這樣)。每個reduce任務不斷地通過RPC從JobTracker那獲取map任務是否完成的信息,如果reduce任務得到通知,獲知某台TaskTracker上的map任務執行完成,shuffle的后半段過程開始啟動。

 Reduce端shuffle過程:

  現在,tasktracker需要為分區文件運行reduce任務。下圖是reduce端shuffle過程圖:

 reduce端shuffle過程圖

  1. copy過程,簡單地拉取數據。reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。每個map任務的完成時間可能不同,因此在每個任務完成時,reduce任務就開始通過HTTP方式請求復制其輸出。 reduce任務由少量復制線程,因此能夠並行取得map輸出。默認值是5個線程,但這個默認值可以修改設置mapreduce.reduce.shuffle.parallelcopies屬性即可。

  如果map輸出相當小,會被復制到reduce任務JVM的內存(緩沖區大小由mapreduce.reduce.shuffle.input.buffer.percent屬性控制,指定用於此用途的堆空間的百分比),否則,map輸出被復制到磁盤。一旦內存緩沖區達到閾值大小(由mapreduce.reduce.shuffle.merge.percent決定,默認是0.66)或達到map輸出閾值(由mapreduce.reduce.merge.inmem.threshold控制),則合並后溢出寫到磁盤中。如果指定combiner,則在合並期間運行它以降低寫入硬盤的數據量。

  隨着磁盤上副本增多,后台線程會將它們合並為更大的、排好序的文件。這會為后面的合並節省一些時間。

  2. merge階段。從map端copy過來的數據會先放入JVM的內存緩沖區中,這里的緩沖區大小要比map端更為靈活,它基於JVM的heap size設置的,因為shuffle階段reducer不運行,所以絕大部分的內存都給shuffle使用。這個merge階段將合並map輸出,維持其順序排序。這是循環進行的。比如,如果由50個map輸出,而合並因子是10(10為默認設置,由mapreduce.task.io.sort.factor屬性設置,與map的合並類似),合並將進行5趟。每趟將10個文件合並成一個文件,因此最后有5個中間文件。

  Merge有三種形式:1、內存到內存;2、內存到磁盤;3、磁盤到磁盤。默認情況下第一種形式是不啟動的。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge。與map端類似,這也是溢寫的過程,在這個過程中如果設置了combiner,也是會啟動的,然后在磁盤中合並溢寫文件。第二種merge方式一直再運行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的輸出文件。

  3. reduce階段。這是最后階段了,直接把數據輸入reduce函數,也就是對已排序輸出中的每個鍵調用reduce函數,從而省略了一次磁盤往返行程,並沒有將這5個文件合並成一個已排序的文件作為最后一趟。最后的合並可以來自內存和磁盤片段。此階段的輸出直接寫到輸出文件系統,一般為HDFS。如果采用HDFS,由於節點管理器(NodeManager)也運行數據節點(DataNode),所以第一個塊復本將被寫到本地磁盤。

Reduce:

   reduce任務並不具備數據本地化的優勢,單個reduce任務的輸入通常來自於所有map任務的輸出,或者接收到不同map任務的輸出。在本例中,我們假設僅有一個reduce任務,其輸入是所有map任務的輸出。因此,排過序的map輸出需通過網絡傳輸發送到運行reduce任務的節點。數據在reduce端合並,然后由用戶定義的reduce函數處理。reduce的輸出通常存儲在HDFS中以實現可靠存儲。對於reduce輸出的每個HDFS塊,第一個復本存儲在本地節點上,其他復本出於可靠性考慮存儲在其他機架的節點中。因此,將reduce的輸出寫入HDFS確實需要占用網絡帶寬,但這與正常的HDFS管線寫入的消耗一樣。

  一個reduce任務的完整數據流如圖所示。虛線框表示節點,虛線箭頭表示節點內部的數據傳輸,而實線箭頭表示不同節點之間的數據傳輸。

一個reduce任務的MapReduce數據流圖

   reduce任務的數量並非由輸入數據的大小決定,相反是獨立指定的。 

  如果有好多個reduce任務,每個map任務就會針對輸出進行分區(partition),即為每個reduce任務建一個分區。每個分區有許多鍵(及其對應的值),但每個鍵對應的鍵-值對記錄都在同一個分區中。分區可由用戶定義的分區函數控制,但通常用默認的partitioner通過哈希函數來分區,很高效。 

  一般情況下,多個reduce任務的數據流如下圖所示。該圖很清晰地表明了為什么map任務和reduce任務之間的數據流稱為shuffle(混洗),因為每個reduce任務的輸入都來自許多map任務。shuffle一般比圖中所示的更復雜(上下節已描述了大概),而且調整混洗參數對作業總執行時間的影響非常打。

  多個reduce任務的數據流圖

  最后,當數據處理可以完全並行(即無需混洗時),可能會出現無reduce任務的情況。在這種情況下,唯一的非本地節點數據傳輸是map任務將結果寫入HDFS,參見下圖所示。

無reduce任務的MapReduce數據流

   

 二、環形內存緩沖區

2.1 什么是環形內存緩沖區

  Map的輸出結果是由Collector處理的,每個Map任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。

 

2.2  環形內存緩沖區的數據結構

  這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,但是這里面不光放置了數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序采用的是平台自身的字節序)的馬甲。數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來划分兩者,分界點不是亘古不變的,而是每次Spill之后都會更新一次。初始的分界點是0,數據的存儲方向是向上增長,索引數據的存儲方向是向下增長,Kvbuffer的存放指針bufindex時指向數據區的,是一直悶着頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。

1 kvoffsets緩沖區:也叫偏移量索引數組,用於保存key/value信息在位置索引kvindices中的偏移量。當kvoffsets的使用率超過io.sort.spill.percent(默認為80%)后,便會觸發一次SpillThread線程的“溢寫”操作,也就是開始一次spill階段的操作。
索引數據區域:存元數據信息,都是整數,只存儲分區信息(整數)和kvbuffer在數組中的位置
2 kvindices緩沖區:也叫位置索引數組,用於保存key/value在數據緩沖區kvbuffer中的起始位置
3 kvbuffer數據緩沖區:用於保存實際的key/value的值。默認情況下該緩沖區最多可以使用io.sort.mb的95%,當kvbuffer使用率超過io.sort.spill.percent(默認80%)后,便會觸發一次SpillThread線程的“溢寫”操作,也就是開始一次spill階段的操作。

  索引是對在kvbuffer中的鍵值對的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數據。比如Kvindex初始位置是-4,當第一個鍵值對寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然后Kvindex跳到-8位置,等第二個鍵值對和索引寫完之后,Kvindex跳到-12位置。

 

三、剖析MapReduce作業運行機制

  整個過程描述如下圖所示。在最高層,有以下5個獨立的實體。

  • client(客戶端),提交MapReduce作業。
  • YARN ResourceManager(YARN資源管理器),負責協調集群上計算機資源的分配。
  • YARN NodeManager(YARN節點管理器),負責啟動和監視集群中機器上的計算容器(container)。
  • MapReduce的application master,負責協調運行MapReduce作業的任務。它和MapReduce任務在容器中運行,這些容器由資源管理器分配並由節點管理器進行管理。
  • 分布式文件系統(一般為HDFS),用來與其他實體間共享作業文件。

Hadoop運行MapReduce作業的工作原理圖

   1. 客戶端提交一個MapReduce作業,Job的submit()方法創建一個內部的JobSummiter實例,並且調用其submitJobInternal()方法。提交作業后,waitForCompletion()每秒輪詢作業的進度,如果發現自上次報告后有改變,便把進度報告到控制台。作業完成后,如果成功,就顯示作業計數器;如果失敗,則導致作業失敗的錯誤被記錄到控制台。

  2. Job向資源管理器請求一個新應用ID,用於MapReduce作業ID。資源管理器檢查作業的輸出說明和計算作業的輸入分片,如果沒有指定輸出目錄,輸出目錄已存在或者分片無法計算,那么作業就不提交,錯誤拋回給MapReduce程序。

  3. 將運行作業所需要的資源(包括作業JAR文件、配置文件和計算所得的輸入分片)復制到一個以作業ID命名的目錄下的共享文件系統中。作業JAR的復本較多(由mapreduce.client.submit.file.replication屬性控制,默認值為10),因此在運行作業的任務時,集群中有很多個復本可供節點管理器訪問。

  4. 通過調用資源管理器的submitApplication()方法提交作業。

  5. 資源管理器收到調用它的submitApplication()消息后,便將請求傳遞給YARN調度器(scheduler)。調度器分配一個容器,然后資源管理器在節點管理器的管理下在容器中啟動application master的進程。

  6. MapReduce作業的application master是一個Java應用程序,它的主類是MRAppMaster。由於將接受來自任務的進度和完成報告,因此application master對作業的初始化是通過創建多個薄記對象以保持對作業進度的跟蹤來完成的。

  7. 對每一個分片創建一個map任務對象以及由mapreduce.job.reduces屬性(通過作業的setNumReduceTasks()方法設置)確定的多個reduce任務對象。任務ID在此時分配。

  application master必須決定如何運行構成MapReduce作業的各個任務。如果作業很小,就選擇和自己在同一個JVM上運行任務。與在一個節點上順序運行這些任務相比,當application master判斷在新的容器中分配和運行任務的開銷大於並行運行它們的開銷時,就會發生這種情況。這樣的作業稱為uberized,或者uber任務(小作業)運行。

  默認情況下,小作業就是少於10個mapper且只有1個reducer且輸入大小小於一個HDFS塊的作業(通過設置mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces和mapreduce.job.ubertask.maxbytes可以改變這幾個值)。必須明確啟動uber任務(對於單個作業,或者是對整個集群),具體方法是將mapreduce.job.ubertask.enable設置為true。

  最后,在任何任務運行之前,application master調用setupJob()方法設置OutputCommitter。FileOutputCommitter為默認值,表示將建立作業的最終輸出目錄及任務輸出的臨時工作空間。

  8. 如果作業不適合作為uber任務運行,那么application master就會為該作業中的所有map任務和reduce任務向資源管理器請求容器。首先為Map任務發出請求,該請求優先級要高於reduce任務的請求,這是因為所有的map任務必須在reduce的排序階段能夠啟動前完成。直到有5%的map任務已經完成時,為reduce任務的請求才會發出(慢啟動reduce)。

  reduce任務能夠在集群中任意位置運行,但map任務的請求有着數據本地化局限,這也是調度器所關注的。map任務的三種情況的詳見一、概念綜述中的map。

  請求也為任務指定了內存需求和CPU數。在默認情況下,每個map任務和reduce任務都分配到1024MB的內存和一個虛擬的內核,這些值可以在每個作業的基礎上進行配置,配置參考如下表:

屬性名稱 類型 默認值 說明
mapreduce.map.memory.mb int 1024 map容器所用的內存容量
mapreduce.reduce.memory.mb int 1024 reduce容器所用的內存容量
mapreduce.map.cpu.vcores int 1 map容器所用的虛擬內核
mapreduce.reduce.cpu.vcoresp.memory.mb int 1 reduce容器所用的虛擬內核


  9. 一旦資源管理器的調度器為任務分配了一個特定節點上的容器,application master就通過與節點管理器通信來啟動容器
。該任務由主類為YarnChild的一個Java應用程序執行。

  10. 在它運行任務之前,首先將任務需要的資源本地化,包括作業的配置、JAR文件和所有來自分布式緩存的文件。

  11. 最后,運行map任務或reduce任務。

 

 

參考資料:《Hadoop權威指南(第四版)》

        https://www.jianshu.com/p/1e542477b59a

        https://www.cnblogs.com/laowangc/p/8961946.html

       https://www.jianshu.com/p/9e4d01b74600


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM