1. Input Formats
(1) Input splits and records
① The JobClient generates the input splits (InputSplit) according to the input format specified for the job's input files;
② A split is not the data itself, but a reference to the data to be processed;
③ The InputFormat is responsible for generating the splits;
Source location: org.apache.hadoop.mapreduce.lib.input package (new API)
org.apache.hadoop.mapred.lib package (old API)
See the getSplits() method of the FileInputFormat class;
the computeSplitSize() function determines the split size (a sketch of this rule follows the diagram note below);
Class hierarchy diagram of the input classes:
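In current Hadoop versions the rule in computeSplitSize() boils down to max(minSize, min(maxSize, blockSize)). The class below is only a minimal sketch of that formula; the values and the property names in the comments are illustrative (Hadoop 2.x names):
public class SplitSizeDemo {
    // Same rule as FileInputFormat.computeSplitSize(): clamp the block size between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assume a 128 MB HDFS block
        long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
        // With the default min/max, the split size equals the block size.
        System.out.println("split size = " + computeSplitSize(blockSize, minSize, maxSize));
    }
}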
(2) File input
Abstract class: FileInputFormat
① FileInputFormat is the base class for all InputFormat implementations that use files as their data source;
② For FileInputFormat, the split size is by default determined by the HDFS block size;
Abstract class: CombineFileInputFormat
① CombineFileInputFormat can be used to pack many small files into larger splits;
② Because CombineFileInputFormat is an abstract class, you must create a concrete subclass and implement its record-reader factory method (createRecordReader() in the new API, getRecordReader() in the old one);
③ Ways to avoid splitting a file:
A. Make the block size large enough that every file is smaller than a block, so the file never needs to be split;
B. Subclass FileInputFormat and override the isSplitable() method (see the sketch after this list);
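A minimal sketch of approach B, assuming the new (org.apache.hadoop.mapreduce) API: a TextInputFormat subclass whose files are never split, so each file is read in full by a single mapper.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Returning false means each input file becomes exactly one split.
        return false;
    }
}
It is then selected in the driver with job.setInputFormatClass(NonSplittableTextInputFormat.class).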
(3) Text input
Class: TextInputFormat
① TextInputFormat is the default InputFormat; every line of input is one record;
② The key is a LongWritable holding the byte offset of the line within the file, and the value is the content of the line as a Text object;
③ Input splits vs. HDFS blocks: because each TextInputFormat record is a line, a line may well be stored across two data blocks;
Class: KeyValueTextInputFormat
Each line is split into a key and a value at a separator (a tab by default); the separator can be set with the key.value.separator.in.input.line property (mapreduce.input.keyvaluelinerecordreader.key.value.separator in the new API);
Class: NLineInputFormat
The number of lines each mapper receives can be set with the mapred.line.input.format.linespermap property (mapreduce.input.lineinputformat.linespermap in the new API); a configuration sketch for both classes follows;
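A minimal driver-side sketch of configuring the two classes above (property and method names are from the new API; the separator character and line count are arbitrary examples):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class TextInputConfigDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // KeyValueTextInputFormat: split each line into key and value at the first ':' instead of a tab.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");
        Job kvJob = new Job(conf, "kv text input");
        kvJob.setInputFormatClass(KeyValueTextInputFormat.class);

        // NLineInputFormat: give each mapper 100 lines of input.
        Job nLineJob = new Job(conf, "n-line input");
        nLineJob.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(nLineJob, 100);
    }
}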
(4) Binary input
Classes: SequenceFileInputFormat
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat
Because SequenceFiles are splittable, they make a convenient MapReduce input format: the splits already consist of <key, value> pairs;
(5) Input from multiple files
Class: MultipleInputs
① MultipleInputs allows several inputs, each with its own input format (and, if needed, its own mapper);
② The input paths are added with the addInputPath() method;
(6) Database input
Class: DBInputFormat
① DBInputFormat is an input format that reads data from a relational database over JDBC (a configuration sketch follows this list);
② Use it carefully: every map task opens its own connection, so it is easy to end up with too many database connections;
③ TableInputFormat in HBase lets MapReduce programs read the data stored in HBase tables;
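A minimal sketch of using DBInputFormat with the new API. The JDBC driver, connection URL, credentials, and the "words" table with columns id and word are all placeholders for illustration:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputDemo {
    // A record class mapped to one row of the hypothetical "words" table (columns: id, word).
    public static class WordRecord implements Writable, DBWritable {
        int id;
        String word;
        public void readFields(ResultSet rs) throws SQLException { id = rs.getInt(1); word = rs.getString(2); }
        public void write(PreparedStatement st) throws SQLException { st.setInt(1, id); st.setString(2, word); }
        public void readFields(DataInput in) throws IOException { id = in.readInt(); word = in.readUTF(); }
        public void write(DataOutput out) throws IOException { out.writeInt(id); out.writeUTF(word); }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Driver class, URL, user name and password are placeholders.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test", "user", "passwd");
        Job job = new Job(conf, "db input");
        job.setInputFormatClass(DBInputFormat.class);
        // Read columns id and word from table "words", ordered by id, with no WHERE conditions.
        DBInputFormat.setInput(job, WordRecord.class, "words", null, "id", "id", "word");
    }
}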
Experiments:
Create a new project TestMRInputFormat with a package com.mr, and import the required dependency JARs.
Experiment ①: a SequenceFile is used as input, so first run SequenceFileWriter.java to produce a b.seq file (a possible sketch of that class is shown below);
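The source of SequenceFileWriter.java is not reproduced here; the following is only a guess at what it might look like. It writes <IntWritable, Text> pairs to /b.seq, matching the key/value types that the SequenceFile mappers below expect:
package com.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/b.seq");
        String[] data = {"aaa bbb", "ccc aaa", "ddd eee"}; // sample records; the contents are arbitrary
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
            for (int i = 0; i < data.length; i++) {
                writer.append(new IntWritable(i), new Text(data[i]));
            }
        } finally {
            if (writer != null) writer.close();
        }
    }
}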
Create a new class, TestInputFormat1.java (modified from WordCount.java):
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestInputFormat1 {
public static class TokenizerMapper
extends Mapper< IntWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(IntWritable key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(TestInputFormat1.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(SequenceFileInputFormat.class); // set the input format
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Run it in Eclipse; the run arguments are configured as shown in the figure below:
The word-count output is as follows:
Experiment ②: input from multiple sources.
TestInputFormat2.java:
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestInputFormat2 {
public static class Mapper1 // first mapper: reads the plain-text input
extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class Mapper2 extends // second mapper: reads the SequenceFile input
Mapper<IntWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(IntWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(TestInputFormat2.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path path1 = new Path("/a.txt");
Path path2 = new Path("/b.seq");
// multiple inputs: each path gets its own InputFormat and Mapper
MultipleInputs.addInputPath(job, path1,TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, path2,SequenceFileInputFormat.class, Mapper2.class);
FileOutputFormat.setOutputPath(job, new Path("/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Create the input text file a.txt:
aaa bbb
ccc aaa
ddd eee
Package the project as a runnable JAR (for some reason it will not run from inside Eclipse; the cause has not been found yet, but it runs fine from the command line):
File -> Export -> Runnable JAR file; name the JAR file testMR.jar.
Run it from the command line:
$ hadoop jar testMR.jar com.mr.TestInputFormat2
The output of the word count is as follows:
2. Output Formats
Class hierarchy diagram of the output classes:
(1) Text output
Class: TextOutputFormat
① TextOutputFormat is the default OutputFormat; the default output key type is LongWritable and the default value type is Text, although any types can be written because both are converted to strings;
② Each record is written as a line of the form "key \t value" (a sketch of changing the separator follows below);
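A minimal driver-side sketch of changing the key/value separator (the property name is the Hadoop 2.x one; older versions use mapred.textoutputformat.separator):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextOutputDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.textoutputformat.separator", ","); // write "key,value" lines
        Job job = new Job(conf, "text output");
        job.setOutputFormatClass(TextOutputFormat.class); // this is already the default
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }
}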
(2) Binary output
Classes: SequenceFileOutputFormat
SequenceFileAsTextOutputFormat
SequenceFileAsBinaryOutputFormat
MapFileOutputFormat
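A minimal driver-side sketch of writing the job output as a block-compressed SequenceFile (new API; the output path is arbitrary):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileOutputDemo {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "sequence file output");
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Compress the output SequenceFile block by block.
        FileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        FileOutputFormat.setOutputPath(job, new Path("/output-seq"));
    }
}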
(3) Output to multiple files
Classes: MultipleOutputFormat
MultipleOutputs
Difference: MultipleOutputs can produce outputs of different types (a reducer sketch follows below);
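A minimal sketch of MultipleOutputs in the new API: a reducer that writes its results to a named output called "stats" (the name is arbitrary); the named output must also be declared in the driver, as shown in the closing comment.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleOutputsReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // Write the result to the named output "stats" instead of the default job output.
        mos.write("stats", key, new IntWritable(sum));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

    // In the driver, declare the named output, for example:
    // MultipleOutputs.addNamedOutput(job, "stats", TextOutputFormat.class, Text.class, IntWritable.class);
}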
(4) Database output
Class: DBOutputFormat
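A minimal driver-side sketch of DBOutputFormat (driver class, URL, credentials, table and column names are all placeholders); note that the job's output key class must implement DBWritable, and the output value is ignored:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class DBOutputDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test", "user", "passwd");
        Job job = new Job(conf, "db output");
        job.setOutputFormatClass(DBOutputFormat.class);
        // Write the reduce output keys into the "word_counts" table, columns word and count.
        DBOutputFormat.setOutput(job, "word_counts", "word", "count");
    }
}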