Hadoop(十七)之MapReduce作業配置與Mapper和Reducer類


前言

  前面一篇博文寫的是Combiner優化MapReduce執行,也就是使用Combiner在map端執行減少reduce端的計算量。

一、作業的默認配置

  MapReduce程序的默認配置  

1)概述

  在我們的MapReduce程序中有一些默認的配置。所以說當我們程序如果要使用這些默認配置時,可以不用寫。

  

  我們的一個MapReduce程序一定會有Mapper和Reducer,但是我們程序中不寫的話,它也有默認的Mapper和Reducer。

  當我們使用默認的Mapper和Reducer的時候,map和reducer的輸入和輸出都是偏移量和數據文件的一行數據,所以就是相當於原樣輸出!

2)默認的MapReduce程序

/**
* 沒有指定Mapper和Reducer的最小作業配置
*/
public class MinimalMapReduce {
public static void main(String[] args) throws Exception{
// 構建新的作業
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "MinimalMapReduce");
job.setJarByClass(MinimalMapReduce.class);
// 設置輸入輸出路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// ᨀ交作業運行
System.exit(job.waitForCompletion(true)?0:1);
  }
}

  輸入是:

    

  輸出是:

    

二、作業的配置方式

  MapReduce的類型配置

  1)用於配置類型的屬性

    

    

    在命令行中,怎么去配置呢?

      比如說mapreduce.job.inputformat.class。首先我們要繼承Configured實現Tool工具才能這樣去指定:

      -Dmapreduce.job.inputformat.class = 某一個類的類全名(一定要記得加報名)

    這是Map端的輸出類型控制

    這是整個MapReduce程序輸出類型控制,其實就是reduce的類型格式控制

  2)No Reducer的MapReduce程序--Mapper

    第一步:寫一個TokenCounterMapper繼承Mapper

/**
* 將輸入的文本內容拆分為word,做一個簡單輸出的Mapper
*/
public class TokenCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text word=new Text();
private static final IntWritable one=new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
StringTokenizer itr=new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
        }
    }
}
TokenCounterMapper

    第二步:寫一個NoReducerMRDriver完成作業配置

/**
*沒有設置Reducer的MR程序
*/
public class NoReducerMRDriver {
public static void main(String[] args) throws Exception {
// 構建新的作業
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "NoReducer");
job.setJarByClass(NoReducerMRDriver.class);
// 設置Mapper
job.setMapperClass(TokenCounterMapper.class);
// 設置reducer的數量為0
job.setNumReduceTasks(0);
// 設置輸出格式
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 設置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// ᨀ交運行作業
System.exit(job.waitForCompletion(true)?0:1);
    }
}
NoReducerMRDriver

    輸入:

      

    結果:

      

    注意:如果作業擁有0個Reducer,則Mapper結果直接寫入OutputFormat而不經key值排序。

  3)No Mapper的MapReduce程序--Reducer

    第一步:寫一個TokenCounterReducer繼承Reducer

/**
* 將reduce輸入的values內容拆分為word,做一個簡單輸出的Reducer
*/
public class TokenCounterReducer extends Reducer<LongWritable, Text, Text, IntWritable>{
private Text word=new Text();
private static final IntWritable one=new IntWritable(1);
@Override
protected void reduce(LongWritable key, Iterable<Text> values,Reducer<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
for(Text value:values){
StringTokenizer itr=new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
            }
        }
    }
}    
TokenCounterReducer

    第二步:寫一個NoMapperMRDrive完成作業配置

/**
*沒有設置Mapper的MR程序
*/
public class NoMapperMRDriver {
public static void main(String[] args) throws Exception {
// 構建新的作業
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "NoMapper");
job.setJarByClass(NoMapperMRDriver.class);
// 設置Reducer
job.setReducerClass(TokenCounterReducer.class);
// 設置輸出格式
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 設置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// ᨀ交運行作業
System.exit(job.waitForCompletion(true)?0:1);
    }
}
NoMapperMRDrive

    輸入:

      

    輸出:

      

三、Mapper類和Reducer類以及它們的子類(實現類)

3.1、Mapper概述

  Mapper:封裝了應用程序Mapper階段的數據處理邏輯

   

  1)ChainMapper

    方便用戶編寫鏈式Map任務, 即Map階段包含多個Mapper,即可以別寫多個自定義map去參與運算。
  2)InverseMapper

    一個能交換key和value的Mapper
  3)RegexMapper

    檢查輸入是否匹配某正則表達式, 輸出匹配字符串和計數器(用的很少)
  4)TockenCounterMapper

    將輸入分解為獨立的單詞, 輸出個單詞和計數器(以空格分割單詞,value值為1)

3.2、Reducer概述

  Mapper:封裝了應用程序Mapper階段的數據處理邏輯

  

  1)ChainMapper:

    方便用戶編寫鏈式Map任務, 即Map階段只能有一個Reducer,后面還可以用ChainMapper去多加Mapper。

  2)IntSumReducer/LongSumReducer

    對各key的所有整型值求和

3.2、寫一個實例去使用

  注意:這里用到了一個輸入格式為KeyValueTextInputFormat,我們查看源碼注釋:

    

    我們需要用mapreduce.input.keyvaluelinerecordreader.key.value.separator去指定key和value的分隔符是什么,它的默認分隔符是"\t"也就是tab鍵。

    這個需要在配置文件中去指定,但是我們知道在配置文件中能設置的在程序中也是可以設置的。

    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

 

  代碼實現: 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PatentReference_0010 extends Configured implements Tool{

    static class PatentReferenceMapper extends Mapper<Text,Text,Text,IntWritable>{
        private IntWritable one=new IntWritable(1);
        @Override
        protected void map(Text key,Text value,Context context) throws IOException, InterruptedException{
            context.write(key,one);
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        ChainMapper.addMapper(job,InverseMapper.class,
            // 輸入的鍵值類型由InputFormat決定
            Text.class,Text.class,
            // 輸出的鍵值類型與輸入的鍵值類型相反
            Text.class,Text.class,conf);

        ChainMapper.addMapper(job,PatentReferenceMapper.class,
            // 輸入的鍵值類型由前一個Mapper輸出的鍵值類型決定
            Text.class,Text.class,
            Text.class,IntWritable.class,conf);

        ChainReducer.setReducer(job,IntSumReducer.class,
            Text.class,IntWritable.class,
            Text.class,IntWritable.class,conf);

        ChainReducer.addMapper(job,InverseMapper.class,
            Text.class,IntWritable.class,
            IntWritable.class,Text.class,conf);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        KeyValueTextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new P00010_PatentReference_0010(),args));
    }
}

 

  在Job job=Job.getInstance(conf,this.getClass().getSimpleName());設置中,job把conf也就是配置文件做了一個拷貝,因為hadoop要重復利用一個對象,如果是引用的話,發現值得改變就都改變了。        

  

    

    

喜歡就點個“推薦”哦!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM