多個Mapper和Reducer的Job
@(Hadoop)
對於復雜的mr任務來說,只有一個map和reduce往往是不能夠滿足任務需求的,有可能是需要n個map之后進行reduce,reduce之后又要進行m個map。
在hadoop的mr編程中可以使用ChainMapper和ChainReducer來實現鏈式的Map-Reduce任務。
ChainMapper
以下為官方API文檔翻譯:
ChainMapper類允許在單一的Map任務中使用多個Mapper來執行任務,這些Mapper將會以鏈式或者管道的形式來調用。
第一個Mapper的輸出即為第二個Mapper的輸入,以此類推,直到最后一個Mapper則為任務的輸出。
這個特性的關鍵功能在於,在鏈中的Mappers不必知道他們是否已經被執行,這可以在一個單一的任務中讓一些Mapper進行重用,組合在一起完成復雜的操作。
使用的時候需要注意,每個Mapper的輸出都會在下一個Mapper的輸入中進行驗證,這里假設所有的Mapper和Reduce都使用相匹配的key和value作為輸入和輸出,因為在鏈式執行的代碼中並沒有對其進行轉換。
使用ChainMapper和ChainReducer可以將Map-Reduce任務組合成[MAP+ / REDUCE MAP*]的形式,這個模式最直接的好處就是可以大大減少磁盤的IO開銷。
注意:沒有必要為ChainMapper指定輸出的key和value的類型,使用addMapper方法添加最后一個Mapper的時候回自動完成。
使用的格式:
Job = new Job(conf);
//mapA的配置,如果不是特殊配置可傳入null或者共用一個conf
Configuration mapAConf = new Configuration(false);
//將Mapper加入執行鏈中
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
Text.class, Text.class, true, mapAConf);
Configuration mapBConf = new Configuration(false);
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
LongWritable.class, Text.class, false, mapBConf);
job.waitForComplettion(true);
addMapper函數的定義如下:
public static void addMapper(Job job,
Class<? extends Mapper> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration mapperConf) throws IOException
ChainReducer
基本描述同ChainMapper。
對於每條reduce輸出的數據,Mappers將會以鏈或者管道的形式調用。 ?
ChainReducer有兩個基本函數可以調用,使用格式:
Job = new Job(conf);
Configuration reduceConf = new Configuration(false);
//這里是在setReducer之后才調用addMapper
ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
LongWritable.class, LongWritable.class, true, null);
job.waitForCompletion(true);
setReducer定義:
public static void setReducer(Job job,
Class<? extends Reducer> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration reducerConf)
addMapper定義同ChainMapper。
實際的測試過程
在demo程序測試中觀察結果得到兩條比較有用的結論:
- 對於reduce之后添加的Mapper,每條reduce的輸出都會馬上調用一次該map函數,而不是等待reduce全部完成之后再調用map,如果是有多個map,應該是一樣的道理。
- reduce之后的Mapper只執行map過程,並不會有一個完整map階段(如,map之后的排序,分組,分區等等都沒有了)。
**另注:**reduce之前設置多個Mapper使用ChainMapper的addMapper,reduce之后設置多個Mapper使用ChainReducer的addMapper。
多個job連續運行
有時候鏈式的設置多個Mapper仍然無法滿足需求,例如,有時候我們需要多個reduce過程,或者map之后的分組排序等,這就需要多個job協同進行工作。
使用的方法很簡單,直接在第一個job.waitForCompletion之后再次實例化一個Job對象,按照八股文的格式進行設置即可,注意輸入和輸出的路徑信息。
example:
Job newJob = Job.getInstance(conf, jobName + "-sort");
newJob.setJarByClass(jarClass);
FileInputFormat.setInputPaths(newJob, new Path(outPath + "/part-*"));
newJob.setInputFormatClass(TextInputFormat.class);
newJob.setMapperClass(SortMapper.class);
newJob.setMapOutputKeyClass(SortKey.class);
newJob.setMapOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(newJob, new Path(outPath + "/sort"));
newJob.setOutputFormatClass(TextOutputFormat.class);
newJob.waitForCompletion(true);
和MultipleInputs的區別
MultipleInputs類可以設置多個輸入路徑,每個路徑使用指定的Mapper進行處理。
和ChainMapper也是執行多個map過程不一樣的是,ChainMapper是一個輸入文件經歷多個map過程,就像流水線一樣。
而MultipleInputs則只是設置多個輸入文件,每個文件一個map過程。
作者:@小黑