一:自定義OutputFormat類
MapReduce默認的OutPutFormat會將結果輸出文件放置到一個我們指定的目錄下,但如果想把輸出文件根據某個條件,把滿足不同條件的內容分別輸出到不同的目錄下,
就需要自定義實現OutputFormat類,且重寫RecordWriter方法。在驅動類中設置job.setOutputFormatClass方法為自定義實現的OutputFormat類
下面案例是一組購物文本數據,將其中的好評和差評分別輸出到對應的好評文件夾下、差評文件夾下。
二:自定義實現OutputFormat類代碼實現
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 自定義實現OutputFormat類 */ public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //從這個方法里面可以獲取一個configuration Configuration configuration = context.getConfiguration(); //獲取文件系統的對象 FileSystem fileSystem = FileSystem.get(configuration); //好評文件的輸出路徑 Path goodComment = new Path("file:///F:\\goodComment\\1.txt"); //差評文件的輸出路徑 Path badComment = new Path("file:///F:\\badComment\\1.txt"); //獲取到了兩個輸出流 FSDataOutputStream fsDataOutputStream = fileSystem.create(goodComment); FSDataOutputStream fsDataOutputStream1 = fileSystem.create(badComment); MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream, fsDataOutputStream1); return myRecordWriter; } }
三:自定義實現RecordWriter類
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class MyRecordWriter extends RecordWriter<Text,NullWritable> { private FSDataOutputStream goodStream; private FSDataOutputStream badSteam; public MyRecordWriter(){ } public MyRecordWriter(FSDataOutputStream goodStream,FSDataOutputStream badSteam){ this.goodStream = goodStream; this.badSteam= badSteam; } /** * 重寫write方法 * 這個write方法就是往外寫出去數據,我們可以根據這個key,來判斷文件究竟往哪個目錄下面寫 * goodStream:指定輸出文件 * badSteam:自定輸出文件 * @param key:k3 * @param value * @throws IOException * @throws InterruptedException */ @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String[] split = key.toString().split("\t"); //獲取評論狀態 0 好評 1 中評 2 差評 // split[9] //判斷評評論狀態,如果是小於等於1,都寫到好評文件里面去 if(Integer.parseInt(split[9])<=1){ //好評 goodStream.write(key.getBytes()); goodStream.write("\r\n".getBytes()); }else{ //差評 badSteam.write(key.getBytes()); badSteam.write("\r\n".getBytes()); } } /** * 關閉資源 * @param context:上下文對象 * @throws IOException * @throws InterruptedException */ @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(badSteam); IOUtils.closeStream(goodStream); } }
四:自定義Map類
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MyOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value,NullWritable.get()); } }
五:驅動程序
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyOutputMain extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf(), "ownOutputFormat"); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///F:\\input")); job.setMapperClass(MyOutputMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputFormatClass(MyOutputFormat.class); //由於重寫了FileOutputFormat,所以下面這個指定的目錄內不會有輸出文件 //輸出文件在MyOutputFormat中重新指定 MyOutputFormat.setOutputPath(job ,new Path("file:///F:\\output")); boolean b = job.waitForCompletion(true); return b?0:1; } public static void main(String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args); System.exit(run); } }