原文鏈接https://www.cnblogs.com/felixzh/p/8604188.html
Map階段包括:
第一讀數據:從HDFS讀取數據
1、問題:讀取數據產生多少個Mapper?
Mapper數據過大的話,會產生大量的小文件,由於Mapper是基於虛擬機的,過多的Mapper創建和初始化及關閉虛擬機都會消耗大量的硬件資源;
Mapper數太小,並發度過小,Job執行時間過長,無法充分利用分布式硬件資源;
2、Mapper數量由什么決定?
(1)輸入文件數目
(2)輸入文件的大小
(3)配置參數
這三個因素決定的。涉及參數:
mapreduce.input.fileinputformat.split.minsize //啟動map最小的split size大小,默認0
mapreduce.input.fileinputformat.split.maxsize //啟動map最大的split size大小,默認256M
dfs.block.size//block塊大小,默認64M
計算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
例如默認情況下:例如一個文件800M,Block大小是128M,那么Mapper數目就是7個。6個Mapper處理的數據是128M,1個Mapper處理的數據是32M;
再例如一個目錄下有三個文件大小分別為:5M10M 150M 這個時候其實會產生四個Mapper處理的數據分別是5M,10M,128M,22M。
Mapper是基於文件自動產生的,如果想要自己控制Mapper的個數???
就如上面,5M,10M的數據很快處理完了,128M要很長時間;這個就需要通過參數的控制來調節Mapper的個數。
減少Mapper的個數的話,就要合並小文件,這種小文件有可能是直接來自於數據源的小文件,也可能是Reduce產生的小文件。
設置合並器:(set都是在hive腳本,也可以配置Hadoop)
設置合並器本身:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000;//每個Mapper要處理的數據,就把上面的5M10M……合並成為一個
一般還要配合一個參數:
set mapred.max.split.size=256000000 // mapred切分的大小
set mapred.min.split.size.per.node=128000000//低於128M就算小文件,數據在一個節點會合並,在多個不同的節點會把數據抓過來進行合並。
Hadoop中的參數:可以通過控制文件的數量控制mapper數量
mapreduce.input.fileinputformat.split.minsize(default:0),小於這個值會合並
mapreduce.input.fileinputformat.split.maxsize 大於這個值會切分
第二處理數據:
1、Partition說明
對於map輸出的每一個鍵值對,系統都會給定一個partition,partition值默認是通過計算key的hash值后對Reduce task的數量取模獲得。如果一個鍵值對的partition值為1,意味着這個鍵值對會交給第一個Reducer處理。
自定義partitioner的情況:
1、我們知道每一個Reduce的輸出都是有序的,但是將所有Reduce的輸出合並到一起卻並非是全局有序的,如果要做到全局有序,我們該怎么做呢?最簡單的方式,只設置一個Reduce task,但是這樣完全發揮不出集群的優勢,而且能應對的數據量也很受限。最佳的方式是自己定義一個Partitioner,用輸入數據的最大值除以系統Reduce task數量的商作為分割邊界,也就是說分割數據的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執行partition后的數據是整體有序的。
2、解決數據傾斜:另一種需要我們自己定義一個Partitioner的情況是各個Reduce task處理的鍵值對數量極不平衡。對於某些數據集,由於很多不同的key的hash值都一樣,導致這些鍵值對都被分給同一個Reducer處理,而其他的Reducer處理的鍵值對很少,從而拖延整個任務的進度。當然,編寫自己的Partitioner必須要保證具有相同key值的鍵值對分發到同一個Reducer。
3、自定義的Key包含了好幾個字段,比如自定義key是一個對象,包括type1,type2,type3,只需要根據type1去分發數據,其他字段用作二次排序。
2、環形緩沖區
Map的輸出結果是由collector處理的,每個Map任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。
這個數據結構其實就是個字節數組,叫Kvbuffer,名如其義,但是這里面不光放置了數據,還放置了一些索引數據,給放置索引數據的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(字節序采用的是平台自身的字節序)的馬甲。數據區域和索引數據區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來划分兩者,分界點不是亘古不變的,而是每次Spill之后都會更新一次。初始的分界點是0,數據的存儲方向是向上增長,索引數據的存儲方向是向下增長Kvbuffer的存放指針bufindex是一直悶着頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之后,bufindex增長為4,一個Int型的value寫完之后,bufindex增長為8。
索引是對在kvbuffer中的鍵值對的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數據。比如Kvindex初始位置是-4,當第一個鍵值對寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然后Kvindex跳到-8位置,等第二個鍵值對和索引寫完之后,Kvindex跳到-12位置。
3、第三寫數據到磁盤
Mapper中的Kvbuffer的大小默認100M,可以通過mapreduce.task.io.sort.mb(default:100)參數來調整。可以根據不同的硬件尤其是內存的大小來調整,調大的話,會減少磁盤spill的次數此時如果內存足夠的話,一般都會顯著提升性能。spill一般會在Buffer空間大小的80%開始進行spill(因為spill的時候還有可能別的線程在往里寫數據,因為還預留空間,有可能有正在寫到Buffer中的數據),可以通過mapreduce.map.sort.spill.percent(default:0.80)進行調整,Map Task在計算的時候會不斷產生很多spill文件,在Map Task結束前會對這些spill文件進行合並,這個過程就是merge的過程。mapreduce.task.io.sort.factor(default:10),代表進行merge的時候最多能同時merge多少spill,如果有100個spill個文件,此時就無法一次完成整個merge的過程,這個時候需要調大mapreduce.task.io.sort.factor(default:10)來減少merge的次數,從而減少磁盤的操作;
Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。
Combiner存在的時候,此時會根據Combiner定義的函數對map的結果進行合並,什么時候進行Combiner操作呢???和Map在一個JVM中,是由min.num.spill.for.combine的參數決定的,默認是3,也就是說spill的文件數在默認情況下由三個的時候就要進行combine操作,最終減少磁盤數據;
減少磁盤IO和網絡IO還可以進行:壓縮,對spill,merge文件都可以進行壓縮。中間結果非常的大,IO成為瓶頸的時候壓縮就非常有用,可以通過mapreduce.map.output.compress(default:false)設置為true進行壓縮,數據會被壓縮寫入磁盤,讀數據讀的是壓縮數據需要解壓,在實際經驗中Hive在Hadoop的運行的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)參數設置。但這個過程會消耗CPU,適合IO瓶頸比較大。
Shuffle和Reduce階段包括:
一、Copy
1、由於job的每一個map都會根據reduce(n)數將數據分成map 輸出結果分成n個partition,所以map的中間結果中是有可能包含每一個reduce需要處理的部分數據的。所以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束后,所有的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數據,因此map和reduce是交叉進行的,其實就是shuffle。Reduce任務通過HTTP向各個Map任務拖取(下載)它所需要的數據(網絡傳輸),Reducer是如何知道要去哪些機器取數據呢?一旦map任務完成之后,就會通過常規心跳通知應用程序的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有數據(如何知道提取完?)數據被reduce提走之后,map機器不會立刻刪除數據,這是為了預防reduce任務失敗需要重做。因此map輸出數據是在整個作業完成之后才被刪除掉的。
2、reduce進程啟動數據copy線程(Fetcher),通過HTTP方式請求maptask所在的TaskTracker獲取maptask的輸出文件。由於map通常有許多個,所以對一個reduce來說,下載也可以是並行的從多個map下載,那到底同時到多少個Mapper下載數據??這個並行度是可以通過mapreduce.reduce.shuffle.parallelcopies(default5)調整。默認情況下,每個Reducer只會有5個map端並行的下載線程在從map下數據,如果一個時間段內job完成的map有100個或者更多,那么reduce也最多只能同時下載5個map的數據,所以這個參數比較適合map很多並且完成的比較快的job的情況下調大,有利於reduce更快的獲取屬於自己部分的數據。 在Reducer內存和網絡都比較好的情況下,可以調大該參數;
3、reduce的每一個下載線程在下載某個map數據的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的文件丟失,或者網絡瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載線程並不會無休止的等待下去,當一定時間后下載仍然失敗,那么下載線程就會放棄這次下載,並在隨后嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是可以通過mapreduce.reduce.shuffle.read.timeout(default180000秒)調整的。如果集群環境的網絡本身是瓶頸,那么用戶可以通過調大這個參數來避免reduce下載線程被誤判為失敗的情況。一般情況下都會調大這個參數,這是企業級最佳實戰。
二、MergeSort
1、這里的merge和map端的merge動作類似,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩沖區中,然后當使用內存達到一定量的時候才spill磁盤。這里的緩沖區大小要比map端的更為靈活,它基於JVM的heap size設置。這個內存大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源碼里面寫死了) 來設置,這個參數其實是一個百分比,意思是說,shuffile在reduce內存中的數據最多使用內存量為:0.7 × maxHeap of reduce task。JVM的heapsize的70%。內存到磁盤merge的啟動門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是說,如果該reduce task的最大heap使用量(通常通過mapreduce.admin.reduce.child.java.opts來設置,比如設置為-Xmx1024m)的一定比例用來緩存數據。默認情況下,reduce會使用其heapsize的70%來在內存中緩存數據。假設 mapreduce.reduce.shuffle.input.buffer.percent 為0.7,reducetask的max heapsize為1G,那么用來做下載數據緩存的內存就為大概700MB左右。這700M的內存,跟map端一樣,也不是要等到全部寫滿才會往磁盤刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷(刷磁盤前會先做sortMerge)。這個限度閾值也是可以通過參數 mapreduce.reduce.shuffle.merge.percent(default0.66)來設定。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。這種merge方式一直在運行,直到沒有map端的數據時才結束,然后啟動磁盤到磁盤的merge方式生成最終的那個文件。
這里需要強調的是,merge有三種形式:1)內存到內存(memToMemMerger)2)內存中Merge(inMemoryMerger)3)磁盤上的Merge(onDiskMerger)具體包括兩個:(一)Copy過程中磁盤合並(二)磁盤到磁盤。
(1)內存到內存Merge(memToMemMerger) Hadoop定義了一種MemToMem合並,這種合並將內存中的map輸出合並,然后再寫入內存。這種合並默認關閉,可以通過mapreduce.reduce.merge.memtomem.enabled(default:false)
打開,當map輸出文件達到mapreduce.reduce.merge.memtomem.threshold時,觸發這種合並。
(2)內存中Merge(inMemoryMerger):當緩沖中數據達到配置的閾值時,這些數據在內存中被合並、寫入機器磁盤。閾值有2種配置方式:
配置內存比例:前面提到reduceJVM堆內存的一部分用於存放來自map任務的輸入,在這基礎之上配置一個開始合並數據的比例。假設用於存放map輸出的內存為500M,mapreduce.reduce.shuffle.merge.percent配置為0.66,則當內存中的數據達到330M的時候,會觸發合並寫入。
配置map輸出數量: 通過mapreduce.reduce.merge.inmem.threshold配置。在合並的過程中,會對被合並的文件做全局的排序。如果作業配置了Combiner,則會運行combine函數,減少寫入磁盤的數據量。
(3)磁盤上的Merge(onDiskMerger):
(3.1)Copy過程中磁盤Merge:在copy過來的數據不斷寫入磁盤的過程中,一個后台線程會把這些文件合並為更大的、有序的文件。如果map的輸出結果進行了壓縮,則在合並過程中,需要在內存中解壓后才能給進行合並。這里的合並只是為了減少最終合並的工作量,也就是在map輸出還在拷貝時,就開始進行一部分合並工作。合並的過程一樣會進行全局排序。
(3.2)最終磁盤中Merge:當所有map輸出都拷貝完畢之后,所有數據被最后合並成一個整體有序的文件,作為reduce任務的輸入。這個合並過程是一輪一輪進行的,最后一輪的合並結果直接推送給reduce作為輸入,節省了磁盤操作的一個來回。最后(所以map輸出都拷貝到reduce之后)進行合並的map輸出可能來自合並后寫入磁盤的文件,也可能來及內存緩沖,在最后寫入內存的map輸出可能沒有達到閾值觸發合並,所以還留在內存中。
每一輪合並不一定合並平均數量的文件數,指導原則是使用整個合並過程中寫入磁盤的數據量最小,為了達到這個目的,則需要最終的一輪合並中合並盡可能多的數據,因為最后一輪的數據直接作為reduce的輸入,無需寫入磁盤再讀出。因此我們讓最終的一輪合並的文件數達到最大,即合並因子的值,通過mapreduce.task.io.sort.factor(default:10)來配置。
如上圖:Reduce階段中一個Reduce過程 可能的合並方式為:假設現在有20個map輸出文件,合並因子配置為5,則需要4輪的合並。最終的一輪確保合並5個文件,其中包括2個來自前2輪的合並結果,因此原始的20個中,再留出3個給最終一輪。
三、Reduce函數調用(用戶自定義業務邏輯)
1、當reduce將所有的map上對應自己partition的數據下載完成后,就會開始真正的reduce計算階段。reducetask真正進入reduce函數的計算階段,由於reduce計算時肯定也是需要消耗內存的,而在讀取reduce需要的數據時,同樣是需要內存作為buffer,這個參數是控制,reducer需要多少的內存百分比來作為reduce讀已經sort好的數據的buffer大小??默認用多大內存呢??默認情況下為0,也就是說,默認情況下,reduce是全部從磁盤開始讀處理數據。可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設置reduce的緩存。如果這個參數大於0,那么就會有一定量的數據被緩存在內存並輸送給reduce,當reduce計算邏輯消耗內存很小時,可以分一部分內存用來緩存數據,可以提升計算的速度。所以默認情況下都是從磁盤讀取數據,如果內存足夠大的話,務必設置該參數讓reduce直接從緩存讀數據,這樣做就有點Spark Cache的感覺;
2、Reduce在這個階段,框架為已分組的輸入數據中的每個 <key, (list of values)>對調用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統的。Reducer的輸出是沒有排序的。