Hadoop MR Custom Input Formats


Input Formats

1. Input splits and records
2. File input
3. Text input
4. Binary input
5. Multiple-file input
6. Database input

Detailed introduction: https://blog.csdn.net/py_123456/article/details/79766573

1. Input Splits and Records

1. The JobClient generates input splits (InputSplit) from the job's input files according to the specified input format.
2. A split is not the data itself but a reference to a slice of the data.
3. The InputFormat interface is responsible for generating the splits.

InputFormat handles the input side of a MapReduce job and has three responsibilities (a signature sketch follows this list):
Validate that the job's input is well-formed.
Split the input files into InputSplits.
Provide a RecordReader implementation that reads each InputSplit into the Mapper for processing.
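
The splitting and reading duties map directly onto the two abstract methods of the new-API InputFormat class; a simplified signature sketch for orientation only, not the full Hadoop source:

// Simplified shape of org.apache.hadoop.mapreduce.InputFormat
public abstract class InputFormat<K, V> {

    // Cut the job's input into logical InputSplits; one map task is launched per split
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Create the RecordReader that turns a single InputSplit into (key, value) records
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;
}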

2. File Input

Abstract class: FileInputFormat
1. FileInputFormat is the base class for every InputFormat implementation that uses files as its data source.
2. By default, the split size used by FileInputFormat is determined by the HDFS block size.

FileInputFormat keeps track of all the files that make up the job's input and implements the computation of splits over those files. How records are read from a split is left to subclasses such as TextInputFormat.
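
A minimal sketch of tuning the split size relative to the block size; the path and job name are placeholders, and by default the effective split size is max(minSplitSize, min(maxSplitSize, blockSize)):

Job job = Job.getInstance(new Configuration(), "split-size-demo");
FileInputFormat.addInputPath(job, new Path("/data/in"));        // placeholder input directory
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // lower bound: 64 MB
FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // upper bound: 128 MB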

3. Text Input

TextInputFormat: the default input format; the key is the byte offset of each line and the value is the line's content (<LongWritable, Text>).

KeyValueTextInputFormat

job.getConfiguration().setStrings(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ":"); // the default separator is '\t'; with a custom separator, the text before it becomes the key and the text after it becomes the value

If a line does not contain the separator, the whole line becomes the key and the value is empty (<Text, Text>).
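
A minimal wiring sketch for KeyValueTextInputFormat; the job name and input path are placeholders:

Job job = Job.getInstance(new Configuration(), "kv-input-demo");
job.getConfiguration().set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ":"); // split each line on ':'
job.setInputFormatClass(KeyValueTextInputFormat.class);  // map input key and value are both Text
FileInputFormat.addInputPath(job, new Path("/data/kv")); // placeholder input path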

NLineInputFormat

NLineInputFormat.setNumLinesPerSplit(job, 3); // each split (and therefore each mapper) receives 3 lines; the key/value types are the same as for TextInputFormat (<LongWritable, Text>)
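
A minimal wiring sketch for NLineInputFormat; the job name and input path are again placeholders:

Job job = Job.getInstance(new Configuration(), "nline-demo");
job.setInputFormatClass(NLineInputFormat.class);            // key/value: <LongWritable, Text>
NLineInputFormat.setNumLinesPerSplit(job, 3);               // every mapper gets exactly 3 input lines
FileInputFormat.addInputPath(job, new Path("/data/lines")); // placeholder input path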

4. Binary Input

SequenceFileInputFormat reads keys and values stored in SequenceFile format; the map input types match whatever types the file was written with (for example <Text, IntWritable>).

A typical workflow is to first produce the binary file with a binary output format (SequenceFileOutputFormat) in one job and then use that file as the input of a later job.
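
A minimal two-job sketch of that workflow; the paths, job variables, and the <Text, IntWritable> types are placeholders for whatever the writing job actually produces:

// Job A: write its output as a SequenceFile
jobA.setOutputFormatClass(SequenceFileOutputFormat.class);
jobA.setOutputKeyClass(Text.class);
jobA.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(jobA, new Path("/data/seq"));

// Job B: read the SequenceFile back; map input types must match what job A wrote
jobB.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(jobB, new Path("/data/seq"));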

5. Multiple-File Input

Having every file in a MapReduce job handled by a single mapper class cannot accommodate inputs with different formats. MultipleInputs lets you assign different files to different mapper classes, as long as every mapper emits the same output types for the reducer.

For example:

  MultipleInputs.addInputPath(job, oneInputPath, TextInputFormat.class, OneMapper.class);

  MultipleInputs.addInputPath(job, twoInputPath, TextInputFormat.class, TwoMapper.class);

(addInputPath() registers a single path; to add several paths, call it once per path.)

2. Alternatively, the addInputPaths() method sets multiple paths in one call, passed as a single comma-separated string:

String paths = strings[0] + "," + strings[1];

FileInputFormat.addInputPaths(job, paths);

6. Database Input: DBInputFormat

DBInputFormat reads data from a relational database via JDBC; DBOutputFormat writes data out to a database. Both are appropriate only for loading small data sets.

(With DBInputFormat, the map input types are <LongWritable, an implementation of DBWritable>.)
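
A minimal wiring sketch for DBInputFormat; the driver, connection URL, credentials, table, columns, and the MyRecord class (a hypothetical class implementing Writable and DBWritable) are all placeholders:

DBConfiguration.configureDB(job.getConfiguration(),
        "com.mysql.jdbc.Driver",              // JDBC driver class (placeholder)
        "jdbc:mysql://localhost:3306/testdb", // connection URL (placeholder)
        "user", "password");                  // credentials (placeholders)
DBInputFormat.setInput(job, MyRecord.class,
        "my_table", null, "id",               // table, WHERE conditions, ORDER BY column
        "id", "name");                        // columns to read
job.setInputFormatClass(DBInputFormat.class); // map input: <LongWritable, MyRecord>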

Key points for writing a custom input format:

1. Define a MyRecordReader class extending the abstract class RecordReader.

Every input format needs a RecordReader, which turns the data in a file into concrete key/value pairs; the default RecordReader used by TextInputFormat is LineRecordReader.

2. Define a custom InputFormat extending FileInputFormat and override its createRecordReader() method to return the custom MyRecordReader.

3. Register it on the job: job.setInputFormatClass(YourCustomInputFormat.class).

Code (the custom RecordReader below uses the 1-based line number as the key instead of the byte offset; a partitioner then separates odd and even lines so the reducer can sum each group):

package com.neworigin.RecordReaderDemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader {
    static Path in = new Path("file:///F:/安裝/java工程/RecordReaderDemo/data/in/test.txt");
    static Path out = new Path("file:///F:/安裝/java工程/RecordReaderDemo/data/out");

    // Custom RecordReader: emits <1-based line number, line text> instead of <byte offset, line text>
    public static class DefReadcordReader extends RecordReader<LongWritable, Text> {
        private long start; // byte offset where this split starts
        private long end;   // byte offset where this split ends
        private long pos;   // current 1-based line number, used as the key

        private FSDataInputStream fin = null;
        private LongWritable key = null;
        private Text value = null;

        private LineReader reader = null;
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit filesplit = (FileSplit) split;
            // Byte range covered by this split
            start = filesplit.getStart();
            end = start + filesplit.getLength();
            // Open the file backing the split
            Path path = filesplit.getPath();
            Configuration conf = context.getConfiguration();
            FileSystem fs = path.getFileSystem(conf);
            fin = fs.open(path);
            reader = new LineReader(fin);
            pos = 1; // line numbers are 1-based
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = new Text();
            }
            // readLine() fills 'value' with the next line and returns the number of bytes read (0 at EOF)
            if (reader.readLine(value) == 0) {
                return false;
            }
            key.set(pos); // the key is the current line number, not the byte offset
            pos++;
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (end == start) {
                return 0.0f;
            }
            // Approximate progress by how far the underlying stream has advanced within the split
            return Math.min(1.0f, (fin.getPos() - start) / (float) (end - start));
        }

        @Override
        public void close() throws IOException {
            if (fin != null) {
                fin.close();
            }
        }
    }

    // Custom input format: hands each split to the custom RecordReader
    public static class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Return the custom RecordReader defined above
            return new DefReadcordReader();
        }

        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // Keep each file in a single split so line numbers stay global within the file
            return false;
        }
    }

    // Identity mapper: passes <line number, line text> straight through
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Partition by odd/even line number; the key is rewritten to 0 (odd lines) or 1 (even lines)
    // so the reducer can tell the two groups apart
    public static class MyPartition extends Partitioner<LongWritable, Text> {

        @Override
        public int getPartition(LongWritable key, Text value, int numPartitions) {
            if (key.get() % 2 == 0) {
                key.set(1); // even-numbered line -> partition 1
                return 1;
            } else {
                key.set(0); // odd-numbered line -> partition 0
                return 0;
            }
        }
    }

    public static class MyReducer extends Reducer<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values,
                Reducer<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            Text write_key = new Text();
            IntWritable write_value = new IntWritable();
            int sum = 0;
            // Each input line is expected to contain a single integer
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            if (key.get() == 0) {
                write_key.set("sum of odd-numbered lines");
            } else {
                write_key.set("sum of even-numbered lines");
            }
            write_value.set(sum);
            context.write(write_key, write_value);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true); // remove a previous output directory recursively
        }
        Job job = Job.getInstance(conf, "MyRecordReader");
        job.setJarByClass(MyRecordReader.class); // needed when running from a packaged jar
        FileInputFormat.addInputPath(job, in);

        job.setInputFormatClass(MyFileInputFormat.class);
        //job.setInputFormatClass(KeyValueTextInputFormat.class);
        //conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(MyPartition.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2); // one reduce task per partition (odd / even)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

