1. The Mapper class
The Mapper class has four methods:
(1) protected void setup(Context context)
(2) protected void map(KEYIN key, VALUEIN value, Context context)
(3) protected void cleanup(Context context)
(4) public void run(Context context)
The setup() method is generally used for one-time initialization, such as loading a global file or opening a database connection; cleanup() handles the teardown, such as closing files or writing out results accumulated during map(); map() itself needs no further explanation.
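As a minimal sketch of that resource pattern (ResourceMapper and openResource() are made up for illustration and are not Hadoop classes), a Mapper might acquire something in setup() and release it in cleanup():

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: "resource" stands in for any per-task resource such as a
// database connection or a side file.
public class ResourceMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private AutoCloseable resource;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Runs once, before any map() call: acquire the shared resource here.
        resource = openResource(context.getConfiguration().get("resource.uri"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Use the resource for every record...
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once, after the last map() call: release the resource here.
        try {
            if (resource != null) resource.close();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    // Hypothetical helper; a real job would open a JDBC connection, an HDFS file, etc.
    private AutoCloseable openResource(String uri) {
        return () -> { };
    }
}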
The core code of the default Mapper run() method is as follows:
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}
As the code shows, setup() runs first, then the map() processing loop, and finally the cleanup() teardown. Note that setup() and cleanup() are invoked by the framework as callbacks exactly once per task, unlike map(), which is called once for every input key/value pair.
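Because run() is an ordinary method, it can also be overridden. As an illustrative sketch (not part of the original example), a subclass could wrap the loop in try/finally so that cleanup() still runs even if map() throws; newer Hadoop releases do essentially this in the default run() as well:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SafeCleanupMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            // Same iteration as the default run(): one map() call per key/value pair.
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            // Guarantee the one-time teardown even if map() throws.
            cleanup(context);
        }
    }
}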
2. Applying setup()
By loading a blacklist in setup(), the classic WordCount can be extended to filter out blacklisted words. The full code is as follows:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    private static String blacklistFileName = "blacklist.dat";

    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Set<String> blacklist;

        // Called once per map task, before any map() call: load the blacklist.
        protected void setup(Context context) throws IOException, InterruptedException {
            blacklist = new TreeSet<String>();
            try {
                FileReader fileReader = new FileReader(blacklistFileName);
                BufferedReader bufferedReader = new BufferedReader(fileReader);
                String str;
                while ((str = bufferedReader.readLine()) != null) {
                    blacklist.add(str);
                }
                bufferedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                // Skip words that appear in the blacklist.
                if (blacklist.contains(word.toString())) {
                    continue;
                }
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
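Note that setup() above reads blacklist.dat from the task's local working directory, so the file must be available on every node that runs a map task. One common way to arrange that (a sketch, assuming Hadoop 2.x and a hypothetical HDFS path) is to register the file in the distributed cache from the driver; the "#blacklist.dat" fragment makes the framework create a symlink with that name in the task working directory, so the FileReader code above keeps working unchanged:

// In main(), before job.waitForCompletion() (the HDFS path is hypothetical):
job.addCacheFile(new java.net.URI("hdfs://namenode:9000/conf/blacklist.dat#blacklist.dat"));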
3. Applying cleanup()
The simplest way to find an extreme value is to scan the file once, but in practice the data set is too large for that to be feasible on one machine. In the straightforward MapReduce approach, map() passes every record through and the reducer computes the maximum. This is clearly not optimal. Following the divide-and-conquer idea, there is no need to send all of the map data to the reducer: each map task can compute its own local maximum and send only that single value to the reducer, greatly reducing the amount of data transferred. So when should that local maximum be written out? map() is called once per key/value pair, and with a large data set it is called a great many times, so emitting the value inside map() is clearly unwise; the right moment is when the map task has finished. Since the framework calls cleanup() once when a Mapper/Reducer task ends, we can write the value out there. With that in mind, the code looks like this:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopKApp {
    static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
    static final String OUT_PATH = "hdfs://hadoop:9000/out2";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        // Remove the output directory if it already exists.
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final Job job = new Job(conf, TopKApp.class.getSimpleName());
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        // Local maximum for this map task.
        long max = Long.MIN_VALUE;

        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            final long temp = Long.parseLong(v1.toString());
            if (temp > max) {
                max = temp;
            }
        }

        // Called once when the map task finishes: emit only the local maximum.
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        // Global maximum across all map outputs.
        long max = Long.MIN_VALUE;

        protected void reduce(LongWritable k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            final long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        // Called once when the reduce task finishes: emit the global maximum.
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }
}
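Although the class is named TopKApp, the code above tracks only the single maximum. The same cleanup() pattern extends naturally to a true top K: each map task keeps the K largest values it has seen and emits them all in cleanup(), and the reducer merges the per-task candidates the same way. A sketch of such a mapper (K is a hypothetical constant, and this class is not part of the original example):

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: keep only the K largest distinct values per map task and emit them in cleanup().
public class TopKMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private static final int K = 3;                          // hypothetical value of K
    private final TreeSet<Long> topK = new TreeSet<Long>();  // smallest element first

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        topK.add(Long.parseLong(value.toString()));
        if (topK.size() > K) {
            topK.pollFirst();   // drop the smallest, keeping at most K values
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this task's K candidates once, when the map task ends.
        for (Long v : topK) {
            context.write(new LongWritable(v), NullWritable.get());
        }
    }
}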