Data Cleaning - Performing Data Cleaning with Hadoop


Requirement: the business has a batch of real-world data that needs to be loaded into a database, but it must be cleaned first, filtering out records whose field count does not match the expected format. Both the records that pass the check and the records that are cleaned out must be kept.
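The code below splits each record on tab characters and treats it as valid only when it has exactly 6 fields. The actual schema is not given in the source, so the sample records below are purely hypothetical (fields separated by tabs):

    1001	zhangsan	28	male	guangzhou	2021-06-25    <- 6 fields: valid
    1002	lisi	31	shenzhen	2021-06-25            <- 5 fields: cleaned out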

1. Without further ado, straight to the code:

package com.kkt.mr2.Dataclean;

/**
 * @author yang****
 * @date 2021.06.25
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Data cleaning: write records that match the format rule and records that do not
 * into separate files, and count the number of non-matching records
 */
public class DataCleanOut extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), DataCleanOut.class.getSimpleName());
        job.setJarByClass(DataCleanOut.class);
        // Define the input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // Register our custom mapper
        job.setMapperClass(DatacleanOutMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Register the custom partitioner, with one reduce task per partition
        job.setPartitionerClass(DataCleanPartition.class);
        job.setNumReduceTasks(2);
        // Data cleaning needs no reduce logic, so we only declare the output types;
        // the default (identity) reducer writes the map output through unchanged
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        // To avoid the "output directory already exists" error on reruns,
        // delete the output directory if it is already there
        FileSystem fs = FileSystem.get(super.getConf());
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        // Actually start the job
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }
    // Custom mapper
    public static class DatacleanOutMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private NullWritable nullWritable;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            nullWritable = NullWritable.get();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Count malformed records (wrong number of tab-separated fields) in a custom counter
            Counter counter = context.getCounter("errorCount", "countNum");
            if (value.toString().split("\t").length != 6) {
                counter.increment(1L);
            }
            // Every record is emitted; the partitioner decides which output file it ends up in
            context.write(value, nullWritable);
        }

    }
    // Custom partitioner: valid records go to partition 0, invalid ones to partition 1
    public static class DataCleanPartition extends Partitioner<Text, NullWritable> {

        @Override
        public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
            String[] wordsSP = text.toString().split("\t");
            if (wordsSP.length == 6) {
                return 0;
            } else {
                return 1;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new DataCleanOut(), args);
        System.exit(run);
    }

}
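To run the job, package the class into a jar and submit it with the input and output paths as arguments; the jar name and paths below are placeholders, not from the source:

    hadoop jar data-clean.jar com.kkt.mr2.Dataclean.DataCleanOut /input/data /output/clean

Because the job uses 2 reduce tasks, the output directory will contain part-r-00000 (records with exactly 6 fields) and part-r-00001 (rejected records), and the errorCount/countNum counter shows up in the job's counter summary.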

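One optional extension, not part of the original code: after waitForCompletion returns, the driver can read the custom counter back instead of relying only on the console summary. A minimal sketch using the same group and counter names as the mapper above:

    // Hypothetical addition at the end of run(), after job.waitForCompletion(true)
    long malformed = job.getCounters()
            .findCounter("errorCount", "countNum")
            .getValue();
    System.out.println("Malformed records: " + malformed);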
