多個Mapper和Reducer的Job


多個Mapper和Reducer的Job

@(Hadoop)


對於復雜的mr任務來說,只有一個map和reduce往往是不能夠滿足任務需求的,有可能是需要n個map之后進行reduce,reduce之后又要進行m個map。

在hadoop的mr編程中可以使用ChainMapper和ChainReducer來實現鏈式的Map-Reduce任務。

ChainMapper

以下為官方API文檔翻譯:
ChainMapper類允許在單一的Map任務中使用多個Mapper來執行任務,這些Mapper將會以鏈式或者管道的形式來調用。
第一個Mapper的輸出即為第二個Mapper的輸入,以此類推,直到最后一個Mapper則為任務的輸出。
這個特性的關鍵功能在於,在鏈中的Mappers不必知道他們是否已經被執行,這可以在一個單一的任務中讓一些Mapper進行重用,組合在一起完成復雜的操作。
使用的時候需要注意,每個Mapper的輸出都會在下一個Mapper的輸入中進行驗證,這里假設所有的Mapper和Reduce都使用相匹配的key和value作為輸入和輸出,因為在鏈式執行的代碼中並沒有對其進行轉換。
使用ChainMapper和ChainReducer可以將Map-Reduce任務組合成[MAP+ / REDUCE MAP*]的形式,這個模式最直接的好處就是可以大大減少磁盤的IO開銷。
注意:沒有必要為ChainMapper指定輸出的key和value的類型,使用addMapper方法添加最后一個Mapper的時候回自動完成。

使用的格式:

Job = new Job(conf);
//mapA的配置,如果不是特殊配置可傳入null或者共用一個conf
Configuration mapAConf = new Configuration(false);
//將Mapper加入執行鏈中
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
   Text.class, Text.class, true, mapAConf);
Configuration mapBConf = new Configuration(false);
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
   LongWritable.class, Text.class, false, mapBConf);

 job.waitForComplettion(true);

addMapper函數的定義如下:

public static void addMapper(Job job,
             Class<? extends Mapper> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration mapperConf) throws IOException

ChainReducer

基本描述同ChainMapper。
對於每條reduce輸出的數據,Mappers將會以鏈或者管道的形式調用。 ?
ChainReducer有兩個基本函數可以調用,使用格式:

Job = new Job(conf);
Configuration reduceConf = new Configuration(false);
//這里是在setReducer之后才調用addMapper
ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
   Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
   LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
   LongWritable.class, LongWritable.class, true, null);
job.waitForCompletion(true);

setReducer定義:

public static void setReducer(Job job,
              Class<? extends Reducer> klass, Class<?> inputKeyClass, Class<?> inputValueClass, Class<?> outputKeyClass, Class<?> outputValueClass, Configuration reducerConf)

addMapper定義同ChainMapper。

實際的測試過程

在demo程序測試中觀察結果得到兩條比較有用的結論:

  1. 對於reduce之后添加的Mapper,每條reduce的輸出都會馬上調用一次該map函數,而不是等待reduce全部完成之后再調用map,如果是有多個map,應該是一樣的道理。
  2. reduce之后的Mapper只執行map過程,並不會有一個完整map階段(如,map之后的排序,分組,分區等等都沒有了)。

**另注:**reduce之前設置多個Mapper使用ChainMapper的addMapper,reduce之后設置多個Mapper使用ChainReducer的addMapper。

多個job連續運行

有時候鏈式的設置多個Mapper仍然無法滿足需求,例如,有時候我們需要多個reduce過程,或者map之后的分組排序等,這就需要多個job協同進行工作。
使用的方法很簡單,直接在第一個job.waitForCompletion之后再次實例化一個Job對象,按照八股文的格式進行設置即可,注意輸入和輸出的路徑信息。

example:

Job newJob = Job.getInstance(conf, jobName + "-sort");
        newJob.setJarByClass(jarClass);

        FileInputFormat.setInputPaths(newJob, new Path(outPath + "/part-*"));
        newJob.setInputFormatClass(TextInputFormat.class);

        newJob.setMapperClass(SortMapper.class);
        newJob.setMapOutputKeyClass(SortKey.class);
        newJob.setMapOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(newJob, new Path(outPath + "/sort"));
        newJob.setOutputFormatClass(TextOutputFormat.class);

        newJob.waitForCompletion(true);

和MultipleInputs的區別

MultipleInputs類可以設置多個輸入路徑,每個路徑使用指定的Mapper進行處理。
和ChainMapper也是執行多個map過程不一樣的是,ChainMapper是一個輸入文件經歷多個map過程,就像流水線一樣。
而MultipleInputs則只是設置多個輸入文件,每個文件一個map過程。

作者:@小黑


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM