MapReduce編程job概念原理


  在Hadoop中,每個MapReduce任務都被初始化為一個job,每個job又可分為兩個階段:map階段和reduce階段。這兩個階段分別用兩個函數來表示。Map函數接收一個<key,value>形式的輸入,然后同樣產生一個<ey,value>形式的中間輸出,Hadoop會負責將所有具有相同中間key值的value集合在一起傳遞給reduce函數,reduce函數接收一個如<key,(list of values)>形式的輸入,然后對這個value集合進行處理,每個reduce產生0或1個輸出,reduce的輸出也是<key,value>形式。

  

簡易代碼:

public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{
    //設置常量1,用來形成<word,1>形式的輸出
    private fianll static IntWritable one = new IntWritable(1)
    private Text word = new Text();

public void map(LongWritable key,Text value,OutputCollector<Text,output,Reporter reporter) throws IOException{
   //hadoop執行map函數時為是一行一行的讀取數據處理,有多少行,就會執行多少次map函數
    String line = value.toString();
    //進行單詞的分割,可以多傳入進行分割的參數
    StringTokenizer tokenizer = new StringTokenizer(line);
    //遍歷單詞
    while(tokenizer.hasMoreTokens()){
       //往Text中寫入<word,1>
        word.set(tokenizer.nextToken());
        output.collect(word,one);
    }
    }
}
//需要注意的是,reduce將相同key值(這里是word)的value值收集起來,形成<word,list of 1>的形式,再將這些1累加
public static class Reduce extends MapReduceBase implements Reducer<Text IntWritable,Text,IntWritable>{
        public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException{
    //初始word個數設置
    int sum = 0;
    while(values,hasNext()){
     //單詞個數相加   
        sum += value.next().get();
    }
    output.collect(key,new IntWritbale(sum));
    }
}

執行概念總結:

job.setInputFormatClass(TextInputFormat.class);

1.InputFormat()和inputSplit

  inputSplit是Hadoop定義的用來傳送給每個單獨的map的數據,InputSplit存儲的並非數據本身,而是一個分片長度和一個記錄數據位置的數組,生成InputSplit的方法可以通過InputFormat(I)來設置。當數據傳送給map時,map會將輸入分片傳送到inputFormat上,InputFormat則調用getREcordReduer()方法生成RecordReader,RecordReader再通過createKey()、createValue()方法創建可供map處理的<key,value>對,即<k1,v1>,簡而言之InputFormat方法是用來生成可供map處理的<key,value>對的。

  在這里如果不設置的話,TextInputFormat會是Hadoop默認的輸入方法,在TextInputFormat中,每個人間(或其一部分)都會單獨地作為map的輸入,繼承自FileInputFormat,之后,每行數據都會生成一條記錄,每條記錄則表示成<key,value>形式:

  其中,key值是每個數據的記錄在數據分片中的字節偏移量,數據類型是LongWritable.

  value值是每行的內容,數據類型是Text。

job.setOutputValueClass(TextInputFormat.class);

2、OutputFormat

  每一種輸入格式都有一種輸出格式與其對應。同樣,默認的輸出格式是TextOutputFormat,這種輸出方式與輸入類似,會將每條記錄以一行的形式存入文本文件。不過它的鍵和值都可以以任意形式的,因為程序內部會調用toString()方法將鍵和值轉換為String類型再輸出。

3、map和reduce

  map函數接收經過inputFormat處理產生的<k1,v1>,然后輸出<k2,v2>,map函數老的版本寫法是繼承MapReduceBase然后實現Mapper接口,但是現在可以直接繼承Mapper接口,此接口是一個泛型類型,有4種形式的參數,分別用來指定map的輸入key值類型(LongWritable key),輸入value值類型(Text value)、輸出key值類型和(Text)輸出value值類型(IntWritable,本例是reporter)

  reduce函數以map的輸出作為輸入,因此reduce的輸入類型是<Text,IntWritable>.而reduce的輸出是單詞和它的數目,因此,它的輸出類型是<Text,IntWritable>

4、任務調度

  計算方面:Hadoop總會有限將任務分配給空閑的機器,使所有的任務能公平地分享系統資源,I/O方面:Hadoop會盡量將map任務分配給InputSplit所在機器,以減少網絡I/O的消耗。

5、數據預處理與InputSplit的大小。

  Hadoop會在處理每個block后將其作為一個InputSplit,因此合理地甚至block塊大小是很重要的。也可通過合理地設置map任務的數量來調節map任務的數據輸入。

6、map和reduce任務的數量

  設置map任務槽和reduce任務槽,map/reduce任務槽是這個集群能夠同時運行的map/reduce任務的最大數量。可以通過hadoop的配置文件設置每台機器最多可以同時運行map任務和reduce任務的個數,比如有10台機器,設置每台最多可以同時運行10個map任務和5個reduce任務,那么這個集群的map任務槽就是1000,reduce任務槽就是500.一般來說,設置的reduce任務數量應該是reduce任務槽的0.95或是1.75倍

7、combine函數

  combine函數是用於在本地合並數據的函數,從wordcount程序中,詞頻是一個接近於zipf分布的,每個map任務可能會產生成千上萬個<the,i>記錄,若將這些記錄一一傳給reduce任務是很耗時的,所以可以設置一個combine函數,用於本地合並,大大減少網絡I/O操作的消耗。

job.setCombinerClass(combine.class);
//指定reduce函數為combine函數
job.setReducerClass(Reduce.class);

8、Hadoop流的工作原理

  當一個可執行文件作為Mapper時,每個map任務會以一個獨立的進程啟動這個可執行文件,然后在map任務運行時,會把輸入切分成行提供給可執行文件,並作為它的標准輸入(stdin)內容。當可執行文件運行處結果時,map從標准輸出(stdout)中手機數據,並將其轉化為<key,value>對,作為map的輸出。

參考:<Hadoop實戰>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM