MapReduce工作流程及Shuffle原理概述


引言:

   雖然MapReduce計算框架簡化了分布式程序設計,將所有並行程序需要關注的設計細節抽象成公共模塊並交由系統實現,用戶只需關注自己的應用程序的邏輯實現,提高了開發效率。但開發者如果對Mapreduce計算框架如何實現這樣的魔術沒有一個基本的了解,那么將無法利用框架本身提供的靈活性編寫MapReduce程序,在面臨多任務、大數據而出現大量數據傾斜,計算速度慢等問題時,也無法給出解決方案,所以了解MapReduce工作流程和Shuffle原理是學習MapReduce程序設計的必修課。初學Hadoop時,看過的書籍和課程都偏向於把Yarn和MapReduce的原理分開來介紹,這就像把廚師(Yarn)和做菜的流程(MapReduce)分開介紹一樣,初學者往往很難將兩者結合起來,本文努力將廚師和如何制作每道菜的整個流程結合起來,介紹MapReduce和Yarn的工作原理。筆者才疏學淺,此文為個人愚見,如有不正,希望讀者不吝指正。

一、Yarn是什么東西?

  首先,不嚴謹的說:Yarn是由Resourcemanager、NodeManager、ApplicationMaster、Container等組件構成,他就像是一個分布式的操作系統,負責整個MapReduce程序計算過程中所有數據和硬件資源的調度,這種資源的調度表現在某一MapReduce任務進行到具體的某一階段時,資源的調度就由Yarn某一具體的組件負責,你可以把他比作廚師在每道菜的制作進行到各個流程時,會使用不同的廚具。

  對於Yarn的介紹就先到這里,對Yarn的各個組件有個映像即可。下面來介紹MapReduce執行之前需要做哪些准備工作:

  在前面關於任務提交的源碼分析一文中指出:在使用API客戶端提交MapReduce程序到Hadoop集群上的過程中,程序執行到 job.waitForCompletion(ture),首先調用connect方法,初始化Cluster對象,然后用這個對象創建JobRunner,如果獲得的Cluster是Yarn集群的抽象封裝,那么這個Cluster返回的JobRunner就是YarnRunner了,而如果獲得的Cluster是本機文件系統的抽象封裝,那么Cluster返回的JobRunner就是LocalJobRunner(這里不再贅述LocalJobRunner以及本地運行模式是如何執行)。YarnRunner會向Resourcemanager節點發送一個Application,Resourcemanager收到(Application)應用請求后返回給申請所在節點一個hdfs上的路徑(hdfs://.../.staging/)以及application_id,隨后YarnRunner將數據的切片信息(job.split)、本次job的配置信息(job.xml)、MapReduce程序打成的jar包提交到hdfs://.../.staging/application_id/這個路徑下。至此,MapReduce程序的准備工作完成了。

  當MapReduce相關的配置信息提交完成后,Yarn的工作邊開始了,下面詳細介紹Yarn在MapReduce計算的全流程中所做的具體工作:

  當job.split、job.xml、jar包都提交到hdfs://.../.staging/application_id/目錄以后,YarnRunner會通知Resourcemanager資源提交完畢,進而向Resourcemanager申請一個MR ApplicationMaster,Resourcemanager將此次請求初始化為一個Task,並放入資源調度器中(老版本的資源調度器是一個單列的FIFO結構,現在的版本是多列FIFO結構)。此后某一空閑的NodeManager節點(大概率是此MapReduce計算數據所在的節點)會從Resourcemanager節點的調度隊列中領取此Task任務,領取到此Task后,會初始化一個MR ApplicationMaster並向Resourcemanager節點注冊自己以表明身份信息,注冊完成后創建Container用於保存抽象化的本地硬件資源(注意:並不是一個節點只能存在一個Container,而是當一個節點接收到一個MapReduce計算任務時,會為該任務所需的資源單獨創建一個Container,如果一個節點處理多個MapReduce任務時,就會創建多個Container,所以我們說MapReduce是運行在Container中的,由於Container封裝了本次計算在本節點鎖分攤的任務量所需的計算資源,所以可以把他想象成你在windows上開了一個虛擬機,用於部署web服務),這些資源包括CPU和RAM等等。完成Container的創建后,此NodeManager會將hdfs://.../.staging/application_id/下的本次job的數據信息(不是數據本體,而是包含了切片信息和數據所在節點的信息)下載到本節點。隨后MR ApplicationMaster會根據split切片數量向Resourcemanager申請相應數量的節點作為計算節點(map節點),我們把用於map計算的節點稱為MapTask容器。Resourcemanager收到申請后同樣會在調度隊列中創建Task,然后會有相應數量的節點(大概率是數據本體所在節點)成為MapTask容器,即Map計算節點,MapTask也會為此MapReduce任務創建Container,當一個節點有能力並且同時處理多個MapReduce任務時,會為每一個任務所需的資源單獨創建一個Container保存抽象的資源。Resourcemanager會把這些節點的地址通知給MR ApplicationMaster所在節點,隨后,MR ApplicationMaster向MapTask節點發送程序啟動腳本。等Map計算完成后(不嚴謹,事實上是部分的Map計算完成后就會啟動ReduceTask,而非所有map),會將數據落到磁盤。最后MR ApplicationMaster會向Resourcemanager申請ReduceTask容器用於處理reduce計算,ReduceTask也會放入調度隊列中。申請ReduceTask容器的數量是在客戶端指定的,如果沒有指定,則默認為一個ReduceTask。ReduceTask所在節點對從map節點中拷貝屬於自己分區內的數據進行reduce操作。最后將計算結果輸出給context,由OutputFormat格式化后保存到文件。

  大概的工作流程如下圖(下圖忽略了map落盤reduce輸出落盤):

  

  關於Yarn的介紹到這里就結束了,可以看出Yarn的各個組件支持了MapReduce程序的執行過程,負責整個MapReduce程序計算過程中所有數據和硬件資源的調度,這種資源的調度表現在某一MapReduce任務進行到具體的某一階段時,資源的調度就由Yarn某一具體的組件負責。進行的下面介紹MapReduce程序的全流程。

二、MapReduce

盡管MapReduce的業務邏輯靈活多變,但也可以從宏觀角度來研究一個MapReduce程序的各個階段:

  上面介紹說:MR Applicationmaster根據hdfs://.../.staging/application_id/下的信息計算出切片數后,Resourcemanager會分配相應數量的節點作為MapTask節點進行map計算。這里取其中一個MapTask為例,來研究一個map節點的工作細節:

  MapTask1會根據配置信息讀取自己需要處理的片,並使用對應的RecordReader將文件數據轉化為K-V對,這個階段為format(格式化)階段。格式化的意義在於將數據變成Mapper方法可以接收的K-V類型(即參數類型),map方法經處理后又以K-V對的形式將數據寫回context中。然后由outputCollector將數據進行收集,outputCollector在將數據寫到shuffle之前,會計算該數據(K-V)的分區號,然后將K-V和分區號一並寫入shuffle中。默認情況下是按照Key的哈希值和Integer的最大值做與運算然后和RedueTask數量進行模運算,進而計算出分區數,由於在reduce階段各節點所分配的數據是由這個分區號決定的,所以這樣做的目的是初步平衡個reduce節點計算量,防止數據傾斜,但是這個ReduceTask的數量是可以在Driver中人為指定的,如果不指定則默認為1,也就說默認情況下(不指定分區方法和ReduceTask數量)所有的Key-Value只會計算出一個分區號,也就是說所有的計算都在一個節點上進行。

  上圖所示為兩個分區號,這是自定義分區和RedueTask數為2。但是注意,這里只是調用了ReduceTask的數值,還沒有進入Reduce階段,現階段屬於Map階段。

  你可能會問,這不也是固定的流程嗎?怎么會體現“設計理念”呢?其實設計就體現在InputFormat、RecordReader、OutputCollector這些都是可以自定義的,由於自定義的InputFormat和RecordReader(事實上自定義InputFormat的核心就是自定義RecordReader),同樣的數據會以不同的K-V形式輸給map方法,數據處理自然也會有更多的靈活性,自定義OutputCollector中自定義分區計算方法,會讓map輸出的key-value對根據開發者的意願計算出一個分區號,並交給shuffle處理,Shuffle會對帶有同一分區號的數據進行區內排序,當數據排序完成后會溢寫到磁盤中,溢寫出的數據具有分區且區內有序的特點,而往往(並非絕對)數據的分區號也決定了該區號的數據最終交給哪一個ReduceTask節點處理。關於自定義InputFormat和OutputCollector不是本文的重點,這里不再贅述。

  當數據進入Shuffle后,又經歷了什么呢?下面詳細介紹上圖紅框內的工作流程:

   首先,進入Shuffle的數據會在Shuffle中進行一次快排:

   排序流程大致如下:Shuffle可以看作為內存中的一個環形集合,首尾相接,結構上類似於雙向鏈表。當標記有分區號的K-V對進入Shuffle后,Shuffle會將同一分區的數據進行一次快排,Shuffle分為兩個部分,按圖示,兩個部分分別用於存儲索引標識和K-V對。在排序時采用如下方案:假如索引為2的K-V大於索引為1的K-V值,就會將標識為2的K-V的索引放到第一位。也就是說:Shuffle排序並不對數據本身做移動,而是對數據體量較小的索引標識做移動,這樣降低了IO壓力。當數據排序完成並且Shuffle中存儲的數據占了Shuffle空間(100mb)的80%時(這個可以自定義),會按照逆時針的順序(2>1>4>3)將對應的K-V值溢寫到磁盤中,保存在一個臨時文件。臨時文件中數據的特點就是分區且區內有序。在數據量較大時,會發生多次溢寫,產生多個臨時文件。

  Shuffle排序的規則也可以由開發者自定義,很多情況下,key是用戶自定義的Javabean類型,並且希望Shuffle在排序時按照該類型的一個或多個屬性值進行排序,而不是用Shuffle原生的排序規則,就可以自定義排序器

排序:

  1. 排序時MR框架在Shuffle階段自動進行的

  2. 在MapTask端發生兩次排序,在排序時,用戶唯一可以控制的時提供一個key的比較器

  3. 設置key的比較器

    ① 用戶可以自定義key的比較器,自定義的比較器必須是一個RawComparator類型的類,重點是實現compareTo()方法

    ② 用戶通過key,讓key實現WritableComparable接口,系統自動提供一個比較器,重點是實現compareTo()方法

  4. 排序的分類

    全排序: 對所有的數據進行排序,指的是生成一個結果文件,這個結果文件整體有序

    部分排序:最終生成N個文件,每個文件內部整體有序

    二次排序:在對key進行比較時,比較的條件為多個

    輔助排序:在進入reduce階段時,通過比較key是否相同,將相同的key分為1組

 

  為了方便講解,下面將使用案例的方式,本案例假設Shuffle存在多次溢寫,存在兩個分區,【】內的數據為一個分區,分區號為從左往右由0開始數。

  我們知道相同key的一組key-vlaue對會“組團”調用一次Reduce的ruduce方法。那么在數據從內存(Shuffle)溢寫到磁盤的過程中<第一處可以引入Combiner的地方>可以加一個Combiner,其本質就是一個Reducer,會對數據進行一輪的合並,比如第一輪溢寫出的兩個分區內的數據為{【(a,1),(a,1),(c,1)】,【(B,1),(B,1),(D,1),(D,1)】},兩個分區(本例以大小寫分區)內的數據雖然有序,但是數據有冗余,當相同的K-V值數量很多時,會對IO造成不必要的壓力,所以在此環節中引入Combiner將數據合並為{【(a,2),(c,1)】}、{【(B,2),(D,2)】}兩個分區(默認情況下Combiner是不開啟的,在不影響數據正確性的情況下建議開啟,Combiner適合 + - 操作,不適合*  / 操作),這樣再將數據從內存寫入硬盤時,IO壓力就會降低。在默認情況下Combiner是不開啟的,因為在很多業務場景中,Combiner可能會造成數據的誤差。附:Combiner其實就是一個Reducer,他的工作和Reducer一樣是對同一組的數據做處理,而且實現相同的接口(Reducer<>),所以很多情況(當Combiner與某一Reducer邏輯相同)下,可以直接使用已經定義好Reducer作Combiner。

   上圖雖然將分區和排序從Shuffle環中畫了出來,但實質上排序和分區都是在Shuffle中完成的,且順序並且沒有先后之分。

   多次溢寫出多組數據,產生多個文件,假設第一輪溢寫的結果為{【(a,2),(c,1)】}、{【(B,2),(D,2)】},第二輪溢寫並合並的結果為{【(a,3),(c,2)】}、{【(B,1),(D,3)】},產生里兩個臨時文件spill0.out/spill1.out。那么進行歸並排序后變成{【(a,2),(a,3),(c,1),(c,2)】},{【(B,2),(B,1),(D,2),(D,3)】}依舊會存在相同的Key的情況,那么也可以選擇在歸並排序時使用Combiner進行合並<第二處可以引入Combiner的地方>,結果為:{【(a,5),(c,3)】}、{【(B,3),(D,5)】}。這是一個MapTask的工作結果,當存在多個MapTask時,會產生多組輸出數據,假設存在第二個MapTask節點的輸出結果為:{【(a,3),(c,4)】}、{【(B,2),(D,6)】}。至此MapTask的工作完成了。

  map階段結束。

  現在進入ReduceTask階段,假設我們設定了兩個ReduceTask,並且自定義了分區規划:將分區一(key小寫字母的一組)交給ReduceTask1來處理,將分區2交給ReduceTask2處理,那么所有MapTask輸出結果中的分區1的數據都會進入ReduceTask1中(分區2的數據進入ReduceTask2中,不論是哪一個MapTask產生的)。此時shuffle線程從多個MapTask節點讀取同一個分區內的數據,然后進行歸並排序,按照相同的Key分組,比如分區1分為兩組:"(a,5),(a,3)"、"(c,3),(c,4)",在合並時,如果shuffle所使用的內存不夠,也會將部分數據溢寫到磁盤,如果此時設置了使用Combiner進行合並<第三處可以引入Combiner的地方>,數據也會被combine之后"(a,8) , (c,7)"再溢寫磁盤。這里假設沒有使用Combiner,"(a,5),(a,3)"、"(c,3),(c,4)"會一組一組的調用Reducer的reduce方法【即相同的Key作為一組數據調用一次reduce方法,把values保存在reduce方法的values數組中,然后循環反序列化,將一對一對的序列化K-V值反序列化注入(set方法)到reduce內自定義的key和value對象中,從始至終用於接收反序列化結果的key—value對象固定不變,但是循環反序列過程中自定義的對象的屬性值卻依次的按照本次處理的數據集合(values)內元素的順序而改變】,最后reduce方法將(a,8)、(c,7)寫回context中,交給OutputFormat處理,最終輸出兩個文件。

  注意1:此外,當存在多個ReduceTask節點時,默認情況下一個ReduceTask只處理一個分區的數據,但是可以通過自定義Partitioner(MapTask通過Partitioner來計算分區號)的方式(繼承HashPartitioner重寫getPartition方法)將具體的一個或多個分區路由給單個ReduceTask節點處理,也可以將一個分區內的一部分數據路由給某個ReduceTask處理。

分區:

  1. 分區是在MapTask中通過Partitioner來計算分區號來實現的

  2. Partitioner的初始化

    ① 計算總的分區數partitions,該數值取決於用戶設置的reduceTask的數量

    ② partitions>1時,默認嘗試獲取用戶設置的Partitioner,如果用戶沒有自定義,就會使用HashPartitioner,HashPartitioner根據key的hashcode計算分區號,相同的key或者hash值相同的key分到一個區

    ③ partitions<1時,默認初始化一個Partitioner,這個Partitioner計算的所有key的分區號都為0

  3. 注意

    通常在Job的設置中,希望將數據分為幾個區,就設置reduceTask的數量為對應的數量

    partitions=設置的reduceTask的數量,0<=分區器計算的區號<partitions

  注意2:雖然默認情況下是key相同的作為一組調用一次reduce方法,即以key為傳入key,以value組成的列表values傳入reduce方法,下一組key相同的再調用一次reduce方法,每組key-values分開調用。但是可以通過自定義GroupingComparator(k,knext)的方式“欺騙”reduce方法,強制的將不同的key划分成一組,調用一次reduce方法。事實上這只是改變了原有的分組方案。

分組:

  1. 分組通過分組比較器,對進入reduce的key進行對比,key相同的分為一組,一次性進入Reducer,被reduce方式使用

  2. 分組比較器的設置

    ① 用戶可以自定義key的分組比較器,自定義的比較器必須是一個RawComparator類型的類,重點要實現compareTo()方法

    ② 如果沒有設置key的分組比較器,默認采用在Map階段排序時key使用的比較器

   ReduceTask工作完成后,會將數據寫回context,至此reduce階段就結束了。由於存在兩個ReduceTask,會產生兩個輸出文件。

  需要額外補充:分區規划是可以自定義的,我們可以自定義哪些key進入同一分區,也可以自定義哪一分區交給那個ReduceTask處理,但需要注意,如果在指定某一分區交給一個不存在的ReduceTask時,會報錯,如果自定義了較多的ReduceTask數,而實際用於處理固定分區的ReduceTask又較少,就會產生節點資源的浪費,並且最終輸出幾個空文件。當某一Reducer節點接收的數據量大於其他Reducer節點時,計算速度會明顯慢於其他節點,這就是數據傾斜,解決數據傾斜的方法就是合理的分區和合理規划分區路由到ReduceTask的方案,盡量使各分區內的數據量保持均衡。

  最后OutputFormat以固定的格式將context中的數據寫到文件中,這個OutputFormat也可以自定義,與自定義InPutFormat相同,自定義OutputFormat的核心是自定義RecordWriter,是RecordWriter決定以什么樣的K-V格式寫到文件中的。

  MapReduce雖然將分布式計算的流程封裝成一個框架,讓用戶使用單線程的開發模式開發分布式程序,但是作為框架,MapReduce卻提供了比較高的靈活性。從自定義數據輸入格式化InputFormat、RecordReader,到Mapper處理數據,到OutPutCollector自定義分區規則改變分區方案平衡各區數據量,到自定義Shuffle排序比較器改變排序方案,到自定義分組比較器改變分組方案“組團”調用reduce方法,到reduce方法處理數據,最后自定義OutputFormat改變輸出數據的格式。從始至終,MapReduce提供的靈活性和遍歷性讓大規模數據的處理變得簡單易行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM