I. WordCount Exercise
1. Requirement: use Hadoop to count how many times each word appears in a file
1. The content of the file to be analyzed is shown in the figure; the words are separated by spaces
2. The expected result is shown in the figure
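The original screenshots are not reproduced here. As a rough illustration only (hypothetical data, not the actual file), the input and expected output look roughly like this:

input:
hello java hadoop java
java spark hello

expected output (word and total count, tab-separated):
hadoop	1
hello	2
java	3
spark	1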
2. Code Implementation
1. Handling data skew
During the Reduce phase, every occurrence of the same key is handled by a single ReduceTask. In this data set the word java appears about 160,000 times while the other words appear only a handful of times, so the ReduceTasks that receive the rare words finish almost immediately while the one stuck with java takes far longer. That is a severe case of data skew. The fix used here is to create several partitions and scatter the records randomly across them, so that each partition handles only a fraction of the java records; a second job then adds the per-partition counts together to produce the final totals. A minimal sketch of the partitioner idea follows; the complete implementation is in the code listing after it.
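The heart of the trick is a Partitioner that ignores the key and picks a partition at random. A minimal sketch of just that piece is shown below (class and field names are illustrative, and the imports are the same as in the full listing that follows):

// Sketch only: scatter identical keys across all reduce partitions at random.
// Grouping by key is deliberately broken, so each reducer produces only a
// partial count, and a second job must add the partial counts together.
public static class RandomPartitioner extends Partitioner<Text, IntWritable> {
    private final Random random = new Random();

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // numPartitions is whatever the driver passed to job.setNumReduceTasks(...)
        return random.nextInt(numPartitions);
    }
}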
2. Writing the code
package com.wxw.superwc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Random;

/*
 * To process data with Hadoop you extend the Mapper and Reducer classes.
 * The Mapper splits the input and emits key/value pairs to the Reducer.
 *
 * Mapper<LongWritable, Text, Text, IntWritable>:
 *   1st type: LongWritable, the byte offset of the line being read (the input key)
 *   2nd type: the content of that line (the input value)
 *   3rd type: the output key type
 *   4th type: the output value type
 * map(LongWritable key, Text value, Context context) is called once per input line;
 * context is used to emit output to the Reducer.
 *
 * Reducer<Text, IntWritable, Text, IntWritable>:
 *   <key type received from the Mapper, value type received from the Mapper,
 *    key type written to the output file, value type written to the output file>
 * reduce(Text key, Iterable<IntWritable> values, Context context) receives, for each
 * key, the collection of all values emitted with that key.
 *
 * The partitioner extends Partitioner and overrides
 * getPartition(Text text, IntWritable intWritable, int i): the first two parameters
 * are the key and value types received from the Mapper, and i is the number of
 * partitions configured in the driver.
 */
public class SuperWc {

    // Mapper: splits each line into words
    public static class SupWcMa extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line on spaces
            String[] s = value.toString().split(" ");
            // emit each word with a count of 1
            for (String name : s) {
                context.write(new Text(name), new IntWritable(1));
            }
        }
    }

    // Reducer: sums the counts of each word
    public static class SpuWcRe extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0; // counter
            // iterate over all values of the same key and add them up
            for (IntWritable writable : values) {
                sum += writable.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Partitioner: runs after the Mapper and before the Reducer
    public static class fenqu extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text text, IntWritable intWritable, int i) {
            Random random = new Random();
            // the driver sets 5 reduce tasks; scatter the map output randomly
            // across the partitions to reduce data skew
            return random.nextInt(i);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setMapperClass(SupWcMa.class);   // Mapper class
        job.setReducerClass(SpuWcRe.class);  // Reducer class
        job.setJarByClass(SuperWc.class);    // class used to locate the jar when running on Linux, usually the main class

        // partitioning
        job.setPartitionerClass(fenqu.class); // load the partitioner written above
        job.setNumReduceTasks(5);             // number of partitions; this value is passed to getPartition

        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // input path (preferably an absolute path)
        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\superWc\\input"));
        FileSystem fs = FileSystem.get(configuration);
        // output path; delete it first if it already exists
        Path outPath = new Path("E:\\wordcountdemo\\superWc\\output");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // submit the job
        job.submit();
    }
}
Each partition produces its own output file, as shown below.
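(The screenshot is omitted.) With five reduce tasks the output directory contains five files, named part-r-00000 through part-r-00004 by default. Because the partitioner is random, each file holds only partial counts: a frequent word such as java appears in all five files, and its five partial counts add up to the true total, which is why a second merging job is needed.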
Write a second job to merge the per-partition output files.
package com.wxw.superwc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SuperWC2 {

    public static class SupWcMa extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each line of the first job's output is "word<TAB>count"
            String[] s = value.toString().split("\t");
            // re-emit the word with its partial count
            context.write(new Text(s[0]), new IntWritable(Integer.valueOf(s[1])));
        }
    }

    public static class SpuWcRe extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // add up the partial counts of the same word
            int sum = 0;
            for (IntWritable writable : values) {
                sum += writable.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setMapperClass(SupWcMa.class);
        job.setReducerClass(SpuWcRe.class);
        job.setJarByClass(SuperWC2.class);

        // input/output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // read all part files produced by the first job
        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\superWc\\output\\part*"));
        FileSystem fs = FileSystem.get(configuration);
        // output path; delete it first if it already exists
        Path outPath = new Path("E:\\wordcountdemo\\superWc\\output2");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.submit();
    }
}
The final result:
II. Common Friends Problem
1. First pass: for each person, work out who has that person in their friend list (for example, everyone who has A as a friend).
package com.wxw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class friend {

    // Mapper: invert "user -> friends" into (friend, user) pairs
    public static class FDMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get() != 0) { // skip the line at byte offset 0
                String fridens[] = value.toString().split(":");
                String user = fridens[0];
                String friendss[] = fridens[1].split(",");
                System.out.println(111); // debug output
                for (String f : friendss) {
                    context.write(new Text(f), new Text(user));
                    System.out.println(f); // debug output
                }
            }
        }
    }

    // Reducer: for each person, concatenate all users who have them as a friend
    public static class FDReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer stringBuffer = new StringBuffer();
            for (Text item : values) {
                stringBuffer.append(item).append(",");
            }
            context.write(key, new Text(stringBuffer.toString()));
            System.out.println(stringBuffer); // debug output
        }
    }

    // driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(friend.class);
        job.setMapperClass(FDMapper.class);
        job.setReducerClass(FDReduce.class);

        // map and reduce output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\friend\\input"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\friend\\output");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.submit();
    }
}
The result looks like this:
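(The screenshot is omitted.) Assuming input lines of the classic form A:B,C,D,F,E,O, i.e. a person followed by their friend list (a hypothetical sample; the separators are taken from the split calls above), the first pass inverts the relationship: the Mapper emits (friend, user) pairs, and the Reducer output contains lines such as A<TAB>B,C,D,F, meaning that B, C, D and F all have A in their friend lists. Note that map() skips the line at byte offset 0, so the first line of the input file is ignored.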
2. Run over that result again and, for every pair of users that appear in the same list, collect the people they have in common, i.e. the common friends of the pair.
package com.wxw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;

public class friend2 {

    // Mapper: for each person, emit every pair of users who both have that person as a friend
    public static class Fd2Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String user = value.toString().split("\t")[0];
            String[] aa = value.toString().split("\t")[1].split(",");
            Arrays.sort(aa); // sort so each pair is always emitted in the same order
            for (int i = 0; i < aa.length - 1; i++) {
                for (int j = i + 1; j < aa.length; j++) {
                    context.write(new Text(aa[i] + "-" + aa[j]), new Text(user));
                }
            }
        }
    }

    // Reducer: concatenate the common friends of each pair
    public static class FD2Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer stringBuffer = new StringBuffer();
            for (Text te : values) {
                stringBuffer.append(te).append(",");
            }
            context.write(key, new Text(stringBuffer.toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(friend2.class);
        job.setMapperClass(Fd2Mapper.class);
        job.setReducerClass(FD2Reduce.class);

        // map and reduce output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\friend\\output\\"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\friend\\output2");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.submit();
    }
}
Result:
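(The screenshot is omitted.) Each output line of the second pass has the form B-C<TAB>A,E, (hypothetical values), meaning that both B and C have A and E in their friend lists, so A and E are the common friends of B and C.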
III. Reading a JSON File and Outputting Objects
1. Convert the following JSON into objects and output them
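(The original screenshot of the JSON file is not included.) Judging from the Phone class defined below, each input line is assumed to be one JSON object with uid, phone and addr fields, for example (hypothetical values):

{"uid":"1001","phone":"13800000000","addr":"beijing"}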
2. Analysis: this exercise only needs to read the file and write the objects out, so no Reduce computation is involved. The code is as follows.
Create the entity class
package com.wxw.phone;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// A custom type used as a Mapper output key must implement Writable
// and its two serialization methods.
public class Phone implements Writable {
    private String uid;
    private String phone;
    private String addr;

    @Override
    public String toString() {
        return "Phone{" +
                "uid='" + uid + '\'' +
                ", phone='" + phone + '\'' +
                ", addr='" + addr + '\'' +
                '}';
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.uid);
        dataOutput.writeUTF(this.phone);
        dataOutput.writeUTF(this.addr);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.uid = dataInput.readUTF();
        this.phone = dataInput.readUTF();
        this.addr = dataInput.readUTF();
    }
}
Create the job class
package com.wxw.phone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class PhoneMR {

    // The job only needs the map side to output objects, so the output key is a
    // Phone object and the output value is NullWritable.
    public static class PhoneMap extends Mapper<LongWritable, Text, Phone, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // re-decode the raw bytes as GBK so the output is not garbled
            String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
            // Jackson parser
            ObjectMapper obj = new ObjectMapper();
            // deserialize the JSON line into a Phone object
            Phone phone = obj.readValue(line, Phone.class);
            // emit the object
            context.write(phone, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PhoneMR.class);
        job.setMapperClass(PhoneMap.class);

        // only the map output types need to be set
        job.setOutputKeyClass(Phone.class);
        job.setOutputValueClass(NullWritable.class);
        // there is no reduce step, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\phone\\input"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\phone\\output");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
IV. Joining Two Tables
1. When two tables have to be combined, the smaller file can be loaded into memory first and looked up while the Mapper reads the larger file, which completes the join in a single map-only pass. As shown in the figure, the data of the two tables is merged into one.
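(The screenshot is omitted.) As a purely hypothetical illustration of the format the code below assumes, both files are tab-separated and share their first column as the join key:

gou.txt (small table):
1001	Beijing
1002	Shanghai

guc.txt (large table):
1001	alice
1002	bob

The joined output then contains one line per guc.txt record with the matching gou.txt value appended, e.g. 1001alice Beijing.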
2. Writing the code
package com.wxw.join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Map-side join: load the small table into memory and look it up while reading the large table.
public class joinmap {

    public static class joinMap extends Mapper<LongWritable, Text, Text, NullWritable> {
        // in-memory copy of the small table
        Map<String, String> map = new HashMap<>();

        // setup() runs once per map task, so it is the place to load the small file
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // read the smaller file through a buffered reader
            BufferedReader br = new BufferedReader(new FileReader("E:\\wordcountdemo\\join\\input\\gou.txt"));
            String line;
            while (StringUtils.isNotBlank(line = br.readLine())) {
                // store each line in the map so map() can look it up
                String[] split = line.split("\t");
                map.put(split[0], split[1]);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // re-decode the raw bytes as GBK
            String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
            String[] split1 = line.split("\t");
            // simple data cleaning: skip malformed lines
            if (split1.length >= 2) {
                System.out.println(split1[0] + " " + split1[1] + " " + map.get(split1[0])); // debug output
                // join the large-table record with the small-table value and emit it
                context.write(new Text(split1[0] + split1[1] + " " + map.get(split1[0])), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(joinmap.class);
        job.setMapperClass(joinMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // map-only job: no reduce tasks
        job.setNumReduceTasks(0);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\join\\input\\guc.txt"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\join\\output");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.submit();
    }
}
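A note on the design: reading the small table from a hard-coded local path only works when the job runs in local mode on the same machine. On a real cluster the usual approach is to ship the file with the distributed cache, roughly as sketched below (the HDFS location is hypothetical; java.net.URI must also be imported):

// In the driver, before submitting the job: distribute the small file to every task.
job.addCacheFile(new URI("hdfs:///data/join/gou.txt"));

// In setup(), the cached file is available in the task's working directory
// under its base name, so it can be opened like a local file.
BufferedReader br = new BufferedReader(new FileReader("gou.txt"));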
V. Partitioning by a Condition
1. Partition the records by subject and compute each student's average score, as shown in the figure.
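(The screenshot is omitted.) Based on the map code below, each input line is assumed to be comma-separated with the subject first, then the student's name, then that student's scores, for example (hypothetical): computer,zhangsan,85,90,78. The job averages the scores and routes each subject to its own partition, so every subject ends up in a separate output file.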
2. Code implementation
Create the student class
package com.wxw.score;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Implements WritableComparable so the framework can sort using the custom compareTo
public class student implements WritableComparable<student> {
    private String name;
    private String subject;
    private double avg;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public double getAvg() {
        return avg;
    }

    public void setAvg(double avg) {
        this.avg = avg;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeUTF(this.subject);
        dataOutput.writeDouble(this.avg);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.subject = dataInput.readUTF();
        this.avg = dataInput.readDouble();
    }

    // custom ordering: descending by average score, ties broken by name
    @Override
    public int compareTo(student o) {
        if (o.getAvg() == this.getAvg()) {
            return o.getName().compareTo(this.name);
        } else {
            return o.getAvg() > this.getAvg() ? 1 : -1;
        }
    }

    @Override
    public String toString() {
        return "student{" +
                "name='" + name + '\'' +
                ", subject='" + subject + '\'' +
                ", avg=" + avg +
                '}';
    }
}
Write the job code
package com.wxw.score;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashMap;

public class scoreTest {

    // Partitioner: route each record to a partition based on its subject
    public static class parnt extends Partitioner<student, NullWritable> {
        HashMap<String, Integer> map = new HashMap<>();

        @Override
        public int getPartition(student student, NullWritable s, int i) {
            // map each subject to a fixed partition number
            map.put("computer", 0);
            map.put("english", 1);
            map.put("algorithm", 2);
            map.put("math", 3);
            return map.get(student.getSubject());
        }
    }

    public static class scoreMap extends Mapper<LongWritable, Text, student, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line: subject, name, then the individual scores
            String[] split = value.toString().split(",");
            Double sum = 0.0;            // total score
            int num = split.length - 2;  // number of scores
            Double avg = 0.0;            // average score
            student student = new student();
            student.setName(split[1]);
            student.setSubject(split[0]);
            for (int i = 2; i < split.length; i++) {
                sum += Double.valueOf(split[i]);
            }
            avg = sum / num * 1.0;
            student.setAvg(avg);
            context.write(student, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(scoreTest.class);
        job.setMapperClass(scoreMap.class);

        // partitioning: one partition per subject
        job.setPartitionerClass(parnt.class);
        job.setNumReduceTasks(4);

        // output types
        job.setOutputKeyClass(student.class);
        job.setOutputValueClass(NullWritable.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\wordcountdemo\\score\\input"));
        FileSystem fs = FileSystem.get(configuration);
        Path outPath = new Path("E:\\wordcountdemo\\score\\output");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.submit();
    }
}
Result