詳解MapReduce(Spark和MapReduce對比鋪墊篇)


本來筆者是不打算寫MapReduce的,但是考慮到目前很多公司還都在用這個計算引擎,以及后續要講的Hive原生支持的計算引擎也是MapReduce,並且為Spark和MapReduce的對比做鋪墊,筆者今天詳細闡述一下MapReduce。鑒於Hadoop1.X已過時,Hadoop3.X目前用的還不多,企業中目前大量運用的還是Hadoop2.X,所以以下都是基於Hadoop2.X版本的MapReduce(后續要講的HDFS和Yarn也是)。

MapReduce是Hadoop核心三劍客之一,設計思想來源於谷歌三篇論文之一的《分布式計算模型》。作為一個分布式運算程序編程框架,需要用戶實現業務邏輯代碼並和它自帶的默認組件整合成完整的分布式運算程序,並發運行在Hadoop集群上。一個完整的MapReduce程序在分布式運行時有三類實例進程:

1. MRAppMaster:負責整個程序過程調度及狀態協調

2. MapTask:負責map階段整個數據處理流程

3. ReduceTask:負責reduce階段整個數據處理流程

這里筆者還是要強調一點:MapTask和ReduceTask是進程級別,這一點很重要!

筆者畫了一張MapReduce處理的流程圖,並以處理單詞統計的例子作為示例:

 

MapReduce處理數據主要分為兩個階段:map和reduce,對應到上圖分別對應的處理實例就是MapTask和ReduceTask。數據處理先進內存然后刷磁盤,雖然有溢出比限制,但是筆者強調,落磁盤至少一次,通過上圖以及接下來的講解明白了MapReduce的整個處理流程、細節也就能掌握shuffle階段都干了什么。下面就圖說說里面核心的機制和涉及的組件:

切片機制

切片也就是把文件切成一個個block塊,但是此處的切片是邏輯切片而非物理切片。切片的邏輯可以查看接口InputFormat<K, V>的getSplits方法,通過它的一個實現類FileInputFormat看看切片的默認實現機制,直接看源碼:


這里咱們主要關注文件可切分的部分,通過分析源碼,在FileInputFormat中,默認切片機制如下:

1. 簡單的按照文件的內容長度進行切片

2. 切片大小,默認等於block大小

3. 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片

因此默認情況下,切片大小等於blocksize。但是,不論怎么調參數,都不能讓多個小文件“划入”一個split, 會影響性能, 后續講HDFS時會說明一下小文件的問題

了解完切片機制之后,初學者容易陷入一個誤區,就是比如我配置blocksize為128M,那么我一個文件就會按照128M等比例切分,切到最后不足128M部分單獨作為一個切片,但筆者強調這是要分情況的。其實細心的小伙伴會看到我源碼截圖中的注釋部分,關鍵的參數SPLIT_SLOP為1.1,同樣以blocksize為128M為例,假如對於一個130M的可切分文件會產生幾個block塊呢?很顯然130 < 128*1.1,就產生一個切片為130M的block,所以多看源碼很重要。

 

 

2. MapReduce並行度決定機制

 

1)MapTask並行度決定機制

了解了切片機制,就很容易了解MapTask的並行度機制了,因為MapTask的並行度主要取決於切片機制。一個任務的map階段並行度由客戶端在提交任務時決定,而客戶端對map階段並行度的規划的基本邏輯為:按照一個特定切片大小,將待處理數據划分成邏輯上的多個切片,然后每一個切片分配一個mapTask並行實例處理

 

2)ReduceTask並行度決定機制

ReduceTask設置方式就很簡單了,可以直接手動設置:job.setNumReduceTasks(4);,默認值是1,手動設置為4。ReduceTask的並行度同樣影響整個任務的執行效率,如果數據分布不均勻,就有可能產生數據傾斜。

注意:ReduceTask設置方式就很簡單了,可以直接手動設置:數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個ReduceTask設置方式就很簡單了,可以直接手動設置:。盡量不要運行太多的ReduceTask。對大多數任務來說,最好reduce的個數最多和集群中的reduce持平,或者比集群的reduce slots小。這個對於小集群而言,尤其重要。

並發數的選擇受多方面因素影響,比如運算節點的硬件配置、運算任務的類型:CPU密集型還是IO密集型、運算任務的數據量,這個還是要根據實際情況而定。

 

3)map|reduce端核心組件

   

a) partitioner組件

在map輸出數據溢出到磁盤之前調用。默認根據key.hashcode%reduce數量(HashPartition),可以自定義分區組件

 

 

b) combiner組件(繼承了Reducer)          

在map環形緩沖區、reduce輸入緩沖區溢出到磁盤之前、多個溢出文件合並時調用。目的是減少寫磁盤的數據量(磁盤IO)和傳遞給reduce的數據量(帶寬)。慎用:調用次數不一定,不能影響核心業務邏輯,如對數據求平均值:

2  5  6:加入combiner,2+5+6/3=13/3

4  3 :加入combiner,4+3/2=7/2,最終(13/3+7/2)/2 = 47/12
不加combiner組件:2+5+6+4+3/5 = 4                

 

c) 分組                 

reduce階段合並數據的規則,默認根據key相同分為一組

 

/**利用reduce端的GroupingComparator來實現將一組bean看成相同的key */
public class ItemidGroupingComparator extends WritableComparator {
    //傳入作為key的bean的class類型,以及制定需要讓框架做反射獲取實例對象
    protected ItemidGroupingComparator() {
            super(Order.class,true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
            //強轉
            Order aBean = (Order) a;
            Order bBean = (Order) b;
            //比較兩個bean時,指定只比較bean中itemId
       return aBean.getItemId().compareTo(bBean.getItemId());
    }
}


最后再說一下MapReduce的分布式緩存:MapReduce通過DistributedCache,可以將job指定的文件,在job執行前,先行分發到task執行的機器上,並提供相關機制對cache文件進行管理。但需要注意:需要分發的文件,必須提前放到hdfs上;需要分發的文件在任務運行期間最好是只讀的;不建議分發較大的文件,影響性能。主要用來分發第三方庫、多表數據join時小表數據簡便處理等。可以在自定實現Mapper類,重寫setup方法中進行分布式緩存處理。

對於MapReduce分布式緩存,很類似於Spark中的廣播變量,后續講到Spark廣播變量和累計變量時再細說。


關注微信公眾號:大數據學習與分享,獲取更對技術干貨


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM