Mapreduce中的join操作


一、背景

    MapReduce提供了表連接操作其中包括Map端join、Reduce端join還有半連接,現在我們要討論的是Map端join,Map端join是指數據到達map處理函數之前進行合並的,效率要遠遠高於Reduce端join,因為Reduce端join是把所有的數據都經過Shuffle,非常消耗資源。

二、具體join

   1、join的例子

    比如我們有兩個文件,分別存儲 訂單信息:products.txt,和 商品信息:orders.txt ,詳細數據如下:

    • products.txt:
      //商品ID,商品名稱,商品類型(數字表示,我們假設有一個數字和具體類型的映射)
      p0001,xiaomi,001
      p0002,chuizi,001

       

    • orders.txt:
      //訂單號,時間,商品id,購買數量 
      1001,20170710,p0001,1 
      1002,20170710,p0001,3 
      1003,20170710,p0001,3 
      1004,20170710,p0002,1

      我們想象有多個商品,並有海量的訂單信息,並且存儲在多個 HDFS 塊中。

      xiaomi,7
      chuizi,1

      該怎么處理? 我們分析上面我們想要的結果,商品名稱和銷量,這兩個屬性分別存放到不同的文件中,那我們就要考慮 在一個地方(mapper)讀取這兩個文件的數據,並把數據在一個地方(reducer)進行結合。這就是 MapReduce 中的 Join 了。

    • 代碼如下:
      • Mapper:
        public class joinMapper extends Mapper<LongWritable,Text,Text,Text> {
        
            private Text outKey=new Text();
            private Text outValue=new Text();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(",");
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String name = inputSplit.getPath().getName();
                //兩個文件 在一個 mapper 中處理
                //通過文件名判斷是那種數據
                if(name.startsWith("a")){
                    //取商品ID 作為 輸出key 和 商品名稱 作為 輸出value,即 第0、1 的數據
                    outKey.set(split[0]);
                    outValue.set("product#" + split[1]);
                    context.write(outKey, outValue);
                }else{
                    //取商品ID 作為 輸出key 和 購買數量 作為 輸出value,即 第2、3 的數據
                    outKey.set(split[2]);
                    outValue.set("order#" + split[3]);
                    context.write(outKey, outValue);
                }
            }
        }

         

      • Reducer

        public class joinReducer extends Reducer<Text,Text,Text,Text> {
            private Text outValue = new Text();
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //用來存放:商品ID、商品名稱
                List<String> productsList = new ArrayList<String>();
                //用來存放:商品ID、購買數量
                List<Integer> ordersList = new ArrayList<Integer>();
        
                for (Text text:values){
                    String value = text.toString();
                    if(value.startsWith("product#")) {
                        productsList.add(value.split("#")[1]); //取出 商品名稱
                    } else if(value.startsWith("order#")){
                        ordersList.add(Integer.parseInt(text.toString().split("#")[1].trim())); //取出商品的銷量
                    }
                }
                int totalOrders = 0;
                for (int i=0; i < productsList.size(); i++) {
                    System.out.println(productsList.size());
        
                    for (int j=0; j < ordersList.size(); j++) {
                        System.out.println(ordersList.size());
                        totalOrders += ordersList.get(j);
                    }
                    outValue.set(productsList.get(i) + "\t" + totalOrders );
                    //最后的輸出是:商品ID、商品名稱、購買數量
                    context.write(key, outValue);
                }
        
            }
        }

         

      • App:

        public class App  {
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "file:///");
        
                Path path = new Path("F:\\mr\\join\\out");
                FileSystem fileSystem = path.getFileSystem(conf);
                if(fileSystem.isDirectory(path)){
                    fileSystem.delete(path,true);
                }
                Job job = Job.getInstance(conf);
                //設置job的各種屬性
                job.setJobName("App");                        //作業名稱
                job.setJarByClass(App.class);                 //搜索類
                job.setInputFormatClass(TextInputFormat.class); //設置輸入格式
        
                job.setMapperClass(joinMapper.class);
                job.setReducerClass(joinReducer.class);
                //添加輸入路徑
                FileInputFormat.addInputPath(job,new Path("F:\\mr\\join\\map"));
                //設置輸出路徑
                FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\join\\out"));
                //map輸出類型
                job.setOutputKeyClass(Text.class);           //
                job.setOutputValueClass(Text.class);        //
                job.waitForCompletion(true);
        
            }
        }

         

      • 輸出結果

        p0001    xiaomi    7
        p0002    chuizi    1

         

                

2、 Map Join   

    一個數據集很大,另一個數據集很小(能夠被完全放進內存中),MAPJION會把小表全部讀入內存中,把小表拷貝多份分發到大表數據所在實例上的內存里,在map階段直接 拿另 外一個表的數據和內存中表數據做匹配,由於在map是進行了join操作,省去了reduce運行的效率會高很多;

    適用於關聯表中有小表的情形;可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join並輸出最終結果,可以大大提高join操作的並發度,加快處理速度。並用distributedcache機制將小表的數據分發到每一個maptask執行節點,從而每一個maptask節點可以從本地加載到小表的數據,進而在本地即可實現join

    • left outer join的左表必須是大表    
    • right outer join的右表必須是大表
    • inner join左表或右表均可以作為大表
    • full outer join不能使用mapjoin;
    •  mapjoin支持小表為子查詢,使用mapjoin時需要引用小表或是子查詢時,需要引用別名;在mapjoin中,可以使用不等值連接或者使用or連接多個條件;    

  1.2、 Map Join事例

      • product表

        p0001,xiaomi,001
        p0002,chuizi,001

         

      • orders表

        1001,20170710,p0001,1
        1002,20170710,p0001,3
        1003,20170710,p0001,3
        1004,20170710,p0002,1

         

      • 期望輸出

        xiaomi 1001,20170710,p0001,1
        xiaomi 1002,20170710,p0001,3
        xiaomi 1003,20170710,p0001,3
        chuizi 1004,20170710,p0002,1

         

      • 代碼實現
        • Mapper
          /**
           * 鏈接操作  map端鏈接
           */
          public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
          
              private Map<String,String> pdInfoMap =new HashMap<String,String>();
              private Text keyOut=new Text();
              /**
               * 通過閱讀父類Mapper的源碼,發現 setup方法是在maptask處理數據之前調用一次 可以用來做一些初始化工作
               */
              @Override
              protected void setup(Context context) {
                  try {
                      Configuration conf = context.getConfiguration();
                      FileSystem fs= null;
                      fs = FileSystem.get(conf);
                      FSDataInputStream fis = fs.open(new Path("file:/F:/mr/join/map/input/a.txt"));
                      //得到緩沖區閱讀器
                      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                      String line=null;
                      while((line=br.readLine())!=null){
                          String[] fields = line.split(",");
                          pdInfoMap.put(fields[0],fields[1]);
                      }
                      fis.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
              // 由於已經持有完整的產品信息表,所以在map方法中就能實現join邏輯了
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  //訂單信息
                  String orderline = value.toString();
                  String[] fields = orderline.split(",");
                  String pName = pdInfoMap.get(fields[2]);
                  keyOut.set(pName+"\t"+orderline);
                  context.write(keyOut,NullWritable.get());
              }
          }

           

        • App
          public class MapJoinApp {
              public static void main(String[] args) throws Exception {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
                  Job job = Job.getInstance(conf);
                  //設置job的各種屬性
                  job.setJobName("MapJoinApp");                        //作業名稱
                  job.setJarByClass(MapJoinApp.class);                 //搜索類
                  //添加輸入路徑
                  FileInputFormat.addInputPath(job,new Path("F:/mr/join/map/input/b.txt"));
                  //設置輸出路徑
                  FileOutputFormat.setOutputPath(job,new Path("F:/mr/join/map/output"));
                  job.setMapperClass(MapJoinMapper.class);             //mapper類
                  //沒有reduce
                  job.setNumReduceTasks(0);
                  job.setMapOutputKeyClass(Text.class);           //
                  job.setMapOutputValueClass(NullWritable.class);  //
          
                  job.waitForCompletion(true);
              }
          }

           

        • 輸出和期望輸出一致

3、Reduce端Join

    • Reduce端連接比Map端連接更為普遍,因為輸入的數據不需要特定的結構,但是效率比較低,因為所有數據都必須經過Shuffle過程。
    • 基本思路:
      1. Map端讀取所有的文件,並在輸出的內容里加上標示,代表數據是從哪個文件里來的。
      2. 在reduce處理函數中,按照標識對數據進行處理
      3. 然后根據Key去join來求出結果直接輸出。
    • 例子
      • 數據如上
      • 計算過程:
        • 在Map階段,把所有數據標記成<key,value>的形式,其中key是id,value則根據來源不同取不同的形式:來源於products表的記錄,value的值為"products#"+name;來源於orders的記錄,value的值為"orders#"+score。
        • 在reduce階段,先把每個key下的value列表拆分為分別來自表A和表B的兩部分,分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積,形成一條條最終的結果。
      • 代碼如下:
        • Mapper
          /**
           * map階段打標記
           */
          public class reduceMapper extends Mapper<LongWritable,Text,Text,Text> {
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  String line = value.toString();
                  String[] fields = line.split(",");
          FileSplit fileSplit
          = (FileSplit)context.getInputSplit(); String pathName = fileSplit.getPath().toString(); pathName=pathName.substring(27); //通過文件名判斷是那種數據 if (pathName.startsWith("a")){//product數據 //System.out.println(keyOut+"\t"+valueOut); context.write(new Text(fields[0]),new Text("product#"+fields[1])); }else if (pathName.startsWith("b")){ context.write(new Text(fields[2]),new Text("order#"+fields[0]+"\t"+fields[1]+"\t"+fields[3])); } } }

           

        • Reducer
          public class reduceReducer extends Reducer<Text,Text,Text,Text> {
              @Override
              protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                  //存放產品信息
                  List<String> proInfo = new ArrayList<String>();
                  //存放訂單信息
                  List<String> ordInfo = new ArrayList<String>();
                  for (Text text:values){
                      System.out.println("key="+key+"  value="+text);
                      //將數組中的數據添加到對應的數組中去
                      if (text.toString().startsWith("product")){
                          proInfo.add(text.toString().split("#")[1]);
                      }else if(text.toString().startsWith("order")){
                          ordInfo.add(text.toString().split("#")[1]);
                      }
                  }
                  //獲取兩個數組的大小
                  int sizePro = proInfo.size();
                  int sizeOrd = ordInfo.size();
                  //遍歷兩個數組將結果寫出去
                  for (int i=0;i<sizePro;i++){
                      for (int j=0;j<sizeOrd;j++){
                          context.write(key,new Text(proInfo.get(i)+" "+ordInfo.get(j)));
                      }
                  }
              }
          }

           

        • App
          public class ReduceApp {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
                  Job job = Job.getInstance(conf);
          
                  Path path = new Path("F:\\mr\\join\\map/output1");
                  FileSystem fileSystem = path.getFileSystem(conf);
                  if(fileSystem.isDirectory(path)){
                      fileSystem.delete(path,true);
                  }
          
                  //設置job的各種屬性
                  job.setJobName("ReduceApp");                        //作業名稱
                  job.setJarByClass(ReduceApp.class);                 //搜索類
                  //添加輸入路徑
                  FileInputFormat.addInputPath(job,new Path("F:\\mr\\join\\map\\input"));
                  //設置輸出路徑
                  FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\join\\map/output1"));
          
                  job.setMapperClass(reduceMapper.class);             //mapper類
                  job.setReducerClass(reduceReducer.class);         //reducer類
          
                  job.setMapOutputKeyClass(Text.class);           //
                  job.setMapOutputValueClass(Text.class);  //
          
                  job.waitForCompletion(true);
              }
          }

           

        • 輸出結果
          p0001    xiaomi 1003    20170710    3 
          p0001    xiaomi 1002    20170710    3 
          p0001    xiaomi 1001    20170710    1 
          p0002    chuizi 1004    20170710    1

           

 

 

         細節:

 

      • 當map讀取源文件時,如何區分出是file1還是file2 
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        String path =  fileSplit.getPath().toString();

             根據path就可以知道文件的來源咯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM