自定義InputFormat案例實操


引言:

  無論HDFS還是MapReduce,在處理小文件時效率都非常低,但又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現小文件的合並。(對外是一個整文件,對內仍是原先的小文件,節省MapTask)

需求如下:

  將多個小文件合並成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式),SequenceFile里面存儲着多個文件,存儲的形式為文件路徑+名稱為key文件內容為value

 

(1)輸入數據

 

          

 

(2)期望輸出文件格式

  

 

步驟:

 

 

 

 

 

 

 

程序實現

(1)自定義InputFromat

package cn.mark.mrInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

//存儲的形式為文件路徑+名稱為key,文件內容為value。讀全部文件用到流,byte
//故 輸入Key類型為Text,輸入Value類型為BytesWritable
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {
// 定義類繼承FileInputFormat

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;//單個文件不允許再切片
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split,context);
        return recordReader;
    }
}

(2)自定義RecordReader類(核心)

package cn.mark.mrInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
//RecordReader<Text, BytesWritable> 固有的輸入KV格式
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

//    主要針對缺什么補什么
    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
//    標記位
    boolean isProgress = true;
//************************************
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//      初始化
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
//        核心業務邏輯處理   對key和value進行封裝
        if (isProgress){

            /** The number of bytes in the file to process. 獲取文件字節的全部數量
            public long getLength() { return fs.getLength(); }      */
            byte[] buf = new byte[(int) split.getLength()];
    //        1.獲取fs對象
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

    //        2.獲取輸入流
            FSDataInputStream fis = fs.open(path);

    //        3.拷貝
    //        readFully(InputStream in, byte buf[],    int off,        int len)
    //        4各參數:   1.要讀的流       2.目的地     3.讀的大小的起始位置 4.讀的長度
            /**
             * Reads len bytes in a loop.
             * @param in InputStream to read from
             * @param buf The buffer to fill
             * @param off offset from the buffer :緩沖區的偏移量,即開始位置
             * @param len the length of bytes to read
               先開辟一段相應長度的字節緩沖區,再讀內容進去                      */
            IOUtils.readFully(fis,buf,0,buf.length);

    //        4.封裝v  v是文件的內容
            /**Set the value to a copy of the given byte range
             * @param newData the new values to copy in
             * @param offset the offset in newData to start at
             * @param length the number of bytes to copy
            public void set(byte[] newData, int offset, int length)
             再將之前的緩沖區內容通過set方法,設置成v的值    */
            v.set(buf,0,buf.length);

    //        5.封裝K ,k本身就是路徑名  path.toString():既有路徑又有文件名稱
            k.set(path.toString());

    //        6.關閉資源
            IOUtils.closeStream(fis);

//            能進來說明能讀到數據,而且每次調用nextKeyValue函數時候是說明已經新讀一個文件,
//            本WholeRecordReader類會重新創建對象,重新初始化,isProgress都會重新設為true
            isProgress = false;//說明本文件已經讀完!
            return true;        //只有return true才會執行下面的函數
         /**   public void run(Context context) throws IOException, InterruptedException {
                setup(context);
                try {
                    while (context.nextKeyValue()) { //<往常只讀一行,有數據則true>
                        map(context.getCurrentKey(), context.getCurrentValue(), context);
                    }
                } finally {
                    cleanup(context);
                }
            }     下如果只寫ture 則會無限循環,如果只寫false則會不進循環,不進行讀寫操作,
                        故需要一個標記*/
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {

        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {

        return v;
    }
//***********************************
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {

    }
}

 

(3)編寫SequenceFileMapper類處理流程

package cn.mark.mrInputFormat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable,Text,BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
//        不是一次讀取一行,是一次讀取一整個文件
        context.write(key,value);
    }
}

(4)編寫SequenceFileReducer類處理流程

package cn.mark.mrInputFormat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
//  要知道傳過來的是什么數據及其類型
//  傳過來a.txt , b.txt 輸出 :  <路徑名文件名,文件內容>
public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

//        循環寫出 每次都是一個文件的全部內容
        for (BytesWritable value :
                values) {
            context.write(key,value);
        }

    }
}

(5)編寫SequenceFileDriver類處理流程

package cn.mark.mrInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class SequenceFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置
        args = new String[] { "C:\\Users\\Administrator\\Downloads\\input\\123",
                "C:\\Users\\Administrator\\Downloads\\input\\output" };

        // 1 獲取job對象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 設置jar包存儲位置、關聯自定義的mapper和reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 7設置輸入的inputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 8設置輸出的outputFormat  默認是Text.class
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 3 設置map輸出端的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 4 設置最終輸出端的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 5 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 提交job
        job.waitForCompletion(true);

    }
}

 

 ok!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM