MapReduce設置輸出文件到多個文件夾下


一:自定義OutputFormat類
MapReduce默認的OutputFormat會將結果輸出文件放置到一個我們指定的目錄下,但如果想把輸出文件根據某個條件,把滿足不同條件的內容分別輸出到不同的目錄下,

就需要自定義實現OutputFormat類,且重寫getRecordWriter方法,返回自定義的RecordWriter。在驅動類中通過job.setOutputFormatClass方法設置為自定義實現的OutputFormat類

下面案例是一組購物文本數據,將其中的好評和差評分別輸出到對應的好評文件夾下、差評文件夾下。

二:自定義實現OutputFormat類代碼實現

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 自定義實現OutputFormat類
*/
/**
 * Custom OutputFormat that routes records into two separate output files
 * (good comments vs. bad comments) instead of the single output directory
 * used by the default FileOutputFormat.
 *
 * The target file paths may be overridden via the Configuration keys
 * {@code GOOD_PATH_KEY} / {@code BAD_PATH_KEY}; when unset they fall back
 * to the original hard-coded locations, so existing jobs keep working.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /** Optional Configuration key overriding the good-comment output file. */
    public static final String GOOD_PATH_KEY = "myoutputformat.goodcomment.path";
    /** Optional Configuration key overriding the bad-comment output file. */
    public static final String BAD_PATH_KEY = "myoutputformat.badcomment.path";

    /**
     * Opens one output stream per category and hands both to the custom
     * RecordWriter, which takes ownership and closes them in close().
     *
     * @param context task context supplying the job Configuration
     * @return a RecordWriter that splits records between the two files
     * @throws IOException if either output file cannot be created
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The task attempt context carries the job Configuration.
        Configuration configuration = context.getConfiguration();

        // Output path for good comments (configurable; original default kept).
        Path goodComment = new Path(configuration.get(GOOD_PATH_KEY, "file:///F:\\goodComment\\1.txt"));
        // Output path for bad comments (configurable; original default kept).
        Path badComment = new Path(configuration.get(BAD_PATH_KEY, "file:///F:\\badComment\\1.txt"));

        // Resolve the FileSystem from each path rather than FileSystem.get(conf):
        // the latter returns the *default* filesystem (e.g. HDFS), which fails
        // when the target paths live on a different scheme such as file://.
        FSDataOutputStream goodOut = goodComment.getFileSystem(configuration).create(goodComment);
        FSDataOutputStream badOut = badComment.getFileSystem(configuration).create(badComment);

        return new MyRecordWriter(goodOut, badOut);
    }
}

三:自定義實現RecordWriter類

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * RecordWriter that splits incoming records between a "good comment"
 * stream and a "bad comment" stream, based on the status field of each
 * tab-separated input line.
 */
public class MyRecordWriter extends RecordWriter<Text, NullWritable> {

    /** Line terminator appended after every record (was "\r\n".getBytes()). */
    private static final byte[] LINE_SEPARATOR = {'\r', '\n'};
    /** Index of the comment-status field in a tab-separated record. */
    private static final int STATUS_FIELD = 9;

    private FSDataOutputStream goodStream;
    private FSDataOutputStream badStream;

    public MyRecordWriter() {
    }

    /**
     * @param goodStream destination for good comments; owned by this writer
     * @param badStream  destination for bad comments; owned by this writer
     */
    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    /**
     * Writes one record to either the good or the bad stream.
     * Status values: 0 = good, 1 = neutral, 2 = bad; status &lt;= 1 is
     * routed to the good stream, everything else to the bad stream.
     *
     * NOTE(review): assumes every line has at least 10 tab-separated fields
     * with a numeric status in field 9 — malformed lines will throw an
     * unchecked exception, same as the original code.
     *
     * @param key   the full record line (k3)
     * @param value unused (NullWritable)
     * @throws IOException if the underlying stream write fails
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String[] fields = key.toString().split("\t");
        // Choose the destination stream from the status field.
        FSDataOutputStream target =
                Integer.parseInt(fields[STATUS_FIELD]) <= 1 ? goodStream : badStream;
        // BUG FIX: Text.getBytes() exposes the internal backing buffer, which
        // may be longer than the valid data and contain stale bytes from a
        // previous (longer) record. Only the first getLength() bytes are valid.
        target.write(key.getBytes(), 0, key.getLength());
        target.write(LINE_SEPARATOR);
    }

    /**
     * Releases both output streams.
     *
     * @param context task context (unused)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(badStream);
        IOUtils.closeStream(goodStream);
    }
}

四:自定義Map類

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper: emits every input line unchanged as the output key
 * with a NullWritable value, leaving all routing decisions to the custom
 * OutputFormat configured in the driver.
 */
public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Forward the raw line; the byte offset key is intentionally dropped.
        context.write(value, NullWritable.get());
    }
}

五:驅動程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver: wires the pass-through mapper to the custom OutputFormat.
 *
 * Usage: optionally pass the input directory as args[0] and the
 * (bookkeeping) output directory as args[1]; when omitted, the original
 * hard-coded local paths are used.
 */
public class MyOutputMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "ownOutputFormat");
        // BUG FIX: without setJarByClass, Hadoop cannot locate the job jar
        // when this driver is submitted to a real cluster.
        job.setJarByClass(MyOutputMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        // Input path: first CLI argument if supplied, original default otherwise.
        TextInputFormat.addInputPath(job,
                new Path(args.length > 0 ? args[0] : "file:///F:\\input"));

        job.setMapperClass(MyOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        // The custom OutputFormat writes the real data files itself; this
        // directory only receives job bookkeeping output (e.g. _SUCCESS).
        MyOutputFormat.setOutputPath(job,
                new Path(args.length > 1 ? args[1] : "file:///F:\\output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
        System.exit(exitCode);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM