MapReduce: Merging a Student Table and a Score Table into a Student-Score Table


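Two data tables are given. Table 1 stores student IDs and student names; Table 2 stores student IDs, exam subjects, and exam scores. Write a MapReduce program that merges the data of the two tables into one unified table.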
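This task is a classic reduce-side join. The mapper tags every record with the table it came from and keys it by student ID, Hadoop's shuffle groups all records that share an ID, and the reducer pairs names with scores. The sketch below is a local, Hadoop-free simulation of that flow, meant only to make the idea concrete. The class and method names (`ReduceSideJoinDemo`, `mapAndShuffle`, `reduce`, `join`) are hypothetical, and a `TreeMap` stands in for the shuffle's sort-by-key step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Local simulation of a reduce-side join (hypothetical helper, not Hadoop code).
 * map:     tag each record with its source table ("1" = students, "2" = scores)
 * shuffle: group tagged values by student ID (TreeMap keeps keys sorted, like Hadoop)
 * reduce:  pair every name with every score for the same ID
 */
class ReduceSideJoinDemo {

    static Map<String, List<String>> mapAndShuffle(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            String tagged;
            if (f.length == 2) {
                tagged = "1+" + f[1];                 // student table: ID name
            } else if (f.length == 3) {
                tagged = "2+" + f[1] + "=" + f[2];    // score table: ID subject score
            } else {
                continue;                             // skip blank or malformed lines
            }
            groups.computeIfAbsent(f[0], k -> new ArrayList<>()).add(tagged);
        }
        return groups;
    }

    static List<String> reduce(List<String> taggedValues) {
        List<String> names = new ArrayList<>();
        List<String> scores = new ArrayList<>();
        for (String v : taggedValues) {
            String payload = v.substring(2);          // drop the "1+" / "2+" tag
            if (v.startsWith("1+")) names.add(payload);
            else if (v.startsWith("2+")) scores.add(payload);
        }
        List<String> out = new ArrayList<>();
        for (String n : names) {                      // Cartesian product of names x scores
            for (String s : scores) {
                out.add(n + "\t" + s);                // "key<TAB>value", one output line each
            }
        }
        return out;
    }

    static List<String> join(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (List<String> values : mapAndShuffle(lines).values()) {
            result.addAll(reduce(values));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "A001 zhangsan", "A002 lisi",
                "A001 math 80", "A002 math 76", "A001 english 78");
        join(input).forEach(System.out::println);
    }
}
```

Within one ID the simulation keeps values in input order. A real Hadoop job sorts only the keys, so the order of a student's subjects in part-r-00000 is not guaranteed.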

Table 1:

A001 zhangsan
A002 lisi
A003 wangwu
A004 zhaoliu
A005 tianqi


Table 2:

A001 math 80
A002 math 76
A003 math 90
A004 math 67
A005 math 78
A001 english 78
A002 english 69
A003 english 88
A004 english 98
A005 english 56
A001 computer 56
A002 computer 77
A003 computer 84
A004 computer 92
A005 computer 55
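The mapper tells the two tables apart only by how many fields a line has (2 for Table 1, 3 for Table 2), so the check depends on the delimiter. The rows above are shown separated by spaces; if such a line is split on a single tab (`split("\t")`), `A001 zhangsan` yields just one field and is misclassified. The sketch below compares a tab split with a whitespace split. `FieldCountCheck` and `tableTag` are hypothetical names, and it assumes an input file may use either tabs or spaces.

```java
import java.util.Arrays;

/**
 * Hypothetical helper showing how table detection by field count
 * depends on the delimiter used to split each line.
 */
class FieldCountCheck {

    // Split on a single tab character only.
    static int fieldsByTab(String line) {
        return line.split("\t").length;
    }

    // Trim, then split on any run of whitespace (tabs or spaces).
    static int fieldsByWhitespace(String line) {
        return line.trim().split("\\s+").length;
    }

    // "1" for the student table, "2" for the score table, null for anything else.
    static String tableTag(String line) {
        switch (fieldsByWhitespace(line)) {
            case 2:  return "1";
            case 3:  return "2";
            default: return null;
        }
    }

    public static void main(String[] args) {
        for (String row : Arrays.asList("A001 zhangsan", "A001\tmath\t80", "A001 math 80")) {
            System.out.println(row.replace('\t', '|') + " -> tab:" + fieldsByTab(row)
                    + " whitespace:" + fieldsByWhitespace(row) + " tag:" + tableTag(row));
        }
    }
}
```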


Correct result:

Run the Java program, then print the data in part-r-00000.


The code is as follows (my skill is limited, so it is not guaranteed to be fully correct; corrections are welcome):

package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
        config.set("yarn.resourcemanager.hostname", "192.168.0.100");
        
        FileSystem fs = FileSystem.get(config);
        
        Job job = Job.getInstance(config);
        
        job.setJarByClass(Test.class);
        
        // Mapper class and its output key/value types
        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        // Reducer class and the job's final output key/value types
        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        // Input directory: both table files are expected under /day19/
        FileInputFormat.addInputPath(job, new Path("/day19/"));
        
        Path path = new Path("/output5/");
        
        if(fs.exists(path)){
            fs.delete(path, true);
        }
        
        // Output directory (must not exist yet, which is why it is deleted above)
        FileOutputFormat.setOutputPath(job, path);
        
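        // Submit the job and wait for it to finish
        boolean completion = job.waitForCompletion(true);
        if(completion){
            System.out.println("Job Success!");
        }
        // Return a non-zero exit status if the job failed
        System.exit(completion ? 0 : 1);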
    }
    
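    public static class myMapper extends Mapper<Object, Text, Text, Text> {

        // map(): key each record by student ID and tag it with its source table
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split on any run of whitespace so tab- and space-separated files both work
            String[] words = value.toString().trim().split("\\s+");
            
            String mapkey;
            String mapvalue;
            String temp; // table tag: "1" = left table (students), "2" = right table (scores)
           
            if (words.length == 2) {
                // Left table: A001 zhangsan
                mapkey = words[0];
                mapvalue = words[1];
                temp = "1";
            } else if (words.length == 3) {
                // Right table: A001 math 80
                mapkey = words[0];
                mapvalue = words[1] + "=" + words[2];
                temp = "2";
            } else {
                // Skip blank or malformed lines instead of indexing past the array end
                return;
            }
            
            // Emit tagged records:
            // left table:  (A001, 1+zhangsan)
            // right table: (A001, 2+math=80)
            context.write(new Text(mapkey), new Text(temp + "+" + mapvalue));
            // Debug output; it ends up in the map task's stdout log
            System.out.println("key:" + mapkey + "---value:" + mapvalue);
        }
    }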

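    // reduce(): parse the map output and store the values separately by source table
    public static class myReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            
            // Student names for this ID (normally exactly one)
            List<String> people = new ArrayList<String>();
            
            // Scores for this ID, as "subject=score"
            List<String> score = new ArrayList<String>();
            
            // Example input: (A001, {1+zhangsan, 2+math=80, ...})
            for (Text value : values) {
                // Hadoop reuses the same Text object while iterating, so copy it to a String
                String record = value.toString();
                // Split into tag and payload: "1+zhangsan" -> ["1", "zhangsan"]
                String[] words = record.split("[+]", 2);
                if (words.length < 2) {
                    continue;
                }
                if (words[0].equals("1")) {
                    people.add(words[1]);
                } else if (words[0].equals("2")) {
                    score.add(words[1]);
                }
            }
            
            // Nested loops produce the Cartesian product of names and scores
            for (String p : people) {
                for (String s : score) {
                    context.write(new Text(p), new Text(s));
                }
            }
        }
    }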
}
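The reducer above writes one line per (name, subject=score) pair. If you would rather have the unified table contain one row per student, one option is to collect all of a student's scores and emit them together. The sketch below shows only that reduce logic, in plain Java so it runs without Hadoop. `OneRowPerStudent` and `reduceToRow` are hypothetical names; inside a real `Reducer` you would read each `Text` value with `value.toString()` and write the row with `context.write`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical alternative reduce logic: one output row per student
 * instead of one row per (student, subject) pair.
 */
class OneRowPerStudent {

    // taggedValues: the values grouped under one student ID, e.g. "1+zhangsan", "2+math=80"
    static String reduceToRow(String studentId, Iterable<String> taggedValues) {
        String name = "";
        List<String> scores = new ArrayList<>();
        for (String v : taggedValues) {
            String[] parts = v.split("[+]", 2);   // parts[0] = table tag, parts[1] = payload
            if (parts.length < 2) continue;       // ignore malformed values
            if (parts[0].equals("1")) name = parts[1];
            else if (parts[0].equals("2")) scores.add(parts[1]);
        }
        scores.sort(null);                        // natural order, independent of shuffle order
        return studentId + "\t" + name + "\t" + String.join(",", scores);
    }

    public static void main(String[] args) {
        System.out.println(reduceToRow("A001",
                Arrays.asList("2+math=80", "1+zhangsan", "2+english=78", "2+computer=56")));
    }
}
```

Sorting the scores keeps each row's layout stable no matter in which order Hadoop delivers the values for a key.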
