hadoop之mapreduce詳解(優化篇)


一、概述

     優化前我們需要知道hadoop適合干什么活,適合什么場景,在工作中,我們要知道業務是怎樣的,能才結合平台資源達到最有優化。除了這些我們當然還要知道mapreduce的執行過程,比如從文件的讀取,map處理,shuffle過程,reduce處理,文件的輸出或者存儲。在工作中,往往平台的參數都是固定的,不可能為了某一個作業去修改整個平台的參數,所以在作業的執行過程中,需要對作業進行單獨的設定,這樣既不會對其他作業產生影響,也能很好的提高作業的性能,提高優化的靈活性。

現在回顧下hadoop的優勢(適用場景):
1、可構建在廉價機器上,設備成本相對低
2、高容錯性,HDFS將數據自動保存多個副本,副本丟失后,自動恢復,防止數據丟失或損壞
3、適合批處理,HDFS適合一次寫入、多次查詢(讀取)的情況,適合在已有的數據進行多次分析,穩定性好
4、適合存儲大文件,其中的大表示可以存儲單個大文件,因為是分塊存儲,以及表示存儲大量的數據

二、小文件優化

從概述中我們知道,很明顯hadoop適合大文件的處理和存儲,那為什么不適合小文件呢?

1、從存儲方面來說:hadoop的存儲每個文件都會在NameNode上記錄元數據,如果同樣大小的文件,文件很小的話,就會產生很多文件,造成NameNode的壓力。
2、從讀取方面來說:同樣大小的文件分為很多小文件的話,會增加磁盤尋址次數,降低性能
3、從計算方面來說:我們知道一個map默認處理一個分片或者一個小文件,如果map的啟動時間都比數據處理的時間還要長,那么就會造成性能低,而且在map端溢寫磁盤的時候每一個map最終會產生reduce數量個數的中間結果,如果map數量特別多,就會造成臨時文件很多,而且在reduce拉取數據的時候增加磁盤的IO。

好,我們明白小文件造成的弊端之后,那我們應該怎么處理這些小文件呢?

1、從源頭干掉,也就是在hdfs上我們不存儲小文件,也就是數據上傳hdfs的時候我們就合並小文件
2、在FileInputFormat讀取入數據的時候我們使用實現類CombineFileInputFormat讀取數據,在讀取數據的時候進行合並。

三、數據傾斜問題優化

我們都知道mapreduce是一個並行處理,那么處理的時間肯定是作業中所有任務最慢的那個了,可謂木桶效應?為什么會這樣呢?

1、數據傾斜,每個reduce處理的數據量不是同一個級別的,所有導致有些已經跑完了,而有些跑的很慢。
2、還有可能就是某些作業所在的NodeManager有問題或者container有問題,導致作業執行緩慢。

那么為什么會產生數據傾斜呢?

數據本身就不平衡,所以在默認的hashpartition時造成分區數據不一致問題,還有就是代碼設計不合理等。

那如何解決數據傾斜的問題呢?

1、既然默認的是hash算法進行分區,那我們自定義分區,修改分區實現邏輯,結合業務特點,使得每個分區數據基本平衡
2、既然有默認的分區算法,那么我們可以修改分區的鍵,讓其符合hash分區,並且使得最后的分區平衡,比如在key前加隨機數n-key。
3、既然reduce處理慢,我們可以增加reduce的內存和vcore呀,這樣挺高性能就快了,雖然沒從根本上解決問題,但是還有效果
4、既然一個reduce處理慢,那我們可以增加reduce的個數來分攤一些壓力呀,也不能根本解決問題,還是有一定的效果。

那么如果不是數據傾斜帶來的問題,而是節點服務有問題造成某些map和reduce執行緩慢呢?

那么我們可以使用推測執行呀,你跑的慢,我們可以找個其他的節點重啟一樣的任務競爭,誰快誰為准。推測執行時以空間換時間的優化。會帶來集群資源的浪費,會給集群增加壓力,所以我司集群的推測執行都是關閉的。其實在作業執行的時候可以偷偷開啟的呀

推測執行參數控制:

mapreduce.map.speculative
mapreduce.reduce.speculative

四、mapreduce過程優化

4.1、map端

上面我們從hadoop的特性場景等聊了下mapreduce的優化,接下來我們從mapreduce的執行過程進行優化。

好吧,我們就從源頭開始說,從數據的讀取以及map數的確定:

    在前面我們聊過小文件的問題,所以在數據的讀取這里也可以做優化,所以選擇一個合適數據的文件的讀取類(FIleInputFormat的實現類)也很重要我們在作業提交的過程中,會把jar,分片信息,資源信息提交到hdfs的臨時目錄,默認會有10個復本,通過參數mapreduce.client.submit.file.replication控制后期作業執行都會去下載這些東西到本地,中間會產生磁盤IO,所以如果集群很大的時候,可以增加該值,提高下載的效率。

分片的計算公式:

計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的默認值是1,而maxSize的默認值是long類型的最大值,即可得切片的默認大小是blockSize(128M)
maxSize參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值
minSize參數調的比blockSize大,則可以讓切片變得比blocksize還大

     因為map數沒有具體的參數指定,所以我們可以通過如上的公式調整切片的大小,這樣我們就可以設置map數了,那么問題來了,map數該如何設置呢?

這些東西一定要結合業務,map數太多,會產生很多中間結果,導致reduce拉取數據變慢,太少,每個map處理的時間又很長,結合數據的需求,可以把map的執行時間調至到一分鍾左右比較合適,那如果數據量就是很大呢,我們有時候還是需要控制map的數量,這個時候每個map的執行時間就比較長了,那么我們可以調整每個map的資源來提升map的處理能力呀,我司就調整了mapreduce.map.memory.mb=3G(默認1G)mapreduce.map.cpu.vcores=1(默認也是1)

從源頭上我們確定好map之后。那么接下來看map的具體執行過程咯。

首先寫環形換沖區,那為啥要寫環形換沖區呢,而不是直接寫磁盤呢?這樣的目的主要是為了減少磁盤i/o。

每個Map任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是為了更有效地使用內存空間,在內存中放置盡可能多的數據。執行流程是,該緩沖默認100M(mapreduce.task.io.sort.mb參數控制),當到達80%(mapreduce.map.sort.spill.percent參數控制)時就會溢寫磁盤。每達到80%都會重寫溢寫到一個新的文件。那么,我們完全可以根據機器的配置和數據來兩種這兩個參數,當內存足夠,我們增大mapreduce.task.io.sort.mb完全會提高溢寫的過程,而且會減少中間結果的文件數量。我司調整mapreduce.task.io.sort.mb=512。當文件溢寫完后,會對這些文件進行合並,默認每次合並10(mapreduce.task.io.sort.factor參數控制)個溢寫的文件,我司調整mapreduce.task.io.sort.factor=64。這樣可以提高合並的並行度,減少合並的次數,降低對磁盤操作的次數。

mapreduce.shuffle.max.threads(默認為0,表示可用處理器的兩倍),該參數表示每個節點管理器的工作線程,用於map輸出到reduce。

那么map算是完整了,在reduce拉取數據之前,我們完全還可以combiner呀(不影響最終結果的情況下),此時會根據Combiner定義的函數對map的結果進行合並這樣就可以減少數據的傳輸,降低磁盤io,提高性能了。

終於走到了map到reduce的數據傳輸過程了:
這中間主要的影響無非就是磁盤IO,網絡IO,數據量的大小了(是否壓縮),其實減少數據量的大小,就可以做到優化了,所以我們可以選擇性壓縮數據,這樣在傳輸的過程中
就可以降低磁盤IO,網絡IO等。可以通過mapreduce.map.output.compress(default:false)設置為true進行壓縮,數據會被壓縮寫入磁盤,讀數據讀的是壓縮數據需要解壓,在實際經驗中Hive在Hadoop的運行的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)參數設置。我司使用org.apache.hadoop.io.compress.SnappyCodec算法,但這個過程會消耗CPU,適合IO瓶頸比較大。

mapreduce.task.io.sort.mb        #排序map輸出所需要使用內存緩沖的大小,以兆為單位, 默認為100
mapreduce.map.sort.spill.percent #map輸出緩沖和用來磁盤溢寫過程的記錄邊界索引,這兩者使用的閾值,默認0.8
mapreduce.task.io.sort.factor    #排序文件時,一次最多合並的文件數,默認10
mapreduce.map.output.compress    #在map溢寫磁盤的過程是否使用壓縮,默認false
org.apache.hadoop.io.compress.SnappyCodec  #map溢寫磁盤的壓縮算法,默認org.apache.hadoop.io.compress.DefaultCodec
mapreduce.shuffle.max.threads    #該參數表示每個節點管理器的工作線程,用於map輸出到reduce,默認為0,表示可用處理器的兩倍

4.1、reduce端

接下來就是reduce了,首先我們可以通過參數設置合理的reduce個數(mapreduce.job.reduces參數控制),以及通過參數設置每個reduce的資源,mapreduce.reduce.memory.mb=5G(默認1G)
mapreduce.reduce.cpu.vcores=1(默認為1)。

reduce在copy的過程中默認使用5(mapreduce.reduce.shuffle.parallelcopies參數控制)個並行度進行復制數據,我司調了mapreduce.reduce.shuffle.parallelcopies=100.reduce的每一個下載線程在下載某個map數據的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的文件丟失,或者網絡瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載線程並不會無休止的等待下去,當一定時間后下載仍然失敗,那么下載線程就會放棄這次下載,並在隨后嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是可以通過mapreduce.reduce.shuffle.read.timeout(default180000秒)調整的。

Copy過來的數據會先放入內存緩沖區中,然后當使用內存達到一定量的時候才spill磁盤。這里的緩沖區大小要比map端的更為靈活,它基於JVM的heap size設置。這個內存大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7)控制的。意思是說,shuffile在reduce內存中的數據最多使用內存量為:0.7 × maxHeap of reduce task,內存到磁盤merge的啟動門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。

copy完成后,reduce進入歸並排序階段,合並因子默認為10(mapreduce.task.io.sort.factor參數控制),如果map輸出很多,則需要合並很多趟,所以可以提高此參數來減少合並次數。

mapreduce.reduce.shuffle.parallelcopies #把map輸出復制到reduce的線程數,默認5
mapreduce.task.io.sort.factor  #排序文件時一次最多合並文件的個數
mapreduce.reduce.shuffle.input.buffer.percent #在shuffle的復制階段,分配給map輸出緩沖區占堆內存的百分比,默認0.7
mapreduce.reduce.shuffle.merge.percent #map輸出緩沖區的閾值,用於啟動合並輸出和磁盤溢寫的過程

 

更多hadoop生態文章見: hadoop生態系列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM