Hadoop Case Study (6): Small File Handling (Custom InputFormat)


Small File Handling (Custom InputFormat)

1. Requirements Analysis

Large numbers of small files hurt efficiency in both HDFS and MapReduce, yet in practice we often cannot avoid processing many small files, so a solution is needed. The idea is to merge the small files into a single SequenceFile: the SequenceFile holds many files, storing each file's path plus name as the key and the file's contents as the value.
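Below is a minimal sketch (not part of the original article; class name, package, and argument handling are illustrative) of producing this target layout directly with Hadoop's SequenceFile.Writer API: every small file under an input directory becomes one record whose key is the file path and whose value is the raw file bytes. The MapReduce-based version of the same idea is implemented in the sections that follow.

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SmallFileMerger {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path inputDir = new Path(args[0]);   // directory holding the small files
        Path outputFile = new Path(args[1]); // merged SequenceFile to create

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(outputFile),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (FileStatus status : fs.listStatus(inputDir)) {
                if (!status.isFile()) {
                    continue;
                }
                // Read the whole small file into memory
                byte[] contents = new byte[(int) status.getLen()];
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    IOUtils.readFully(in, contents, 0, contents.length);
                }
                // key = file path, value = whole file contents
                writer.append(new Text(status.getPath().toString()),
                        new BytesWritable(contents));
            }
        }
    }
}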

2. Data Preparation

one.txt

yongpeng weidong weinan
sanfeng luozong xiaoming

two.txt

longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin

three.txt

shuaige changmo zhenqiang 
dongli lingu xuanxuan

Expected final output: a single SequenceFile in which each record's key is a small file's path (plus name) and its value is that file's contents.

3. Optimization Approaches

Optimizing for small files generally comes down to the following approaches:

(1) Merge small files or small batches of data into larger files at data-collection time, before uploading them to HDFS.

(2) Before business processing, run a MapReduce job on HDFS to merge the small files (this is the approach implemented below).

(3) During MapReduce processing, use CombineTextInputFormat to improve efficiency (see the sketch after this list).
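A minimal sketch (assumed, not from this article) of option (3): a text-based job configured with CombineTextInputFormat packs many small files into a few combined splits instead of one split per file. The 4 MB threshold is an example value only.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineSplitConfig {

    // Apply CombineTextInputFormat to an existing text-based job
    public static void configure(Job job) {
        // Pack many small files into combined splits instead of one split per file
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Upper bound on the size of a combined split, in bytes (4 MB here)
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
    }
}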

4. Implementation Outline

This section uses a custom InputFormat to handle the small input files:

(1) Define a class that extends FileInputFormat.

(2) Implement a RecordReader that reads one whole file at a time and wraps it as a single key-value pair.

(3) Use SequenceFileOutputFormat on the output side to write the merged file.

5. Code

(1) Custom InputFormat

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
    
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // 1. Create our custom RecordReader
        WholeRecordReader recordReader = new WholeRecordReader();
        
        // 2. Initialize the RecordReader
        recordReader.initialize(split, context);
        
        return recordReader;
    }
}

(2) Custom RecordReader

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit split;
    private Configuration configuration;

    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Keep references to the split and the job configuration
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        
        if (!processed) {
            // 1. Buffer sized to hold the entire file
            byte[] contents = new byte[(int) split.getLength()];

            // 2. Get the file system for the split's path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 3. Read the file contents
            FSDataInputStream fis = null;
            try {
                // 3.1 Open an input stream for the file
                fis = fs.open(path);

                // 3.2 Read the whole file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 3.3 Copy the buffer into the output value
                value.set(contents, 0, contents.length);
            } finally {
                // Let IOExceptions propagate instead of silently swallowing them
                IOUtils.closeStream(fis);
            }
            
            processed = true;
            
            return true;
        }
        
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {

    }
}

(3) Driver (InputFormatDriver)

package com.xyg.mapreduce.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class InputFormatDriver {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text k = new Text();

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {

            // Get the input split for this record
            InputSplit split = context.getInputSplit();
            // Get the path of the file backing this split
            Path path = ((FileSplit) split).getPath();
            // Use the file path as the key
            k.set(path.toString());

            // Write: key = file path, value = whole file contents
            context.write(k, value);
        }
    }

    public static void main(String[] args) throws Exception {
        // Local test paths; remove this line to take the input/output paths from the command line
        args = new String[] { "e:/inputinputformat", "e:/output1" };

        Configuration conf = new Configuration();
        
        Job job = Job.getInstance(conf);
        job.setJarByClass(InputFormatDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
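A minimal sketch (assumed, not from the original article) for checking the result: it iterates over the SequenceFile written by the job and prints each key (the original file path) along with the size of its value (that file's contents). For a map-only job the output file is normally named part-m-00000 inside the output directory; hadoop fs -text can also dump a SequenceFile's records as text.

package com.xyg.mapreduce.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileVerifier {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Path of the merged SequenceFile, e.g. the job's output part-m-00000
        Path seqFile = new Path(args[0]);

        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(seqFile))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Each record: key = original file path, value = that file's bytes
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}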

 

