Custom InputFormat and OutputFormat: A Worked Example


1. Custom InputFormat

  InputFormat defines how a job reads its input. The earlier examples used FileInputFormat and FileOutputFormat, which in practice default to their subclasses TextInputFormat and TextOutputFormat, reading and writing data line by line as Text.

  When we have many small files and want to merge them into a single SequenceFile (a container holding many small files), with each record stored as file path + file content, we can meet that requirement by writing a custom InputFormat.
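The record shape this produces, file path as key and whole file content as value, can be sketched without Hadoop at all. The class below is an illustration only (its name and the use of a plain `Map` are my own, not part of the actual job), but it shows exactly the pairs the SequenceFile will end up holding:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

public class SmallFileMerger {
    // Read every regular file in a directory into (path -> bytes) records,
    // mirroring the (file path, file content) pairs the SequenceFile will hold.
    public static Map<String, byte[]> collect(Path dir) throws IOException {
        Map<String, byte[]> records = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path p : files) {
                if (Files.isRegularFile(p)) {
                    records.put(p.toString(), Files.readAllBytes(p));
                }
            }
        }
        return records;
    }
}
```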

  The approach:

    1. Define a FuncFileInputFormat class extending FileInputFormat (with key/value types NullWritable and BytesWritable), and override the isSplitable and createRecordReader methods;

    2. Returning false from isSplitable means files are not split; createRecordReader must return a RecordReader, which carries our custom logic for reading the input file, so we create a FuncRecordReader class;

    3. FuncRecordReader extends RecordReader with the same type parameters NullWritable and BytesWritable, overriding the initialize, nextKeyValue, getCurrentKey, getCurrentValue, getProgress and close methods;

    4. Mapper: in setup, get the split from the context, extract its path, and store the path in a Text field t; then in map, write t and the value to the reducer;

    5. Reducer: iterate over the values and write out each (key, value) pair;

    6. Driver: after setting the Mapper and Reducer classes, call setInputFormatClass with FuncFileInputFormat and setOutputFormatClass with SequenceFileOutputFormat.

  The code:

/**
 * @author: PrincessHug
 * @date: 2019/3/29, 20:49
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FuncRecordReader recordReader = new FuncRecordReader();
        return recordReader;
    }
}

public class FuncRecordReader  extends RecordReader<NullWritable, BytesWritable> {
    boolean isProcess = false;
    FileSplit split;
    Configuration conf;
    BytesWritable value = new BytesWritable();

    // initialization
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // keep a reference to the split
        this.split = (FileSplit) inputSplit;

        // keep the job configuration
        conf = taskAttemptContext.getConfiguration();
    }

    // read the next key/value pair (here: the whole file, exactly once)
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!isProcess){
            // size the buffer from the split length
            byte[] buf = new byte[(int) split.getLength()];
            FSDataInputStream fis = null;

            try {
                // get the file path of this split
                Path path = split.getPath();
                // get the file system for that path
                FileSystem fs = path.getFileSystem(conf);
                // open the input stream
                fis = fs.open(path);
                // copy the file content into the buffer
                IOUtils.readFully(fis, buf, 0, buf.length);
                // hand the buffer to the output value
                value.set(buf, 0, buf.length);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // close only the stream; the FileSystem instance is cached and shared,
                // so it must not be closed here
                IOUtils.closeStream(fis);
            }
            isProcess = true;
            return true;
        }

        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // the whole file is read in one shot, so progress is all-or-nothing
        return isProcess ? 1 : 0;
    }

    @Override
    public void close() throws IOException {

    }
}
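Note how the isProcess flag makes nextKeyValue succeed exactly once per split, so each small file becomes a single record. That one-shot pattern can be sketched in plain Java, stripped of the Hadoop types (the class name below is illustrative only):

```java
public class OneShotReader {
    private final byte[] content;
    private boolean processed = false;

    public OneShotReader(byte[] content) {
        this.content = content;
    }

    // Returns true exactly once: the first call marks the single record as read,
    // every later call reports that no more data is available.
    public boolean nextKeyValue() {
        if (!processed) {
            processed = true;
            return true;
        }
        return false;
    }

    public byte[] getCurrentValue() {
        return content;
    }

    // progress is all-or-nothing, since the split holds exactly one record
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }
}
```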

public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    Text t = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the split for this task
        FileSplit split = (FileSplit) context.getInputSplit();
        // its path
        Path path = split.getPath();
        // the full path, including the file name
        t.set(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(t, value);
    }
}

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable v : values){
            context.write(key, v);
        }
    }
}

public class SequenceFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. set the jar
        job.setJarByClass(SequenceFileDriver.class);

        // 3. set the Mapper and Reducer classes
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 4. set the custom input format
        job.setInputFormatClass(FuncFileInputFormat.class);

        // 5. set the output format
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 6. set the Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 7. set the Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 8. set the input path and the result path
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\inputformat\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\inputformat\\out"));

        // 9. submit the job
        if (job.waitForCompletion(true)){
            System.out.println("Job succeeded!");
        } else {
            System.out.println("Job failed!");
        }
    }
}

 

  

2. Custom OutputFormat

  Requirement: we have a file of website IP addresses, one per line. Lines containing "www.baidu.com" must be written to one result file, and all other lines to another result file.
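The routing decision at the heart of this requirement can be sketched in plain Java before wiring it into Hadoop. The class and bucket names below are illustrative only; the point is the contains-based two-way split that the custom RecordWriter will later perform against two output files:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LineRouter {
    // Split lines into two buckets: those containing the target host and the rest,
    // mirroring the two files the custom RecordWriter will create.
    public static Map<String, List<String>> route(List<String> lines, String host) {
        Map<String, List<String>> buckets = new HashMap<>();
        buckets.put("match", new ArrayList<>());
        buckets.put("other", new ArrayList<>());
        for (String line : lines) {
            buckets.get(line.contains(host) ? "match" : "other").add(line);
        }
        return buckets;
    }
}
```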

  The approach:

    1. The Mapper and Reducer simply read the data and write it back out;

    2. Define a FuncFileOutputFormat and override its getRecordWriter method to return a FileRecordWriter object; we then define the FileRecordWriter class itself, implementing its constructor and overriding the write and close methods;

    3. Driver: after setting the Reducer output, call setOutputFormatClass with our custom FuncFileOutputFormat.

  The code:

/**
 * @author: PrincessHug
 * @date: 2019/3/30, 14:44
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class FileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // a constant key sends every line into the same reduce call
        context.write(new IntWritable(1), value);
    }
}

public class FileReducer extends Reducer<IntWritable, Text, Text, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text v : values){
            // append a newline, since the custom RecordWriter writes raw bytes
            String s = v.toString() + "\n";
            context.write(new Text(s), NullWritable.get());
        }
    }
}

public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new FileRecordWriter(taskAttemptContext);
    }
}

public class FileRecordWriter extends RecordWriter<Text, NullWritable> {
    Configuration conf = null;
    FSDataOutputStream baidulog = null;
    FSDataOutputStream otherlog = null;

    // open the two output streams
    public FileRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        // get the configuration and the file system
        conf = taskAttemptContext.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // define the two output paths
        baidulog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\baidu\\baidu.logs"));
        otherlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\other\\other.logs"));
    }

    // route each record to one of the two streams
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Text.getBytes() returns the backing array, which may be longer than
        // the logical content, so write only getLength() bytes
        if (key.toString().contains("www.baidu.com")){
            baidulog.write(key.getBytes(), 0, key.getLength());
        } else {
            otherlog.write(key.getBytes(), 0, key.getLength());
        }
    }

    // close the streams
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (baidulog != null){
            baidulog.close();
        }
        if (otherlog != null){
            otherlog.close();
        }
    }
}


public class FileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // configuration and job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // jar
        job.setJarByClass(FileDriver.class);

        // Mapper and Reducer
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        // Mapper output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // custom output format
        job.setOutputFormatClass(FuncFileOutputFormat.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\outputformat\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\outputformat\\out"));

        // submit the job
        if (job.waitForCompletion(true)){
            System.out.println("Job succeeded!");
        } else {
            System.out.println("Job failed!");
        }
    }
}

  

 

