一起學Hadoop——實現兩張表之間的連接操作


---恢復內容開始---

之前我們都是學習使用MapReduce處理一張表的數據(一個文件可視為一張表,hive和關系型數據庫Mysql、Oracle等都是將數據存儲在文件中)。但是我們經常會遇到處理多張表的場景,不同的數據存儲在不同的文件中,因此Hadoop也提供了類似傳統關系型數據庫的join操作。Hadoop生態組件的高級框架Hive、Pig等也都實現了join連接操作,編寫類似SQL的語句,就可以在MapReduce中運行,底層的實現也是基於MapReduce。本文介紹如何使用MapReduce實現join操作,為以后學習hive打下基礎。

1、Map端連。
數據在進入到map函數之前就進行連接操作。適用場景:一個文件比較大,一個文件比較小,小到可以加載到內存中。如果兩個都是大文件,就會出現OOM內存溢出的異常。實現Map端連接操作需要用到Job類的addCacheFile()方法將小文件分發到各個計算節點,然后加載到節點的內存中。

下面通過一個例子來實現Map端join連接操作:
1、雇員employee表數據如下:
name gender age dept_no
Tom male 30 1
Tony male 35 2
Lily female 28 1
Lucy female 32 3

2、部門表dept數據如下:
dept_no dept_name
1 TSD
2 MCD
3 PPD

代碼實現如下:

  1 package join;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Reducer;
  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 import org.apache.hadoop.util.Tool;
 12 import org.apache.hadoop.io.*;
 13 import org.apache.hadoop.util.ToolRunner;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 
 16 import java.io.BufferedReader;
 17 import java.io.FileReader;
 18 import java.io.IOException;
 19 import java.net.URI;
 20 import java.util.HashMap;
 21 import java.util.Map;
 22 import org.apache.hadoop.fs.Path;
 23 
 24 public class MapJoin extends Configured implements Tool {
 25 
 26     public static class MapJoinMapper extends Mapper<LongWritable, Text, Text,NullWritable> {
 27         private Map<Integer, String> deptData = new HashMap<Integer, String>();
 28 
 29         @Override
 30         protected void setup(Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException, InterruptedException {
 31             super.setup(context);
 32             //從緩存的中讀取文件。
 33             Path[] files = context.getLocalCacheFiles();
 34 //            Path file1path = new Path(files[0]);
 35             BufferedReader reader = new BufferedReader(new FileReader(files[0].toString()));
 36             String str = null;
 37             try {
 38                 // 一行一行讀取
 39                 while ((str = reader.readLine()) != null) {
 40                     // 對緩存中的數據以" "分隔符進行分隔。
 41                     String[] splits = str.split(" ");
 42                     // 把需要的數據放在Map中。注意不能操作Map的大小,否則會出現OOM的異常
 43                     deptData.put(Integer.parseInt(splits[0]), splits[1]);
 44                 }
 45             } catch (Exception e) {
 46                 e.printStackTrace();
 47             } finally{
 48                 reader.close();
 49             }
 50         }
 51 
 52         @Override
 53         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,NullWritable>.Context context) throws IOException,
 54                 InterruptedException {
 55             // 獲取從HDFS中加載的表
 56             String[] values = value.toString().split(" ");
 57             // 獲取關聯字段depNo,這個字段是關鍵
 58             int depNo = Integer.parseInt(values[3]);
 59             // 根據deptNo從內存中的關聯表中獲取要關聯的屬性depName
 60             String depName = deptData.get(depNo);
 61             String resultData = value.toString() + " " + depName;
 62             // 將數據通過context寫入到Reduce中。
 63             context.write(new Text(resultData),NullWritable.get());
 64         }
 65     }
 66 
 67     public static class MapJoinReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
 68         public void reduce(Text key, Iterable<NullWritable> values,Context context)throws IOException,InterruptedException{
 69             context.write(key,NullWritable.get());
 70         }
 71     }
 72 
 73     @Override
 74     public int run(String[] args) throws Exception {
 75         Configuration conf = new Configuration();
 76         Job job = Job.getInstance(conf, "Total Sort app");
 77         //將小表加載到緩存中。
 78         job.addCacheFile(new URI(args[0]));
 79         job.setJarByClass(MapJoinMapper.class);
 80         //1.1 設置輸入目錄和設置輸入數據格式化的類
 81         FileInputFormat.setInputPaths(job,new Path(args[1]));
 82         job.setInputFormatClass(TextInputFormat.class);
 83 
 84         //1.2 設置自定義Mapper類和設置map函數輸出數據的key和value的類型
 85         job.setMapperClass(MapJoinMapper.class);
 86         job.setMapOutputKeyClass(Text.class);
 87         job.setMapOutputValueClass(NullWritable.class);
 88 
 89         //1.3 設置reduce數量
 90         job.setNumReduceTasks(1);
 91         //設置實現了reduce函數的類
 92         job.setReducerClass(MapJoinReducer.class);
 93 
 94         //設置reduce函數的key值
 95         job.setOutputKeyClass(Text.class);
 96         //設置reduce函數的value值
 97         job.setOutputValueClass(NullWritable.class);
 98 
 99         // 判斷輸出路徑是否存在,如果存在,則刪除
100         Path mypath = new Path(args[2]);
101         FileSystem hdfs = mypath.getFileSystem(conf);
102         if (hdfs.isDirectory(mypath)) {
103             hdfs.delete(mypath, true);
104         }
105 
106         FileOutputFormat.setOutputPath(job, new Path(args[2]));
107 
108         return job.waitForCompletion(true) ? 0 : 1;
109     }
110 
111     public static void main(String[] args)throws Exception{
112 
113         int exitCode = ToolRunner.run(new MapJoin(), args);
114         System.exit(exitCode);
115     }
116 }

執行腳本文件如下::

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar MapJoin.jar \
2 hdfs://hadoop-master:8020/data/dept.txt \
3 hdfs://hadoop-master:8020/data/employee.txt \
4 hdfs://hadoop-master:8020/mapjoin_output

運行結果:

Lily female 28 1 TSD
Lucy female 32 3 PPD
Tom male 30 1 TSD
Tony male 35 2 MCD

 

2、Reduce端連接(Reduce side join)。
數據在Reduce進程中執行連接操作。實現思路:在Map進程中對來自不同表的數據打上標簽,例如來自表employee的數據打上a標簽,來自文件dept表的數據打上b標簽。然后在Reduce進程,對同一個key,來自不同表的數據進行笛卡爾積操作。請看下圖,我們對表employee和表dept的dept_no字段進行關聯,將dept_no字段當做key。

在MapReduce中,key相同的數據會放在一起,因此我們只需在reduce函數中判斷數據是來自哪張表,來自相同表的數據不進行join。

代碼如下:

  1 public class ReduceJoin extends Configured implements Tool {
  2     public static class JoinMapper extends
  3             Mapper<LongWritable,Text,Text,Text> {
  4         String employeeValue = "";
  5         protected void map(LongWritable key, Text value, Context context)
  6                 throws IOException,InterruptedException {
  7             /*
  8              * 根據命令行傳入的文件名,判斷數據來自哪個文件,來自employee的數據打上a標簽,來自dept的數據打上b標簽
  9              */
 10             String filepath = ((FileSplit)context.getInputSplit()).getPath().toString();
 11             String line = value.toString();
 12             if (line == null || line.equals("")) return;
 13 
 14             if (filepath.indexOf("employee") != -1) {
 15                 String[] lines = line.split(" ");
 16                 if(lines.length < 4) return;
 17 
 18                 String deptNo = lines[3];
 19                 employeeValue = line + " a";
 20                 context.write(new Text(deptNo),new Text(employeeValue));
 21             }
 22 
 23             else if(filepath.indexOf("dept") != -1) {
 24                 String[] lines = line.split(" ");
 25                 if(lines.length < 2) return;
 26                 String deptNo = lines[0];
 27                 context.write(new Text(deptNo), new Text(line + " b"));
 28             }
 29         }
 30     }
 31 
 32     public static class JoinReducer extends
 33             Reducer<Text, Text, Text, NullWritable> {
 34         protected void reduce(Text key, Iterable<Text> values,
 35                               Context context) throws IOException, InterruptedException{
 36             List<String[]> lista = new ArrayList<String[]>();
 37             List<String[]> listb = new ArrayList<String[]>();
 38 
 39             for(Text val:values) {
 40                 String[] str = val.toString().split(" ");
 41                 //最后一位是標簽位,因此根據最后一位判斷數據來自哪個文件,標簽為a的數據放在lista中,標簽為b的數據放在listb中
 42                 String flag = str[str.length -1];
 43                 if("a".equals(flag)) {
 44                     //String valueA = str[0] + " " + str[1] + " " + str[2];
 45                     lista.add(str);
 46                 } else if("b".equals(flag)) {
 47                     //String valueB = str[0] + " " + str[1];
 48                     listb.add(str);
 49                 }
 50             }
 51 
 52             for (int i = 0; i < lista.size(); i++) {
 53                 if (listb.size() == 0) {
 54                     continue;
 55                 } else {
 56                     String[] stra = lista.get(i);
 57                     for (int j = 0; j < listb.size(); j++) {
 58                         String[] strb = listb.get(j);
 59                         String keyValue = stra[0] + " " + stra[1] + " " + stra[2] + " " + stra[3] + " " + strb[1];
 60                         context.write(new Text(keyValue), NullWritable.get());
 61                     }
 62                 }
 63             }
 64         }
 65     }
 66 
 67     @Override
 68     public int run(String[] args) throws Exception {
 69         Configuration conf = getConf();
 70         GenericOptionsParser optionparser = new GenericOptionsParser(conf, args);
 71         conf = optionparser.getConfiguration();
 72         Job job = Job.getInstance(conf, "Reduce side join");
 73         job.setJarByClass(ReduceJoin.class);
 74         //1.1 設置輸入目錄和設置輸入數據格式化的類
 75         //FileInputFormat.setInputPaths(job,new Path(args[0]));
 76         FileInputFormat.addInputPaths(job, conf.get("input_data"));
 77 
 78         job.setInputFormatClass(TextInputFormat.class);
 79 
 80         //1.2 設置自定義Mapper類和設置map函數輸出數據的key和value的類型
 81         job.setMapperClass(JoinMapper.class);
 82         job.setMapOutputKeyClass(Text.class);
 83         job.setMapOutputValueClass(Text.class);
 84 
 85         //1.3 設置reduce數量
 86         job.setNumReduceTasks(1);
 87         //設置實現了reduce函數的類
 88         job.setReducerClass(JoinReducer.class);
 89 
 90         //設置reduce函數的key值
 91         job.setOutputKeyClass(Text.class);
 92         //設置reduce函數的value值
 93         job.setOutputValueClass(NullWritable.class);
 94 
 95         // 判斷輸出路徑是否存在,如果存在,則刪除
 96         Path output_dir = new Path(conf.get("output_dir"));
 97         FileSystem hdfs = output_dir.getFileSystem(conf);
 98         if (hdfs.isDirectory(output_dir)) {
 99             hdfs.delete(output_dir, true);
100         }
101 
102         FileOutputFormat.setOutputPath(job, output_dir);
103 
104         return job.waitForCompletion(true) ? 0 : 1;
105     }
106 
107     public static void main(String[] args)throws Exception{
108         int exitCode = ToolRunner.run(new ReduceJoin(), args);
109         System.exit(exitCode);
110     }
111 }

執行MapReduce的shell腳本如下:

1 /usr/local/src/hadoop-2.6.1/bin/hadoop jar ReduceJoin.jar \
2 -Dinput_data=hdfs://hadoop-master:8020/data/dept.txt,hdfs://hadoop-master:8020/data/employee.txt \
3 -Doutput_dir=hdfs://hadoop-master:8020/reducejoin_output

總結:
1、Map side join的運行速度比Reduce side join快,因為Reduce side join在shuffle階段會消耗大量的資源。Map side join由於把小表放在內存中,所以執行效率很高。
2、當有一張表的數據很小時,小到可以加載到內存中,那么建議使用Map side join。

 

歡迎關注本人公眾號了解更多關於大數據方面的知識:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM