Why MapReduce needs setMapOutputKeyClass and setMapOutputValueClass


A typical MapReduce word-count program looks like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // Split each input line on spaces and emit (word, 1) for every token.
        String[] words = value.toString().split(" ");
        for (int i = 0; i < words.length; i++) {
            ctx.write(new Text(words[i]), new LongWritable(1L));
        }
    }
}

 

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable count = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context ctx) throws IOException, InterruptedException {
        // Sum the per-word counts and emit the total for this key.
        Iterator<LongWritable> itr = values.iterator();
        long sum = 0L;
        while (itr.hasNext()) {
            sum = sum + itr.next().get();
        }
        count.set(sum);
        ctx.write(key, count);
    }
}

  

The driver code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobClient {

    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance();
        job.setJarByClass(JobClient.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Note: the map output key/value classes are not set here.
        job.setJobName("wordcount");
        FileInputFormat.addInputPath(job, new Path("/daxin/hadoop-mapreduce/words"));
        FileOutputFormat.setOutputPath(job, new Path("/daxin/hadoop-mapreduce/wordcount-result"));
        job.waitForCompletion(true);
    }
}

  

Submitting this job fails with:

Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
	at com.daxin.blog.WcMapper.map(WcMapper.java:20)
	at com.daxin.blog.WcMapper.map(WcMapper.java:13)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

The exception pinpoints the spot in the source: org.apache.hadoop.mapred.MapTask.MapOutputBuffer#collect. The relevant part of that method:

 public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      // ...
    }
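Stripped of the surrounding buffer machinery, the failing check is a plain runtime-class comparison. Below is a minimal, Hadoop-free sketch of that guard; Long and String are stand-ins for LongWritable and Text, since Hadoop is not on the classpath here:

```java
import java.io.IOException;

public class TypeCheckDemo {
    // Stand-in for the configured map output key class
    // (LongWritable in the broken job above).
    static final Class<?> keyClass = Long.class;

    // Mirrors the guard at the top of MapOutputBuffer.collect:
    // the runtime class of the emitted key must equal the configured one.
    static void collect(Object key) throws IOException {
        if (key.getClass() != keyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                    + keyClass.getName() + ", received " + key.getClass().getName());
        }
    }

    public static void main(String[] args) {
        try {
            // Emitting a String key (stand-in for Text) against the Long default.
            collect("hello");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note the comparison is by class identity, not assignability, so even a subclass of the configured type would be rejected.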

Here key.getClass() is clearly Text, so the question is what keyClass holds. Tracing it back, keyClass is assigned from:

 keyClass = (Class<K>)job.getMapOutputKeyClass();

The source of getMapOutputKeyClass:

  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  } 

MAP_OUTPUT_KEY_CLASS holds the map output key type. Since our driver never set it, the lookup returns the default, null, and getOutputKeyClass is called next:

  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

 

 public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
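Put together, the two getters form a simple fallback chain: the map-specific setting wins if present; otherwise the job-wide output key class is consulted, with LongWritable as the hard-coded default. A minimal sketch of that chain, with a plain Map standing in for Hadoop's Configuration and Long/String standing in for LongWritable/Text:

```java
import java.util.HashMap;
import java.util.Map;

public class FallbackDemo {
    // Mirrors Configuration.getClass(name, defaultValue): return the stored
    // class if the property is set, otherwise the supplied default.
    static Class<?> getClass(Map<String, Class<?>> conf, String name, Class<?> def) {
        return conf.getOrDefault(name, def);
    }

    // Mirrors getMapOutputKeyClass(): try the map-specific property first,
    // then fall back to the job-wide output key, whose default is
    // LongWritable (Long.class stands in for LongWritable here).
    static Class<?> getMapOutputKeyClass(Map<String, Class<?>> conf) {
        Class<?> retv = getClass(conf, "mapreduce.map.output.key.class", null);
        if (retv == null) {
            retv = getClass(conf, "mapreduce.job.output.key.class", Long.class);
        }
        return retv;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> conf = new HashMap<>();
        // Nothing configured: the LongWritable stand-in wins.
        System.out.println(getMapOutputKeyClass(conf));

        // Equivalent of job.setMapOutputKeyClass(Text.class):
        conf.put("mapreduce.map.output.key.class", String.class);
        System.out.println(getMapOutputKeyClass(conf));
    }
}
```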

  

getOutputKeyClass reads OUTPUT_KEY_CLASS, the key type of the job's final output. We never set that either, so it falls back to the hard-coded default, LongWritable. Our map task, however, actually emits Text keys, which is exactly the type mismatch reported above. The map output value has the same problem. The fix is to set the map output key and value types explicitly in the driver:

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

 

Finally, analysing org.apache.hadoop.mapred.ReduceTask#run shows the other side of the same coin: when the map output key/value types are not set explicitly, the reduce task also assumes the defaults, LongWritable for the key and Text for the value. The relevant lookup:

 Class keyClass = job.getMapOutputKeyClass();
 Class valueClass = job.getMapOutputValueClass();
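The value side follows the same fallback pattern as the key side. A sketch of the lookup the reduce side performs for the map output value class, again with a plain Map standing in for Configuration and String standing in for the real default, Text:

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceSideDemo {
    // Mirrors getMapOutputValueClass(): the map-specific value class first,
    // then the job-wide output value class, whose default is Text
    // (String.class stands in for Text here).
    static Class<?> getMapOutputValueClass(Map<String, Class<?>> conf) {
        Class<?> retv = conf.getOrDefault("mapreduce.map.output.value.class", null);
        if (retv == null) {
            retv = conf.getOrDefault("mapreduce.job.output.value.class", String.class);
        }
        return retv;
    }

    public static void main(String[] args) {
        // With nothing configured, the reduce side will try to read the
        // map output values as the default type.
        System.out.println(getMapOutputValueClass(new HashMap<>()));
    }
}
```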

 

This still leaves me with one question. Mapper and Reducer are generic classes, and the generic information a subclass binds is retained at runtime, so why does the framework make us set the map output key/value types explicitly?

Here is my own analysis; it may contain mistakes, and corrections are welcome:

Although the type arguments a subclass binds on a generic superclass survive at runtime and can be read back reflectively, what you get back is the parameterized type as a single whole, not the individual type arguments as ready-made pieces of information. That is a little vague, so take the Mapper as an example, defined as:

public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {

     //......
    }
}

  

Reading this class's generic superclass information yields only:

org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.LongWritable>

rather than an array of the four individual type arguments. My guess is that this is why MapReduce requires the output types to be set explicitly (the concrete types are needed here for serializing the intermediate output). Of course, one could parse the four type arguments back out of that information, and that would indeed work, but such an implementation is arguably not very elegant.
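That caveat is easy to probe: when a subclass binds the type parameters directly with concrete classes, getGenericSuperclass plus getActualTypeArguments does recover all four arguments. A sketch with local stand-in classes (no Hadoop dependency):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class GenericsDemo {
    // Stand-ins for Hadoop's types, so the sketch compiles without Hadoop.
    static class LongWritable {}
    static class Text {}
    static class Mapper<KIN, VIN, KOUT, VOUT> {}
    static class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {}

    // Recover the type arguments a subclass bound on its generic superclass.
    static Type[] typeArguments(Class<?> subclass) {
        ParameterizedType pt = (ParameterizedType) subclass.getGenericSuperclass();
        return pt.getActualTypeArguments();
    }

    public static void main(String[] args) {
        // Prints the four bound arguments in declaration order.
        for (Type t : typeArguments(WcMapper.class)) {
            System.out.println(t.getTypeName());
        }
        // The trick fails as soon as the subclass is itself generic or binds
        // the parameters indirectly, which is one reason a framework cannot
        // rely on it in general.
    }
}
```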

 

