MapReduce面試題


什么是mapreduce

Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基於hadoop的數據分析應用”的核心框架。容錯高,擴展好,適合pB數據處理

MapReduce 執行過程分析

第一階段map
1.map task讀取HDFS文件。每個block,啟動一個map task。每個map task按照行讀取一個block中的內容,對每一行執行map函數
2.map函數對輸入的數據進行拆分split,得到一個數組,組成一個鍵值對<word, 1>
3.分區,每個分區對應1個reduce task
4.對每個分區中的數據,按照key進行分組並排序
5.在map段執行小reduce,輸出<key,times>

第二階段reduce
1.每個分區對應一個reduce task,這個reduce task會讀取相同分區的map輸出;reduce task對接收到的所有map輸出,進行排序分組<hello,{1,1}><me,{1}><you,{1}>
2.執行reduce 操作,對一個分組中的value進行累加<hello,2><me,1><you,1>
3.每個分區輸出到一個HDFS文件中

Mapreduce數據傾斜原因和解決方案

原因:
    簡單來說數據傾斜就是數據的key 的分化嚴重不均,造成一部分數據很多,一部分數據很少的局面。
    情形:group by 維度過小,某值的數量過多
    后果:處理某值的reduce非常耗時
    去重 distinct count(distinct xx)
    情形:某特殊值過多
    后果:處理此特殊值的reduce耗時
    連接 join
    情形1:其中一個表較小,但是key集中
    后果1:分發到某一個或幾個Reduce上的數據遠高於平均值
    情形2:大表與大表,但是分桶的判斷字段0值或空值過多
    后果2:這些空值都由一個reduce處理,非常慢
解決:
    調優參數
  set hive.map.aggr=true:在map中會做部分聚集操作,效率更高但需要更多的內存。
  set hive.groupby.skewindata=true:數據傾斜時負載均衡,當選項設定為true,生成的查詢計划會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MRJob再根據預處理的數據結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。

2、在 key 上面做文章,在 map 階段將造成傾斜的key 先分成多組,例如 aaa 這個 key,map 時隨機在 aaa 后面加上 1,2,3,4 這四個數字之一,把 key 先分成四組,先進行一次運算,之后再恢復 key 進行最終運算。
3、能先進行 group 操作的時候先進行 group 操作,把 key 先進行一次 reduce,之后再進行 count 或者 distinct count 操作。
4、join 操作中,使用 map join 在 map 端就先進行 join ,免得到reduce 時卡住。

 Java的序列化和hadoop序列化機制(Writable

1.緊湊 
緊湊的格式能讓我們充分利用網絡帶寬,而帶寬是數據中心最稀缺的資源
2.快速
進程通信形成了分布式系統的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本的;
3.可擴展
協議為了滿足新的需求變化,所以控制客戶端和服務器過程中,需要直接引進相應的協議,這些是新協議,原序列化方式能支持新的協議報文;
4.互操作
能支持不同語言寫的客戶端和服務端進行交互; 

Mapreduce的動態執行流程

MRAppMaster向RM申請2個container;
RM分配2個container,
NM啟動該container,把container的地址返回給master。
container中開始運行map task或者reduce task。
在運行過程中,master與map task、reduce task保持心跳通訊,監測task的執行情況。
當map task和reduce task運行結束時,回收container。
當job運行結束時,master告訴RM,可以把自己回收了。
RM就讓NM殺死master所在的container。
整個job運行完畢

 切片機制

一個split的大小是由goalSize, minSize, blockSize這三個值決定的。
computeSplitSize的邏輯是,先從goalSize和blockSize兩個值中選出最小的那個(比如一般不設置map數,這時blockSize為當前文件的塊size,而goalSize是文件大小除以用戶設置的map數得到的,如果沒設置的話,默認是1)。

 CombineTextInputFormat案例實操

// 如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

 suffer機制

 WritableComparable排序

bean對象實現WritableComparable接口重寫compareTo方法,就可以實現排序
@Override
public int compareTo(FlowBean o) {
    // 倒序排列,從大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
if(this.sumFlow==o.getSumFlow()){
This.downFlow>o.getDownFlow() ? -1 :1
}
}

 自定義Partitioner

1)自定義類繼承Partitioner,重寫getPartition()方法
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 獲取電話號碼的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判斷是哪個省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}
2)在job驅動中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class); (3)自定義partition后,要根據自定義partitioner的邏輯設置相應數量的reduce task job.setNumReduceTasks(5);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM