MapReduce Types and Formats (Input and Output)


I. Input Formats

(1) Input splits and records

① The job client (JobClient) generates the input splits (InputSplit) according to the specified format of the input files;

② a split is not the data itself, but a reference to the data to be processed;

③ the InputFormat interface is responsible for generating the splits;

Source location: org.apache.hadoop.mapreduce.lib.input package (new API)

          org.apache.hadoop.mapred package (old API)

See the getSplits() method of the FileInputFormat class there;

the computeSplitSize() function determines the split size.
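Its logic amounts to clamping the block size between the configured minimum and maximum split sizes. Simplified from the Hadoop source (new-API FileInputFormat):

  // a split is one block, clamped to [minSize, maxSize], where
  // minSize/maxSize come from the job configuration
  protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }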

Hierarchy of the various input classes:

[Figure: InputFormat class hierarchy]

(2) File input

Abstract class: FileInputFormat

① FileInputFormat is the base class for all InputFormat implementations that use files as their data source;

② the size of the splits FileInputFormat assigns is governed by the HDFS block size;

 

 

Abstract class: CombineFileInputFormat

① CombineFileInputFormat can be used to combine small files into larger splits;

② because CombineFileInputFormat is an abstract class, you must create a concrete subclass and implement its getRecordReader() method (createRecordReader() in the new API);

③ ways to prevent a file from being split:

A. make the block size sufficiently large that every file is smaller than one block, so no splitting is needed;

B. subclass FileInputFormat and override its isSplitable() method, as in the sketch below.
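A minimal sketch of approach B, assuming the new API; the subclass name here is made up for illustration:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  // A non-splitting text input format: each file becomes exactly one split,
  // and is therefore processed by a single mapper
  public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      return false; // never split, regardless of file size
    }
  }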

 

(3) Text input

Class: TextInputFormat

① TextInputFormat is the default InputFormat; every line of data is one record;

② its key is a LongWritable holding the byte offset of the line within the whole file, and its value is the content of the line, of type Text;

③ input splits versus HDFS blocks: because each TextInputFormat record is one line, a line may well straddle two blocks; the record reader handles this by reading past the block boundary (possibly over the network) to complete the line;

 

Class: KeyValueTextInputFormat

Each line is split into a key and a value at a separator character (tab by default), which can be changed through the key.value.separator.in.input.line property; see the snippet below.
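A minimal driver fragment, assuming a Hadoop release that ships the new-API KeyValueTextInputFormat (org.apache.hadoop.mapreduce.lib.input); Hadoop 2.x renames the property to mapreduce.input.keyvaluelinerecordreader.key.value.separator:

  Configuration conf = new Configuration();
  conf.set("key.value.separator.in.input.line", ","); // split each line at the first comma
  Job job = new Job(conf, "kv input");
  job.setInputFormatClass(KeyValueTextInputFormat.class);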

 

Class: NLineInputFormat

Lets you set how many lines each mapper processes, through the mapred.line.input.format.linespermap property; for example:
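A fragment for the job driver (Hadoop 2.x renames the property to mapreduce.input.lineinputformat.linespermap):

  conf.setInt("mapred.line.input.format.linespermap", 10); // 10 lines per mapper
  job.setInputFormatClass(NLineInputFormat.class);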

 

 

(4) Binary input

Classes: SequenceFileInputFormat

SequenceFileAsTextInputFormat

SequenceFileAsBinaryInputFormat

Because SequenceFiles are splittable, they can serve as an input format for MapReduce, conveniently yielding splits that already contain <key, value> pairs (see Experiment ① below);

 

 

(5) Multiple inputs

Class: MultipleInputs

① MultipleInputs allows a job to take several inputs, each with its own input data type;

② paths are added with the addInputPath() method (demonstrated in Experiment ② below);

 

 

(6) Database input

Class: DBInputFormat

① DBInputFormat is an input format that uses JDBC to read data from a relational database;

② keep the number of concurrent map tasks small, since each opens its own database connection and too many connections can overwhelm the database;

③ TableInputFormat in HBase lets MapReduce programs read data from HBase tables. A DBInputFormat sketch follows.
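A minimal DBInputFormat sketch for the job driver. The driver class, connection URL, table and column names are all illustrative placeholders, and MyRecord stands for a class of your own that implements DBWritable (and Writable) to map one table row:

  // imports: org.apache.hadoop.mapreduce.lib.db.{DBConfiguration, DBInputFormat}
  DBConfiguration.configureDB(job.getConfiguration(),
      "com.mysql.jdbc.Driver",               // JDBC driver class
      "jdbc:mysql://localhost:3306/testdb",  // connection URL
      "user", "passwd");                     // credentials
  job.setInputFormatClass(DBInputFormat.class);
  DBInputFormat.setInput(job, MyRecord.class,
      "employees",    // table name
      null,           // WHERE conditions (none)
      "id",           // ORDER BY column
      "id", "name");  // columns to read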

 

Experiments:

Create a project TestMRInputFormat with a package com.mr, and add the required dependency jars.

 

Experiment ①: take a SequenceFile as input, so first run SequenceFileWriter.java to produce a b.seq file.
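The listing of SequenceFileWriter.java is not included here; a plausible minimal version, writing <IntWritable, Text> records to match the mapper below, could look like this:

  package com.mr;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;

  // Writes a few <IntWritable, Text> records into /b.seq on HDFS
  public class SequenceFileWriter {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      String[] lines = {"aaa bbb", "ccc aaa", "ddd eee"}; // sample data
      SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, conf, new Path("/b.seq"), IntWritable.class, Text.class);
      try {
        for (int i = 0; i < lines.length; i++) {
          writer.append(new IntWritable(i), new Text(lines[i]));
        }
      } finally {
        writer.close();
      }
    }
  }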

Create the class TestInputFormat1.java (adapted from WordCount.java):

package com.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestInputFormat1 {

  // The SequenceFile records are <IntWritable, Text>, so the mapper's input
  // key type is IntWritable rather than WordCount's LongWritable
  public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(IntWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(TestInputFormat1.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(SequenceFileInputFormat.class); // set the input format
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Run it in Eclipse with the program arguments configured as shown:

[Figure: Eclipse run configuration]

The resulting word counts are:

[Figure: word-count output]

 

 

 

Experiment ②: input from multiple sources.

TestInputFormat2.java:

package com.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestInputFormat2 {

  // First mapper: handles the plain-text input, so its input key is LongWritable
  public static class Mapper1
       extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Second mapper: handles the SequenceFile input, whose keys are IntWritable
  public static class Mapper2
       extends Mapper<IntWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(IntWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(TestInputFormat2.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path path1 = new Path("/a.txt");
    Path path2 = new Path("/b.seq");
    // multiple inputs: each path gets its own input format and mapper
    MultipleInputs.addInputPath(job, path1, TextInputFormat.class, Mapper1.class);
    MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class, Mapper2.class);
    FileOutputFormat.setOutputPath(job, new Path("/output2"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Create the input text file a.txt:

aaa bbb

ccc aaa

ddd eee

 

Export the project as a jar (for some reason it will not run inside Eclipse; I have not found the cause yet, but it runs fine via the jar command):

File -> Export -> Runnable JAR file, naming the jar testMR.jar.

Run it from the command line:

$ hadoop jar testMR.jar com.mr.TestInputFormat2

[Figure: console output of the job]

The resulting word counts are:

[Figure: word-count output]

 

 

II. Output Formats

Hierarchy of the various output classes:

[Figure: OutputFormat class hierarchy]

 

 

 

 

(1) Text output

Class: TextOutputFormat

① the default output format; by default the key type is LongWritable and the value type is Text (any types can be set, since TextOutputFormat converts them with toString());

② each record is written as one line of the form "key \t value"; the tab separator is configurable, as shown below.
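For instance, to use a comma instead of the tab (old-API property name; Hadoop 2.x calls it mapreduce.output.textoutputformat.separator):

  conf.set("mapred.textoutputformat.separator", ","); // emit "key,value" lines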

 

(2) Binary output

Classes: SequenceFileOutputFormat

SequenceFileAsTextOutputFormat

SequenceFileAsBinaryOutputFormat

MapFileOutputFormat

 

(3) Multiple-file output

Classes: MultipleOutputFormat

      MultipleOutputs

 

Difference: MultipleOutputs can produce outputs of different types (a different output format and key/value types per named output); a sketch follows.
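A minimal MultipleOutputs sketch, assuming the new API (org.apache.hadoop.mapreduce.lib.output.MultipleOutputs); the named output "seq" and the types are illustrative:

  // driver: declare a named output with its own format and key/value types
  MultipleOutputs.addNamedOutput(job, "seq",
      SequenceFileOutputFormat.class, Text.class, IntWritable.class);

  // reducer: route records to the named output
  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;

    protected void setup(Context context) {
      mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) sum += v.get();
      mos.write("seq", key, new IntWritable(sum)); // written as a SequenceFile
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
      mos.close(); // flush and close the extra outputs
    }
  }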

 

(4) Database output

 

Class: DBOutputFormat
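A minimal sketch for the job driver (table and column names are placeholders); the reducer's output key class must implement DBWritable, and each reduce output becomes one row:

  // imports: org.apache.hadoop.mapreduce.lib.db.{DBConfiguration, DBOutputFormat}
  DBConfiguration.configureDB(job.getConfiguration(),
      "com.mysql.jdbc.Driver",
      "jdbc:mysql://localhost:3306/testdb", "user", "passwd");
  job.setOutputFormatClass(DBOutputFormat.class);
  DBOutputFormat.setOutput(job, "wordcount", "word", "count"); // table, then its columns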

Source: http://blog.sina.com.cn/s/blog_4438ac090101qfuh.html

