Mapjoin和Reducejoin案例


一、Mapjoin案例

  1.需求:有兩個文件,分別是訂單表、商品表,

  訂單表有三個屬性分別為訂單時間、商品id、訂單id(表示內容量大的表),

  商品表有兩個屬性分別為商品id、商品名稱(表示內容量小的表,用於加載到內存),

  要求結果文件為在訂單表中的每一行最后添加商品id對應的商品名稱。

  2.解決思路:

  將商品表加載到內存中,然后再map方法中將訂單表中的商品id對應的商品名稱添加到該行的最后,不需要Reducer,並在Driver執行類中設置setCacheFile和numReduceTask。

  3.代碼如下:

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	HashMap<String, String> pdMap = new HashMap<>();
	//1.商品表加載到內存
	protected void setup(Context context) throws IOException {
		
		//加載緩存文件
		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8"));
		
		String line;
		
		while(StringUtils.isNotEmpty(line = br.readLine()) ) {
			
			//切分
			String[] fields = line.split("\t");
			
			//緩存
			pdMap.put(fields[0], fields[1]);
			
		}
		
		br.close();
	
	}
		
		
		
	//2.map傳輸
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
			throws IOException, InterruptedException {
		//獲取數據
		String line = value.toString();
		
		//切割
		String[] fields = line.split("\t");
		
		//獲取訂單中商品id
		String pid = fields[1];
		
		//根據訂單商品id獲取商品名
		String pName = pdMap.get(pid);
		
		//拼接數據
		line = line + "\t" + pName;
		
		//輸出
		context.write(new Text(line), NullWritable.get());
	}
}

public class CacheDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		// 1.獲取job信息
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2.獲取jar包
		job.setJarByClass(CacheDriver.class);

		// 3.獲取自定義的mapper與reducer類
		job.setMapperClass(CacheMapper.class);

		// 5.設置reduce輸出的數據類型(最終的數據類型)
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 6.設置輸入存在的路徑與處理后的結果路徑
		FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
		FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));
		
		//加載緩存商品數據
		job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));
		
		//設置一下reducetask的數量
		job.setNumReduceTasks(0);

		// 7.提交任務
		boolean rs = job.waitForCompletion(true);
		System.out.println(rs ? 0 : 1);
	}
}

  

二、Reducejoin案例

  1.需求:同上的兩個數據文件,要求將訂單表中的商品id替換成對應的商品名稱。

  2.解決思路:封裝TableBean類,包含屬性:時間、商品id、訂單id、商品名稱、flag(flag用來判斷是哪張表),

    使用Mapper讀兩張表,通過context對象獲取切片對象,然后通過切片獲取切片名稱和路徑的字符串來判斷是哪張表,再將切片的數據封裝到TableBean對象,最后以產品id為key、TableBean對象為value傳輸到Reducer端;

    Reducer接收數據后通過flag判斷是哪張表,因為一個reduce中的所有數據的key是相同的,將商品表的商品id和商品名稱讀入到一個TableBean對象中,然后將訂單表的中的數據讀入到TableBean類型的ArrayList對象中,然后將ArrayList中的每個TableBean的商品id替換為商品名稱,然后遍歷該數組以TableBean為key輸出。

  3.代碼如下:

/**
 * @author: PrincessHug
 * @date: 2019/3/30, 2:37
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TableBean implements Writable {
    private String timeStamp;
    private String productId;
    private String orderId;
    private String productName;
    private String flag;

    public TableBean() {
    }

    public String getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(timeStamp);
        out.writeUTF(productId);
        out.writeUTF(orderId);
        out.writeUTF(productName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timeStamp = in.readUTF();
        productId = in.readUTF();
        orderId = in.readUTF();
        productName = in.readUTF();
        flag = in.readUTF();
    }

    @Override
    public String toString() {
        return timeStamp + "\t" + productName + "\t" + orderId;
    }
}


public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //通過切片獲取文件信息
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        //獲取一行數據、定義TableBean對象
        String line = value.toString();
        TableBean tb = new TableBean();
        Text t = new Text();

        //判斷是哪一張表
        if (name.contains("order.txt")){
            String[] fields = line.split("\t");
            tb.setTimeStamp(fields[0]);
            tb.setProductId(fields[1]);
            tb.setOrderId(fields[2]);
            tb.setProductName("");
            tb.setFlag("0");
            t.set(fields[1]);
        }else {
            String[] fields = line.split("\t");
            tb.setTimeStamp("");
            tb.setProductId(fields[0]);
            tb.setOrderId("");
            tb.setProductName(fields[1]);
            tb.setFlag("1");
            t.set(fields[0]);
        }
        context.write(t,tb);
    }
}

public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        //分別創建用來存儲訂單表和產品表的集合
        ArrayList<TableBean> orderBean = new ArrayList<>();
        TableBean productBean = new TableBean();

        //遍歷values,通過flag判斷是產品表還是訂單表
        for (TableBean v:values){
            if (v.getFlag().equals("0")){
                TableBean tableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tableBean,v);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBean.add(tableBean);
            }else {
                try {
                    BeanUtils.copyProperties(productBean,v);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //拼接表
        for (TableBean ob:orderBean) {
            ob.setProductName(productBean.getProductName());
            context.write(ob,NullWritable.get());
        }
    }
}

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //jar包
        job.setJarByClass(TableDriver.class);

        //Mapper、Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        //Mapper輸出數據類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        //Reducer輸出數據類型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        //輸入輸出路徑
        FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\reducejoin\\in"));
        FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\reducejoin\\out"));

        //提交任務
        if (job.waitForCompletion(true)){
            System.out.println("運行完成!");
        }else {
            System.out.println("運行失敗!");
        }
    }
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM