一、自定義InputFormat
InputFormat是輸入流,在前面的例子中使用的是文件輸入輸出流FileInputFormat和FileOutputFormat,而FileInputFormat和FileOutputFormat它們默認使用的是繼承它們的子類TextInputFormat和TextOutputFormat,以Text的方式去讀取數據。
當我們遇到許多小文件,要將他們整理合成為一個文件SequenceFile(存儲了多個小文件),且文件內的存儲格式為:文件路徑+文件內容,這時我們可以通過封裝自定義的InputFormat輸入流來實現需求。
思路如下:
1.自定義FuncFileInputFormat類繼承FileInputFormat(參數類型為NullWritable和BytesWritable),並重寫isSplitable和createRecordReader方法;
2.isSplitable方法中return false即可表示不切割,createRecordReader方法中要返回一個RecordReader類,這是我們要自定義的對輸入文件的業務邏輯,所以創建FuncRecordReader類;
3.FuncRecordReader類繼承RecordReader類,參數類型同為NullWritable和BytesWritable,重寫initialize、nextKeyValue、getCurrentKey、getCurrentValue、getProcess、close方法;
4.Mapper:初始化setup方法,通過context拿到切片、獲取路徑、將路徑寫入定義的全局變量Text t,然后在map階段將t和value輸出到reducer;
5.Reducer:遍歷values,輸出key,value;
6.Driver:在設置完Mapper和Reducer類后,添加設置setInputFormatClass為FuncFileInputFormat、設置setOutputFormatClass為SequenceFileOutputFormat。
代碼如下:
/** * @author: PrincessHug * @date: 2019/3/29, 20:49 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FuncRecordReader recordReader = new FuncRecordReader(); return recordReader; } } public class FuncRecordReader extends RecordReader<NullWritable, BytesWritable> { boolean isProcess = false; FileSplit split; Configuration conf; BytesWritable value = new BytesWritable(); //初始化 @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //初始化切片文件 this.split = (FileSplit) inputSplit; //初始化配置信息 conf = taskAttemptContext.getConfiguration(); } //獲取下一個文件 @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!isProcess){ //根據切片的長度來創建緩沖區 byte[] buf = new byte[(int) split.getLength()]; FSDataInputStream fis = null; FileSystem fs = null; try { //獲取路徑 Path path = split.getPath(); //根據路徑獲取文件系統 fs = path.getFileSystem(conf); //拿到輸入流 fis = fs.open(path); //數據拷貝 IOUtils.readFully(fis,buf,0,buf.length); //拷貝緩存到最終的輸出 value.set(buf,0,buf.length); } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeStream(fis); IOUtils.closeStream(fs); } isProcess = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } } public class SequencceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable> { Text t = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //拿到切片信息 FileSplit split = (FileSplit) context.getInputSplit(); //路徑 Path path = split.getPath(); //即帶路徑有待名稱 t.set(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(t,value); } } public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,BytesWritable> { @Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { for (BytesWritable v:values){ context.write(key,v); } } } public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.獲取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2.獲取Jar包 job.setJarByClass(SequenceFileDriver.class); //3.獲取Mapper、Redcuer類 job.setMapperClass(SequencceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); //4.設置自定義讀取方法 job.setInputFormatClass(FuncFileInputFormat.class); //5.設置默認的輸出方式 job.setOutputFormatClass(SequenceFileOutputFormat.class); //6.獲取Mapper輸出數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); //7.獲取Reducer輸出數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); //8.設置輸入存在的路徑與處理后的結果路徑 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\inputformat\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\inputformat\\out")); //9.提交任務 if (job.waitForCompletion(true)){ System.out.println("運行完成!"); }else { System.out.println("運行失敗!"); } } }
二、自定義OutputFormat
需求:目前我們有一個網站ip的文件,每行都有一個網站的ip地址,要求我們將含有“www.baidu.com”的ip地址取出放入一個結果文件,其他的地址放入另一個結果文件。
思路:1.首先Mapper、Reduer就是簡單的讀取數據、寫出數據;
2.自定義FuncFileOutputFormat,重寫它的getRecordWriter方法,返回一個FIleRecordWriter對象,這里我們再定義一個FileRecordWriter,重寫FileRecordWriter、write、close方法;
3.Driver:再設置Reducer輸出后添加設置setOutputFormatClass為我們自定義的FuncFileOutputFormat即可;
代碼如下:
/** * @author: PrincessHug * @date: 2019/3/30, 14:44 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class FileMapper extends Mapper<LongWritable, Text, IntWritable, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(new IntWritable(1),new value); } } public class FileReducer extends Reducer<IntWritable, Text,Text,NullWritable> { @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text k:values){ String s = k.toString() + "\n"; context.write(new Text(s),NullWritable.get()); } } } public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new FileRecordWriter(taskAttemptContext); } } public class FileRecordWriter extends RecordWriter<Text, NullWritable> { Configuration conf = null; FSDataOutputStream baidulog = null; FSDataOutputStream otherlog = null; //定義數據輸出路徑 public FileRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException { //獲取配置信息和文件系統 conf = taskAttemptContext.getConfiguration(); FileSystem fs = FileSystem.get(conf); //定義輸出路徑 itstarlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\itstart\\baidu.logs")); otherlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\other\\other.logs")); } //數據輸出 @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { if (key.toString().contains("baidu")){ baidulog.write(key.getBytes()); }else { otherlog.write(key.getBytes()); } } //關閉資源 @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { if (itstarlog != null){ itstarlog.close(); } if (otherlog != null){ otherlog.close(); } } } public class FileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //配置、job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //jar包 job.setJarByClass(FileDriver.class); //Mapper、Reducer job.setMapperClass(FileMapper.class); job.setReducerClass(FileReducer.class); //Mapper輸出 job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); //Reudcer輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //自定義輸出類 job.setOutputFormatClass(FuncFileOutputFormat.class); //文件輸入輸出流 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\outputformat\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\outputformat\\out")); //提交任務 if (job.waitForCompletion(true)){ System.out.println("運行完成!"); }else { System.out.println("運行失敗!"); } } }