Requirement: the business has a batch of real data that needs to be loaded into a database, but it must be cleaned first, filtering out the records whose field count does not match the expected length. Both the records that pass the check and the records that are cleaned out must be saved.
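To make the rule concrete (the field values below are invented for illustration; the original post does not show the real schema), a conforming record has exactly six tab-separated fields, e.g. 1\tzhangsan\t23\tbeijing\t13800000000\t2021-06-25, while a record with a missing field, e.g. 2\tlisi\t30\tshanghai\t2021-06-25, fails the length check and goes to the cleaned-out output. Both outputs are kept, as required.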
1. Without further ado, straight to the code.
package com.kkt.mr2.Dataclean;
/**
 * @author yang****
 * @date 2021.06.25
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
/**
 * Data cleaning: write records that match the rule and records that do not into separate files,
 * and count the number of records that fail the rule.
 */
public class DataCleanOut extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), DataCleanOut.class.getSimpleName());
        job.setJarByClass(DataCleanOut.class);
        // Define the input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // Register our custom mapper
        job.setMapperClass(DatacleanOutMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Register the custom partitioner
        job.setPartitionerClass(DataCleanPartition.class);
        job.setNumReduceTasks(2);
        // Data cleaning needs no reduce logic, so we only define the output types;
        // the default (identity) reducer simply writes out the map output unchanged.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // To avoid an error when re-running the job, delete the output directory if it already exists
        FileSystem fs = FileSystem.get(super.getConf());
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        // Actually run the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    // Custom mapper: passes every line through and counts lines that do not have exactly 6 tab-separated fields
    public static class DatacleanOutMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private NullWritable nullWritable;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            nullWritable = NullWritable.get();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter("errorCount", "countNum");
            if (value.toString().split("\t").length != 6) {
                counter.increment(1L);
            }
            context.write(value, nullWritable);
        }
    }
    // Custom partitioner: records with exactly 6 fields go to partition 0, everything else to partition 1
    public static class DataCleanPartition extends Partitioner<Text, NullWritable> {
        @Override
        public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
            String[] wordsSP = text.toString().split("\t");
            if (wordsSP.length == 6) {
                return 0;
            } else {
                return 1;
            }
        }
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new DataCleanOut(), args);
        System.exit(run);
    }
}
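With two reduce tasks and the partitioner above, records with exactly six fields land in part-r-00000 under the output directory and all other records in part-r-00001, and the errorCount:countNum counter is reported together with the other job counters when the job finishes (it can also be read programmatically after job.waitForCompletion(true) via job.getCounters().findCounter("errorCount", "countNum").getValue(), which is standard MapReduce API but not part of the code above). For a quick local sanity check of the field-count rule without a cluster, here is a minimal standalone sketch; the sample lines are hypothetical and only their field counts matter. It applies the same test as DataCleanPartition:

public class PartitionRuleCheck {
    // Mirrors the rule in DataCleanPartition: exactly 6 tab-separated fields -> partition 0, otherwise partition 1
    static int partitionFor(String line) {
        return line.split("\t").length == 6 ? 0 : 1;
    }

    public static void main(String[] args) {
        String valid   = "1\tzhangsan\t23\tbeijing\t13800000000\t2021-06-25"; // 6 fields, kept
        String invalid = "2\tlisi\t30\tshanghai\t2021-06-25";                 // only 5 fields, cleaned out
        System.out.println("valid   -> partition " + partitionFor(valid));    // prints 0
        System.out.println("invalid -> partition " + partitionFor(invalid));  // prints 1
    }
}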