一、數據樣例
- 文件一:one.txt
pangtouyu yinain yutang chaojidan
pikaqiu Dalen study let me happy
- 文件二:two.txt
longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin
- 文件三:three.txt
shuaige changmo zhenqiang
dongli lingu xuanxuan
二、需求
- 無論hdfs還是mapreduce,對於小文件都有損效率,實踐中,又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案。將多個小文件合並成一個文件SequenceFile,SequenceFile里面存儲着多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value。
三、分析
- 小文件的優化無非以下幾種方式:
(1)在數據采集的時候,就將小文件或小批數據合成大文件再上傳HDFS
(2)在業務處理之前,在HDFS上使用mapreduce程序對小文件進行合並
(3)在mapreduce處理時,可采用CombineTextInputFormat提高效率
- 本節采用自定義InputFormat的方式,處理輸入小文件的問題。
(1)自定義一個類繼承FileInputFormat
(2)改寫RecordReader,實現一次讀取一個完整文件封裝為KV
(3)在輸出時使用SequenceFileOutputFormat輸出合並文件
四、代碼實現
- 1、創建 WholeRecordReader 類:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Record reader that delivers the whole content of one {@link FileSplit} as a
 * single record: the key is always {@link NullWritable} and the value holds
 * the bytes read from the split's file.
 *
 * <p>Exactly one record is produced per reader; the second call to
 * {@link #nextKeyValue()} returns {@code false}.
 */
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable>{
    private Configuration configuration;
    private FileSplit split;
    private boolean processed = false;
    private BytesWritable value = new BytesWritable();

    /**
     * Remembers the split to read and the job configuration used to resolve
     * its file system.
     *
     * @param split   the split to read; must be a {@link FileSplit}
     * @param context task context supplying the configuration
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    /**
     * Reads the split's file into the value buffer on the first call.
     *
     * @return {@code true} on the first call, once the content has been read;
     *         {@code false} on every later call
     * @throws IOException if the split does not fit in a single byte array or
     *                     the file cannot be opened or fully read
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (processed) {
            return false;
        }

        // The content is buffered in one byte[]; refuse sizes the int cast would silently mangle.
        long length = split.getLength();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("Split too large to buffer as a single record: "
                    + split.getPath() + " (" + length + " bytes)");
        }
        byte[] contents = new byte[(int) length];

        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(configuration);
        FSDataInputStream fis = null;
        try {
            fis = fs.open(path);
            IOUtils.readFully(fis, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            // Read failures are no longer swallowed: they propagate to the caller
            // instead of producing an empty or partial record.
            IOUtils.closeStream(fis);
        }

        processed = true;
        return true;
    }

    /** @return the {@link NullWritable} singleton; this reader carries no key data */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /** @return the buffer filled by {@link #nextKeyValue()} */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /** @return {@code 1} once the single record has been read, otherwise {@code 0} */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    /** Nothing to release here: the input stream is closed inside {@link #nextKeyValue()}. */
    @Override
    public void close() throws IOException {
    }
}
- 2、創建 WholeFileInputformat 類:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * Input format built on {@link FileInputFormat} that never splits a file and
 * hands each split to a {@link WholeRecordReader}.
 */
public class WholeFileInputformat extends FileInputFormat<NullWritable, BytesWritable>{

    /**
     * Declares every input file non-splittable.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Creates a {@link WholeRecordReader} for the split and initializes it
     * before returning it.
     *
     * @param split   the split the reader will consume
     * @param context the task attempt context passed on to the reader
     * @return the initialized reader
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        RecordReader<NullWritable, BytesWritable> wholeFileReader = new WholeRecordReader();
        wholeFileReader.initialize(split, context);
        return wholeFileReader;
    }
}
- 3、創建 SequenceFileMapper 類:
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Mapper that pairs each incoming value with the path of the file split
 * currently being processed, emitting (path, value).
 */
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{

    /** Output key, set once per task in {@link #setup} to the split's path string. */
    Text k = new Text();

    /**
     * Derives the output key from the task's input split.
     *
     * @param context task context; its input split is cast to {@link FileSplit}
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit currentSplit = (FileSplit) context.getInputSplit();
        String pathText = currentSplit.getPath().toString();
        k.set(pathText);
    }

    /**
     * Writes the value unchanged under the key prepared in {@link #setup}.
     *
     * @param key     ignored
     * @param value   the record value to forward
     * @param context sink for the output pair
     */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}
- 4、創建 SequenceFileReducer 類:
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that forwards, for each key, the first value of its group to the
 * job output.
 */
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    /**
     * Emits the key together with the first element of {@code values}; any
     * further elements are not read.
     *
     * @param key     the group key
     * @param values  the values grouped under {@code key}
     * @param context sink for the output pair
     */
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        BytesWritable firstValue = values.iterator().next();
        context.write(key, firstValue);
    }
}
- 5、創建 SequenceFileDriver 類:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
/**
 * Job driver: wires {@link WholeFileInputformat}, {@link SequenceFileMapper},
 * {@link SequenceFileReducer} and {@link SequenceFileOutputFormat} into one
 * job and runs it.
 */
public class SequenceFileDriver {

    /** Input path used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT = "e:/input/inputinputformat";

    /** Output path used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT = "e:/output1";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} the
     *             output path; when fewer than two arguments are supplied the
     *             built-in default paths are used
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Previously the arguments were always overwritten; now the hard-coded
        // paths are only a fallback so the driver can be pointed at other data.
        if (args == null || args.length < 2) {
            args = new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // Input format for the job
        job.setInputFormatClass(WholeFileInputformat.class);
        // Output format for the job
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Report a failed job through the process exit status instead of
        // discarding the result; a successful run returns normally as before.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
- 結果圖: