Clever Uses of Hadoop's Mapper Class


1. The Mapper Class

The Mapper class defines four methods:

(1) protected void setup(Context context)

(2) protected void map(KEYIN key, VALUEIN value, Context context)

(3) protected void cleanup(Context context)

(4) public void run(Context context)

setup() is typically used for initialization work, such as loading global files or opening database connections; cleanup() handles teardown, such as closing files or dispatching the key/value pairs produced by map(); map() itself needs no further introduction.

The core of the default Mapper.run() method looks like this:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

As the code shows, setup() runs first, then the map() processing, and finally the cleanup() teardown. Note that the framework invokes setup() and cleanup() as callbacks only once per task, unlike map(), which is called once for every key/value pair.
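As a minimal sketch of this lifecycle (the CallCountMapper class below is hypothetical, not taken from the article), the mapper counts how many times map() is invoked and emits that single count from cleanup():

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CallCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private long mapCalls;

    @Override
    protected void setup(Context context) {
        // Runs once, before the first map() call of this task.
        mapCalls = 0;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // Runs once per input key/value pair.
        mapCalls++;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once, after the last map() call of this task.
        context.write(new Text("map() calls in this task"), new LongWritable(mapCalls));
    }
}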

2. Applying setup()

The classic WordCount can filter out blacklisted words simply by loading a blacklist in setup(). The full code is as follows:

 
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    // Name of the blacklist file in the task's local working directory.
    private static final String blacklistFileName = "blacklist.dat";
  
    public static class WordCountMap extends  
            Mapper<LongWritable, Text, Text, IntWritable> {  
  
        private final IntWritable one = new IntWritable(1);  
        private Text word = new Text(); 
        private Set<String> blacklist;
  
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            blacklist = new TreeSet<String>();

            // Load the blacklist once per map task, before any map() calls.
            try {
                FileReader fileReader = new FileReader(blacklistFileName);
                BufferedReader bufferedReader = new BufferedReader(fileReader);
                String str;
                while ((str = bufferedReader.readLine()) != null) {
                    blacklist.add(str);
                }
                bufferedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String line = value.toString();  
            StringTokenizer token = new StringTokenizer(line);  
            while (token.hasMoreTokens()) {  
                word.set(token.nextToken());
                if(blacklist.contains(word.toString())){
                   continue;
                }
                context.write(word, one);  
            }  
        }  
    }  
  
    public static class WordCountReduce extends  
            Reducer<Text, IntWritable, Text, IntWritable> {  
  
        public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable val : values) {  
                sum += val.get();  
            }  
            context.write(key, new IntWritable(sum));  
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        Configuration conf = new Configuration();  
        Job job = new Job(conf);  
        job.setJarByClass(WordCount.class);  
        job.setJobName("wordcount");  
  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
  
        job.setMapperClass(WordCountMap.class);  
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);  
  
        job.setInputFormatClass(TextInputFormat.class);  
        job.setOutputFormatClass(TextOutputFormat.class);  
  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}  
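One practical caveat about the example above: new FileReader(blacklistFileName) reads from the task's local working directory, so blacklist.dat must be present on every node that runs a map task. A common way to arrange that (a sketch under an assumed HDFS path, not part of the original code) is the job's cache-file mechanism; the lines below would go in main() before the waitForCompletion call, and the "#blacklist.dat" fragment names the local link the file appears under:

// Hadoop 2.x and later (the HDFS path here is hypothetical):
job.addCacheFile(new URI("hdfs://hadoop:9000/conf/blacklist.dat#blacklist.dat"));

// On older releases the equivalent calls are:
// DistributedCache.addCacheFile(
//         new URI("hdfs://hadoop:9000/conf/blacklist.dat#blacklist.dat"),
//         job.getConfiguration());
// DistributedCache.createSymlink(job.getConfiguration());

This also requires import java.net.URI; in the driver class.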

3. Applying cleanup()

The simplest way to find an extreme value is to make a single pass over the file, but in practice the data is far too large for that to work on one machine. In the straightforward MapReduce formulation, map() forwards every record to the reducer and the reducer computes the maximum, which is clearly not optimal. Taking a divide-and-conquer approach instead, there is no need to send all of the map output to the reducer: each map task can compute its own local maximum and send only that one value on, which greatly reduces the amount of data transferred. So when should that value be written out? Every key/value pair triggers one call to map(), and with a large input map() is called a great many times, so writing the value inside map() would be unwise. The best time is when the map task finishes, and since the framework calls cleanup() once a Mapper/Reducer task ends, that is where we write the value. With that in mind, here is the code:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopKApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
    static final String OUT_PATH = "hdfs://hadoop:9000/out2";
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }
        
        final Job job = new Job(conf, TopKApp.class.getSimpleName());
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }
    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        // Local maximum seen by this map task.
        long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            final long temp = Long.parseLong(v1.toString());
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Called once when the map task finishes: emit only the local maximum.
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        // Global maximum across all per-mapper maxima seen by this reduce task.
        long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            final long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Called once when the reduce task finishes: emit the global maximum.
            context.write(new LongWritable(max), NullWritable.get());
        }
    }
}
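A note on the design: each map task writes only its local maximum from cleanup(), and the job relies on Hadoop's default of a single reduce task, so one reducer sees every local maximum and its own cleanup() writes the single global result. If the cluster configuration changes that default, it can be pinned explicitly in main() (this line is not in the original code):

job.setNumReduceTasks(1);   // one reducer must see all per-mapper maxima for a global result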

 

