Small File Handling (Custom InputFormat)
1. Requirements Analysis
Small files hurt efficiency in both HDFS and MapReduce, yet in practice it is often unavoidable to process large numbers of them, so a solution is needed. The approach here is to merge the small files into a single SequenceFile: the SequenceFile stores one record per small file, with the file path plus name as the key and the file content as the value.
2. Data Preparation
one.txt
yongpeng weidong weinan
sanfeng luozong xiaoming
two.txt
longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin
three.txt
shuaige changmo zhenqiang
dongli lingu xuanxuan
Expected format of the final output file:
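The original screenshot of the expected result is not reproduced here. As a rough illustration only (the exact key rendering depends on the file system URI), the merged SequenceFile holds one record per small file:

key (file path + name)              value (file content)
.../inputinputformat/one.txt        yongpeng weidong weinan ...
.../inputinputformat/two.txt        longlong fanfan ...
.../inputinputformat/three.txt      shuaige changmo zhenqiang ...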
3. Optimization Analysis
Small-file optimization generally comes down to the following approaches:
(1) At data collection time, merge small files or small batches of data into larger files before uploading them to HDFS.
(2) Before business processing, run a MapReduce job on HDFS to merge the small files.
(3) During MapReduce processing, use CombineTextInputFormat to improve efficiency (see the sketch after this list).
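For option (3), the following is a minimal driver-side sketch, not part of this section's implementation; the 4 MB split size and the CombineSmallFilesConfig class name are illustrative assumptions.

// Sketch only: configure an existing Job so that many small text files are packed
// into fewer, larger input splits instead of one split per file.
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineSmallFilesConfig {
    public static void configure(Job job) {
        // Use CombineTextInputFormat instead of the default TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Upper bound on the combined split size (4 MB here, purely illustrative)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
    }
}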
4. Implementation Approach
This section solves the small-file input problem with a custom InputFormat:
(1) Define a class that extends FileInputFormat.
(2) Write a custom RecordReader that reads one complete file per call and wraps it as a key-value pair.
(3) On output, use SequenceFileOutputFormat to write the merged file.
5. Code Implementation
(1) Custom InputFormat
package com.xyg.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Each small file must be read as a whole, so never split a file
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // 1. Create our custom RecordReader
        WholeRecordReader recordReader = new WholeRecordReader();
        // 2. Initialize the RecordReader
        recordReader.initialize(split, context);
        return recordReader;
    }
}
(2) Custom RecordReader
package com.xyg.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit split;
    private Configuration configuration;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Keep the split and configuration passed in by the framework
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // 1. Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            // 2. Get the file system for this split's path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 3. Read the whole file into the buffer
            FSDataInputStream fis = null;
            try {
                // 3.1 Open the input stream
                fis = fs.open(path);
                // 3.2 Read the file content
                IOUtils.readFully(fis, contents, 0, contents.length);
                // 3.3 Wrap the content as the current value
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(fis);
            }

            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
    }
}
(3) Driver: InputFormatDriver
package com.xyg.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class InputFormatDriver {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text k = new Text();

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Get the split of the current file
            InputSplit split = context.getInputSplit();
            // Get the path of the split
            Path path = ((FileSplit) split).getPath();
            // Use the file path (including the file name) as the output key
            k.set(path.toString());
            context.write(k, value);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hard-coded input/output paths for local testing
        args = new String[] { "e:/inputinputformat", "e:/output1" };

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InputFormatDriver.class);
        job.setMapperClass(SequenceFileMapper.class);

        // Map-only job: no reducers are needed
        job.setNumReduceTasks(0);

        // Read each small file as a whole and write the result as a SequenceFile
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
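To check the merged result, a small sketch like the following can read the SequenceFile back. The SequenceFileVerifier class name is hypothetical, and the e:/output1/part-m-00000 path assumes the local test paths and the map-only output of the driver above; adjust it to your environment.

package com.xyg.mapreduce.inputformat;

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileVerifier {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed output file of the map-only job above
        Path path = new Path("e:/output1/part-m-00000");

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Each record: key = original file path, value = that file's raw bytes
            while (reader.next(key, value)) {
                System.out.println("==== " + key + " ====");
                System.out.println(new String(value.copyBytes(), StandardCharsets.UTF_8));
            }
        } finally {
            reader.close();
        }
    }
}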