map join 與 reduce join


要解決什么問題?

解決的都是同一個問題,即將兩張“表‘進行join操作。更廣義地來講,就是不同數據源數據的合並問題。

reduce join是在map階段完成數據的標記,在reduce階段完成數據的合並

map join是直接在map階段完成數據的合並,沒有reduce階段

 

比如有如下問題:

 

 

 這是訂單表。

 

 

 這是商品表。

現在需要將商品表中的商品名稱填充到訂單表中。得到如下的聯合表:

 

 

Reduce Join

 

map:

將輸入數據統一封裝為一個Bean,此Bean包含了商品表和訂單表的所有公共和非公共屬性,相當於進行了全外連接,並新增加一個屬性——文件名稱,以區分數據是來自與商品表還是訂單表,便於在reduce階段數據的 處理;map輸出的key是pid,value是bean

shuffle:

根據pid對bean進行排序,所有pid相同的數據都會被聚合到同一個key下,發往同一個reducetask

reduce:

對同一個pid下所有的bean,首先要區分出它們是來自於哪個表,是商品表還是訂單表。如果是商品表,數據只有一條,保存其中的pname屬性;如果是訂單表,數據有多條,用保存的pname屬性替換pid屬性,並輸出

 

map join

沒有reduce過程,所有的工作都在map階段完成,極大減少了網絡傳輸和io的代價。如何實現:

上述的join過程可以看作外表與內表的連接過程,外表是訂單表,外表大,內表是商品表,內表小。所以可以把內表事先緩存於各個maptask結點,然后等到外表的數據傳輸過來以后,直接用外表的數據連接內表的數據並輸出即可

 

1)在driver中設置加載緩存文件,這樣每個maptask就可以獲取到該文件;設置reducetask個數為0,去除reduce階段

2)在setup方法中讀取緩存文件,並將結果以kv的形式存入hashmap,k是pid,v是pname

3)在map方法中,根據pid,通過hashmap找到pname,替換pid,寫出結果;

完整的示例代碼如下:

 

mapper

package mapjoin;

import java.io.BufferedReader;


import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private  Map<String,String> produs = new HashMap<>();
    Text text = new Text();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {

        //獲取緩存文件的輸入流
        URI[] uris=context.getCacheFiles();
        String path=uris[0].getPath().toString();
        BufferedReader br=new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));

        //將小表中的數據,封裝進HashMap中
        String line = null;
        while(StringUtils.isNotEmpty((line=br.readLine()))) {
            String[] fields = line.split("\t");
            produs.put(fields[0], fields[1]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {

        //這里的記錄,只來源於order表,也就是大表,直接切割並封裝
        String[] fields = value.toString().split("\t");
        String pname = produs.get(fields[1]);
        pname = (pname==null)?"":pname;

        String s = fields[0]+"\t"+pname+"\t"+fields[2];
        text.set(s);
        context.write(text, NullWritable.get());
    }
}

driver

package mapjoin;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // TODO Auto-generated method stub

        // 0 根據自己電腦路徑重新配置
        args = new String[]{"e:/input/table/order.txt", "e:/output/table2"};

        // 1 獲取job信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 設置加載jar包路徑
        job.setJarByClass(MapJoinDriver.class);

        // 3 關聯map
        job.setMapperClass(MapJoinMapper.class);

        // 4 設置最終輸出數據類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 加載緩存數據
        job.addCacheFile(new URI("file:///e:/input/inputcache/pro.txt"));

        // 7 Map端Join的邏輯不需要Reduce階段,設置reduceTask數量為0
        job.setNumReduceTasks(0);

        // 8 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM