Hadoop(十三)分析MapReduce程序


前言

  剛才發生了悲傷的一幕,本來這篇博客馬上就要寫好的,花了我一晚上的時間。但是剛才電腦沒有插電源就沒有了。很難受!想哭,但是沒有辦法繼續站起來。

  前面的一篇博文中介紹了什么是MapReduce,這一篇給大家詳細的分享一下MapReduce的運行原理。

一、寫一個MapReduce程序例子

1.1、數據准備

  准備要處理的數據(假定數據已經存放在hdfs的/data目錄下)
    $> hdfs dfs -ls /data
  看到測試數據目錄。天氣數據目錄/data/weather,專利數據目錄/data/patent。
  若沒有,則自行將數據上傳到上述目錄基本步驟如下:
    $> hdfs dfs -mkdir /data
    $> hdfs dfs -mkdir /data/weather
    $> hdfs dfs -put ~/weather/999999-99999-1992 /data/weather

  天氣數據格式:
    每行一條記錄,記錄了年份,氣象站編號,溫度及數據質量
    014399999999999 1992 012912004+11900+163700FM-13+9999KGWU

    V0200601N00871220001CN0100001N9+026+02201101531ADDAG12001AY101061AY201061GF103991031021004501999999MD1110081

    +9999MW1151SA1+2789UA1M030159999UG1080250809REMSYN02922233 00278 20303 308// 40805

  數據分析: 

    (0, 15)氣象站編號
    (15,19)年份
    (87, 92) 檢查到的溫度,如果為+9999則表示沒有檢測到溫度
    (92, 93)溫度數據質量,為【01459】表示該溫度是合理溫度

1.2、需求分析

  1)需求 

    以/data/weather/999999-99999-1992數據,請計算出每個氣象站檢測到的最高氣溫(這個程序也可以計算每年的最高溫度)

  2)分析

1.3、編寫一個解析類解析天氣數據

  WeatherRecordParser

import org.apache.hadoop.io.Text;

public class WeatherRecordParser{
    private String stationId;
    private int year;
    private int temperature;
    private boolean valid;

    /**
     * 87-92  +9999
     * @param line
     */
    public void parse(String line){
        if(line.length()<94){
            valid=false;
            return;
        }
        if(line.substring(87,92).equals("+9999")){
            valid=false;
            return;
        }
        if(line.substring(92,93).matches("[01459]")){
            valid=true;
            stationId=line.substring(0,15);
            year=Integer.parseInt(
                line.substring(15,19));
            temperature=Integer.parseInt(
                line.substring(87,92));
        }
    }

    public void parse(Text line){
        parse(line.toString());
    }

    public String getStationId(){
        return stationId;
    }

    public void setStationId(String stationId){
        this.stationId=stationId;
    }

    public int getYear(){
        return year;
    }

    public void setYear(int year){
        this.year=year;
    }

    public int getTemperature(){
        return temperature;
    }

    public void setTemperature(int temperature){
        this.temperature=temperature;
    }

    public boolean isValid(){
        return valid;
    }

    public void setValid(boolean valid){
        this.valid=valid;
    }
}
WeatherRecordParser

1.4、編寫一個MapReduce程序求1992I年的最高溫度

  MaxTemperatureByYear_0010 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureByYear_0010 extends Configured implements Tool{
    static class MaxTempMapper
        extends Mapper<LongWritable,Text,
                       IntWritable,IntWritable>{

        private WeatherRecordParser parser=
            new WeatherRecordParser();

        private IntWritable year=new IntWritable();
        private IntWritable temp=new IntWritable();

        @Override
        protected void map(LongWritable key,
            Text value,Context context) throws IOException, InterruptedException{
            parser.parse(value);
            if(parser.isValid()){
                year.set(parser.getYear());
                temp.set(parser.getTemperature());
                context.write(year,temp);
            }
        }
    }

    static class MaxTempReducer
        extends Reducer<IntWritable,IntWritable,
                        IntWritable,IntWritable>{
        @Override
        protected void reduce(IntWritable key,
            Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException{
            int max=Integer.MIN_VALUE;
            for(IntWritable v:values){
                if(v.get()>max) max=v.get();
            }
            context.write(key,new IntWritable(max));
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        // 作業配置
        // 構建作業所處理的數據的輸入輸出路徑
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));
        // 構建作業配置
        Job job=Job.getInstance(conf,
            this.getClass().getSimpleName()+":Kevin");
        // 設置該作業所要執行的類
        job.setJarByClass(this.getClass());

        // 設置自定義的Mapper類以及Map端數據輸出時的數據類型
        job.setMapperClass(MaxTempMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設置自定義的Reducer類以及數據輸出數據類型
        job.setReducerClass(MaxTempReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // 設置讀取最原始數據的格式信息以及
        // 數據輸出到HDFS集群中的格式信息
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 設置數據讀入和輸出的路徑到相關的Format類中
        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);
        // 控制Reduce的個數
        job.setNumReduceTasks(5);
        // 提交作業
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new MaxTemperatureByYear_0010(),args));
    }
}
MaxTemperatureByYear_0010

1.5、使用Maven打包Jar包上傳到Hadoop客戶端的Linux服務器中

  1)執行測試 

   

  2)查詢作業進度

    

     我們可以通過Yarn集群的Web控制頁面:http://ip:8088去查看作業的進度(ip是resourcemanager所在的ip)

    

  3)作業執行完成

    

    我們可以去查看執行文件:發現1992年的最高溫度是605

    

二、分析上面MapReduce程序

1.1、查看作業歷史服務器

  我們通過http://ip:8088去 查看在執行完成MapReduce程序后的歷史記錄

  

  在最右邊有一個history的按鈕:

  

  查看歷史:

  

  我們發現在剛才執行的MapReduce程序中,map有8個在並行執行,而reduce只有1個在執行,為什么呢?

  分析:

    查看1992年數據的詳細信息:

    

    查看詳細信息:

    

    所以說map 的個數是和你的數據塊的個數有關系的。reduce的個數默認是1個

    那我們也想修改reduce的個數,怎么辦?

      1)通過修改配置文件

        

        修改配置為:

        

      2)通過程序去修改

        

     3)通過命令行參數去指定(這個后面在介紹)

2.2、經過洗牌后的數據怎么選擇reduce

1)假設我們有一個HDFS集群有4個節點分別是us1,us2,us3,us4。Yarn集群的主節點在分配資源的時候,當你客戶端將作業提交的時候,resourcemanager在分配資源(或者說分配作業)的時候,

  盡量將應用程序分發到有數據的節點上。

  比如:在我們下面這個集群中有4個節點,數據存儲在us1,us2,us3中。在resourcemanager分發作業到集群上的時候,盡量將作業分發到有數據的節點上,也就是會分發到us1,us2,us3中。

       (這樣就避免了節點與節點的數據傳輸),當然在資源特別緊張的時候,us4頁有可能有map任務。但是一般不會去分配。

  

2)那么在us1,us2,us3中都至少有一個map任務,當map輸出后經過洗牌,會根據key值得不同生成很多組以key不同的數據,假如生成了 v21  [v21]和v22  [v22]這兩組數據。

  我們知道前面的map是並行執行的(多個map同時運行,因為處理的數據在不同的數據塊),當我們的reduce為默認的時候是有1個。當我們這里的運行的時候,是有一個reduce所以不可能是並行。

  問題:我們的reduce只有一個,而又兩組數據那么哪個先執行呢?

  解決:Hadoop是這樣規定的,我們對數據進行分組是根據key值來分組的。那么Hadoop會讓這一系列的key去比較大小,最小的先進入執行,執行完成后,按照從小到大去執行

    比如:我們這里有兩組數據key值分別為k21,k22,如果k21<k22,那么k21先執行,執行完成之后再執行k22。

  

3)當reduce任務執行完成之后會生成一個文件:part-r-00000。

  比如在我們上面那個程序中生成的存放結果的文件:

    

    點擊查看:

    

    我們查看到這里有兩個文件:

      第一個文件_SUCCESS:當MapReduce執行成功會產生這個文件,如果失敗就沒有這個文件。

      第二個文件:用來存放執行結果。

4)reduce默認情況下是一個,我們可以通過更改集群配置或程序設置或命令行參數指定來修改reduce的個數。

  假如:我們有2個reduce,也有2組數據。那么reduce就可以 進行並行計算了。

  問題:兩組數據,2個reduce。到底哪組數據進入哪個reduce呢

  解決: Hadoop會讓每一組數據的key值得hash值去和reduce的個數取余,余數是幾那么就進入哪個reduce。

     當然前提是給reduce編號(編號是Hadoop內部自己會去編)。

  比如:我們2個reduce編號分別為0和1,v21生成的hash值為3,通過3/2取余為1,說明它進入到編號為1的reduce中。

     而v22生成的hash值為4,說明它進入編號為0的reduce中。

     但是如果我們v22生成的hash值是5呢?那么它也會進入編號為1的reduce中。導致0號reduce沒有任何數據。

  那么就相當於編號為0的reduce什么事情都沒有干,但是當reduce任務執行完成之后,一個reduce會生成一個文件。 

  第一個reduce生成的是part-r-00000,第二個則是part-r-00001(后面的00000和00001就是reduce的編號)

  注意:

      即使reduce沒有處理任何的數據也會生成一個文件,只不過文件大小為0。所以說當我們的程序設置了多少個reduce就會產生多少個文件

2.3、洗牌過程

  我們來看一下上圖中那個shuffle(洗牌)干了什么事情?

  其實很簡單就是做了一個分組的功能。

  

  這三個節點中的map都有可能產生k21和k22,它需要把key值相同的合並起來,形成 k21  [k22](類似集合一樣的value)這樣的數據。

  前面我們說了resourcemanager在分發 作業的時候,會將作業盡量分發到有數據的節點上。

  其實還有就是:

    運行作業的節點盡可能的要求它所處理的數據來自於自己所在的節點上。  

  比如說我們上面的那個例子:us1,us2,us3都是有數據的節點,

  問題:

    那有沒有可能us1中處理的數據來自於us2中

  分析:

    1)我們的map(map中是map方法在處理數據)在處理數據的時候,是一行一行處理的。

    2)我們的數據分塊是默認128MB一塊(可以自行設置)。比如下圖中的是一個文件,以32MB分為一個數據塊,

      這個文件的大小是80MB那么就會分成3個數據塊。

      

    3)當我們分塊的時候,我們並不能保證它按照完整的行來進行數據分塊。數據分塊之后不同的數據塊可能會分到不同的節點上。

      我們從上圖可以看出,紅色數據塊為32MB,由2行半的數據組成。而黃色數據塊也為32MB,它也不是完整的行。

      那我們紅色數據塊的數據會用一個map去處理(因為我們知道一個數據塊盡量會用一個map去處理),黃顏色也會用一個map去處理。

      前面我們就說了map處理數據的時候,一次是處理一行的

    問題:

      在紅色數據塊中處理時候的時候,處理前兩行沒有問題,當處理到第三行是不能構成一行,怎么辦?

    解決:

      在紅顏色數據塊中,Hadoop會將標記向后移動,直至處理的數據是一個整行的數據。黃顏色的map的處理數據的時候,發現第一行的數據不是從

      一行的開始位置去處理的數據,它也會去移動到下一行的開始處理數據。  

      

    4)那么我們知道不同的數據塊可能會存儲到不同的 節點上。 在這里中假設紅顏色和黃顏色的數據塊在不同的節點上。

      那么紅顏色數據塊的map就要去黃顏色的節點上去讀取數據了。

    總結:一個map處理的數據不一定都來自於自己的節點上。但resourcemanager在分配作業的時候會盡量讓這個map處理的數據來自於本節點。

2.4、數據分片與數據分組

  1)數據分片

  我們把進入map端的數據叫做數據分片。每一個數據塊進入MapReudce中的map程序的時候,我們把它叫做數據分片。

  那什么樣的數據是一個數據分片?HDFS集群上的一個數據塊的數據對應我們所說的數據分片。 

  也就是每一個數據分片由每一個map任務去處理。

  2)數據分組  

  數據進過洗牌之后分成不同的組形成數據的過程叫做數據分組

  

 

 

喜歡就點個“推薦”哦!

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM