Given two tables: Table 1 stores student ID and student name; Table 2 stores student ID, exam subject, and exam score. Write a MapReduce program that joins the two tables into a single unified table. This is a classic reduce-side join: the mapper tags each record with the table it came from, and the reducer combines all records that share the same student ID.
Table 1:
A001 zhangsan
A002 lisi
A003 wangwu
A004 zhaoliu
A005 tianqi
Table 2:
A001 math 80
A002 math 76
A003 math 90
A004 math 67
A005 math 78
A001 english 78
A002 english 69
A003 english 88
A004 english 98
A005 english 56
A001 computer 56
A002 computer 77
A003 computer 84
A004 computer 92
A005 computer 55
Expected result (run the Java program and print the contents of part-r-00000):
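The output listing itself did not survive here, so the following is reconstructed from the sample data and the reducer's output format (name, then subject=score, tab-separated). Keys reach the reducer in sorted order (A001 through A005), but the subject order within each student is not guaranteed, so this is one possible ordering:

zhangsan	math=80
zhangsan	english=78
zhangsan	computer=56
lisi	math=76
lisi	english=69
lisi	computer=77
wangwu	math=90
wangwu	english=88
wangwu	computer=84
zhaoliu	math=67
zhaoliu	english=98
zhaoliu	computer=92
tianqi	math=78
tianqi	english=56
tianqi	computer=55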
The code is as follows (my skills are limited, so I cannot guarantee it is entirely correct; corrections are welcome):
package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Test {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");
        FileSystem fs = FileSystem.get(config);

        Job job = Job.getInstance(config);
        job.setJarByClass(Test.class);

        // Set the Mapper class and its output key/value types
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the Reducer class and its output key/value types
        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Set the input path; delete the output path if it already exists
        FileInputFormat.addInputPath(job, new Path("/day19/"));
        Path path = new Path("/output5/");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        // Set the output path
        FileOutputFormat.setOutputPath(job, path);

        // Submit the job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        if (completion) {
            System.out.println("Job Success!");
        }
    }

    public static class myMapper extends Mapper<Object, Text, Text, Text> {
        // The map function tags each record with the table it came from
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String temp;                          // left/right table flag
            String values = value.toString();
            String[] words = values.split("\t");  // fields are tab-separated
            String mapkey;
            String mapvalue;
            if (words.length == 2) {
                // Left table record: A001<TAB>zhangsan
                mapkey = words[0];
                mapvalue = words[1];
                temp = "1";
            } else {
                // Right table record: A001<TAB>math<TAB>80
                mapkey = words[0];
                mapvalue = words[1] + "=" + words[2];
                temp = "2";
            }
            // Emit both tables keyed by student ID:
            //   left table:  (A001, 1+zhangsan)
            //   right table: (A001, 2+math=80)
            context.write(new Text(mapkey), new Text(temp + "+" + mapvalue));
            System.out.println("key:" + mapkey + "---value:" + mapvalue);
        }
    }

    // The reducer parses the map output and stores the values
    // from the left and right tables separately
    public static class myReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Names from the left table
            List<String> people = new ArrayList<String>();
            // Scores from the right table
            List<String> score = new ArrayList<String>();
            // Input looks like (A001, {1+zhangsan, 2+math=80, ...})
            for (Text value : values) {
                // Read the table flag, e.g. '1'
                char temp = (char) value.charAt(0);
                // Split off the payload, e.g. "zhangsan" or "math=80"
                String[] words = value.toString().split("[+]");
                if (temp == '1') {
                    people.add(words[1]);
                }
                if (temp == '2') {
                    score.add(words[1]);
                }
            }
            // Cross the two lists (Cartesian product) to produce the join
            for (String p : people) {
                for (String s : score) {
                    context.write(new Text(p), new Text(s));
                }
            }
        }
    }
}
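The following plain-Java sketch is not from the original program; it replays the same tag-and-join logic on a few sample records without a Hadoop cluster, which is handy for sanity-checking the reducer's Cartesian-product step. The class name JoinSketch and the in-memory TreeMap standing in for the shuffle are my own assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinSketch {
    public static void main(String[] args) {
        // Sample records, tab-separated as in the HDFS input files
        String[] lines = {
            "A001\tzhangsan", "A002\tlisi",
            "A001\tmath\t80", "A001\tenglish\t78",
            "A002\tmath\t76", "A002\tenglish\t69"
        };

        // "Map" phase: tag each record and group by student ID,
        // as the shuffle would (TreeMap keeps keys sorted, like Hadoop)
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String line : lines) {
            String[] words = line.split("\t");
            String tagged = (words.length == 2)
                    ? "1+" + words[1]                      // left table: name
                    : "2+" + words[1] + "=" + words[2];    // right table: subject=score
            shuffled.computeIfAbsent(words[0], k -> new ArrayList<>()).add(tagged);
        }

        // "Reduce" phase: separate names from scores, then cross them
        for (Map.Entry<String, List<String>> entry : shuffled.entrySet()) {
            List<String> people = new ArrayList<>();
            List<String> score = new ArrayList<>();
            for (String v : entry.getValue()) {
                if (v.charAt(0) == '1') people.add(v.substring(2));
                else score.add(v.substring(2));
            }
            for (String p : people)
                for (String s : score)
                    System.out.println(p + "\t" + s);
        }
    }
}

Compiling and running this with javac/java should print zhangsan math=80 and so on, line for line what the reducer would write into part-r-00000 for these records.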