Requirement: the business currently has a batch of real data that needs to be loaded into the database, but it must first be cleaned: records whose field count does not meet the required length are separated out. Both the conforming records and the cleaned-out records must be saved.
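As a quick illustration of the validity rule used throughout the job (the real input format is not shown in the requirement, so the sample field values below are hypothetical), a record counts as "clean" only when it has exactly 6 tab-separated fields:

// Hypothetical sample records; the check mirrors the split("\t").length == 6 test used in the job
String clean = "1001\tzhangsan\t25\tbeijing\t13800000000\t2021-06-25"; // 6 fields -> kept as clean data
String dirty = "1002\tlisi\t2021-06-25";                               // 3 fields -> cleaned out
System.out.println(clean.split("\t").length == 6); // true
System.out.println(dirty.split("\t").length == 6); // false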
1. Without further ado, straight to the code.
package com.kkt.mr2.Dataclean;

/**
 * @author yang****
 * @date 2021.06.25
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
/**
 * Data cleaning: write records that match the rule and records that do not into
 * separate files, and count the number of non-conforming records.
 */
public class DataCleanOut extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), DataCleanOut.class.getSimpleName());
        job.setJarByClass(DataCleanOut.class);

        // Define the input format and the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        // Register our custom Mapper and its output types
        job.setMapperClass(DatacleanOutMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Register the custom Partitioner; two reduce tasks give us two output files
        job.setPartitionerClass(DataCleanPartition.class);
        job.setNumReduceTasks(2);

        // Data cleaning needs no reduce logic, so we only declare the final output types;
        // the default (identity) Reducer simply writes out whatever the Mapper emitted.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // To avoid the "output directory already exists" error on repeated runs,
        // delete the output path if it is already there
        FileSystem fs = FileSystem.get(super.getConf());
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        // Actually submit the job and wait for it to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    // Custom Mapper: every line is emitted unchanged; lines that do not have
    // exactly 6 tab-separated fields are counted via a custom counter
    public static class DatacleanOutMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private NullWritable nullWritable;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            nullWritable = NullWritable.get();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Count records whose field count is not 6
            Counter counter = context.getCounter("errorCount", "countNum");
            if (value.toString().split("\t").length != 6) {
                counter.increment(1L);
            }
            // Emit every record; the Partitioner decides which output file it goes to
            context.write(value, nullWritable);
        }
    }
    // Custom Partitioner: records with exactly 6 tab-separated fields go to
    // partition 0 (clean data), everything else goes to partition 1 (dirty data)
    public static class DataCleanPartition extends Partitioner<Text, NullWritable> {
        @Override
        public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
            String[] wordsSP = text.toString().split("\t");
            if (wordsSP.length == 6) {
                return 0;
            } else {
                return 1;
            }
        }
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new DataCleanOut(), args);
        System.exit(run);
    }
}
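To try it out, package the class into a jar and submit it in the usual way; the jar name and HDFS paths below are hypothetical:

hadoop jar dataclean.jar com.kkt.mr2.Dataclean.DataCleanOut /input/rawdata /output/cleaned

Because of the custom Partitioner and the two reduce tasks, part-r-00000 under the output directory holds the records with exactly 6 fields, and part-r-00001 holds the cleaned-out records. The number of non-conforming records appears in the job's counter summary under the group errorCount as countNum.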