在關系型數據庫中 Join 是非常常見的操作,各種優化手段已經到了極致。在海量數據的環境下,不可避免的也會碰到這種類型的需求, 例如在數據分析時需要連接從不同的數據源中獲取到數據。不同於傳統的單機模式,在分布式存儲下采用 MapReduce 編程模型,也有相應的處理措施和優化方法。
我們先簡要地描述待解決的問題。假設有兩個數據集:氣象站數據庫和天氣記錄數據庫,並考慮如何合二為一。一個典型的查詢是:輸出氣象站的歷史信息,同時各行記錄也包含氣象站的元數據信息。
一、Reduce Join
在Reudce端進行連接是MapReduce框架實現join操作最常見的方式,其具體的實現原理如下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然后用連接字段(兩張表中相同的列)作為key,其余部分和新加的標志作為value,最后進行輸出。
reduce端的主要工作:在reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在map階段已經打標志)分開,最后進行合並就ok了
實現方式一:二次排序
適用場景:其中一個表的連接字段key唯一
思路概述
二次排序的意思在map階段只是對於不同表進行打標簽排序,決定了在reduce階段輸出后兩張中的先后順序;而reduce的作用就是根本map輸出的數據連接兩張表。
代碼實現
自定義TextPair作為兩個文件的Mapper輸出key。

package com.hadoop.reducejoin.test; import org.apache.hadoop.io.WritableComparable; import java.io.*; import org.apache.hadoop.io.*; import com.hadoop.mapreduce.test.IntPair; public class TextPair implements WritableComparable<TextPair> { private Text first;//Text 類型的實例變量 first private Text second;//Text 類型的實例變量 second public TextPair() { set(new Text(),new Text()); } public TextPair(String first, String second) { set(new Text(first),new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } //將對象轉換為字節流並寫入到輸出流out中 public void write(DataOutput out)throws IOException { first.write(out); second.write(out); } //從輸入流in中讀取字節流反序列化為對象 public void readFields(DataInput in)throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() *163+second.hashCode(); } @Override public boolean equals(Object o) { if(o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first +"\t"+ second; } @Override public int compareTo(TextPair o) { // TODO Auto-generated method stub if(!first.equals(o.first)){ return first.compareTo(o.first); } else if(!second.equals(o.second)){ return second.compareTo(o.second); }else{ return 0; } } }

package com.hadoop.reducejoin.test; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * 通過二次排序實現reduce join * 適用場景:其中一個表的連接字段key唯一 */ public class ReduceJoinBySecondarySort extends Configured implements Tool{ // JoinStationMapper 處理來自氣象站數據 public static class JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split("\\s+");//解析天氣記錄數據 if(arr.length==2){//滿足這種數據格式 //key=氣象站id value=氣象站名稱 context.write(new TextPair(arr[0],"0"),new Text(arr[1])); } } } public static class JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{ protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] arr = line.split("\\s+");//解析天氣記錄數據 if(arr.length==3){ //key=氣象站id value=天氣記錄數據 context.write(new TextPair(arr[0],"1"),new Text(arr[1]+"\t"+arr[2])); } } } public static class KeyPartitioner extends Partitioner< TextPair,Text>{ public int getPartition(TextPair key,Text value,int numPartitions){ // &是位與運算 return (key.getFirst().hashCode()&Integer.MAX_VALUE)% numPartitions; } } public static class GroupingComparator extends WritableComparator{ protected GroupingComparator(){ super(TextPair.class, true); } @Override //Compare two WritableComparables. public int compare(WritableComparable w1, WritableComparable w2){ TextPair ip1 = (TextPair) w1; TextPair ip2 = (TextPair) w2; Text l = ip1.getFirst(); Text r = ip2.getFirst(); return l.compareTo(r); } } public static class JoinReducer extends Reducer< TextPair,Text,Text,Text>{ protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{ Iterator< Text> iter = values.iterator(); Text stationName = new Text(iter.next());//氣象站名稱 while(iter.hasNext()){ Text record = iter.next();//天氣記錄的每條數據 Text outValue = new Text(stationName.toString()+"\t"+record.toString()); context.write(key.getFirst(),outValue); } } } public int run(String[] args) throws Exception{ Configuration conf = new Configuration();// 讀取配置文件 Path mypath = new Path(args[2]); FileSystem hdfs = mypath.getFileSystem(conf);// 創建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "join");// 新建一個任務 job.setJarByClass(ReduceJoinBySecondarySort.class);// 主類 Path recordInputPath = new Path(args[0]);//天氣記錄數據源 Path stationInputPath = new Path(args[1]);//氣象站數據源 Path outputPath = new Path(args[2]);//輸出路徑 // 兩個輸入類型 MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//讀取天氣記錄Mapper MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//讀取氣象站Mapper FileOutputFormat.setOutputPath(job,outputPath); job.setReducerClass(JoinReducer.class);// Reducer job.setPartitionerClass(KeyPartitioner.class);//自定義分區 job.setGroupingComparatorClass(GroupingComparator.class);//自定義分組 job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception{ String[] args0 = {"hdfs://sparks:9000/middle/reduceJoin/records.txt" ,"hdfs://sparks:9000/middle/reduceJoin/station.txt" ,"hdfs://sparks:9000/middle/reduceJoin/secondSort-out" }; int exitCode = ToolRunner.run(new ReduceJoinBySecondarySort(),args0); System.exit(exitCode); } }
實現方式二:笛卡爾積
適用場景:兩個表的連接字段key都不唯一(包含一對多,多對多的關系)
思路概述
在map階段將來自不同表或文件的key、value打標簽對區別不同的來源,在reduce階段對於來自不同表或文件的相同key的數據分開,然后做笛卡爾積。這樣就實現了連接表。
代碼實現

package com.hadoop.reducejoin.test; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /* * 兩個大表 * 通過笛卡爾積實現 reduce join * 適用場景:兩個表的連接字段key都不唯一(包含一對多,多對多的關系) */ public class ReduceJoinByCartesianProduct { /** 為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。 然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。 */ public static class ReduceJoinByCartesianProductMapper extends Mapper<Object,Text,Text,Text>{ private Text joinKey=new Text(); private Text combineValue=new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String pathName=((FileSplit)context.getInputSplit()).getPath().toString(); //如果數據來自於records,加一個records的標記 if(pathName.endsWith("records.txt")){ String line = value.toString(); String[] valueItems = line.split("\\s+"); //過濾掉臟數據 if(valueItems.length!=3){ return; } joinKey.set(valueItems[0]); combineValue.set("records.txt" + valueItems[1] + "\t" + valueItems[2]); }else if(pathName.endsWith("station.txt")){ //如果數據來自於station,加一個station的標記 String line = value.toString(); String[] valueItems = line.split("\\s+"); //過濾掉臟數據 if(valueItems.length!=2){ return; } joinKey.set(valueItems[0]); combineValue.set("station.txt" + valueItems[1]); } context.write(joinKey,combineValue); } } /* * reduce 端做笛卡爾積 */ public static class ReduceJoinByCartesianProductReducer extends Reducer<Text,Text,Text,Text>{ private List<String> leftTable=new ArrayList<String>(); private List<String> rightTable=new ArrayList<String>(); private Text result=new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //一定要清空數據 leftTable.clear(); rightTable.clear(); //相同key的記錄會分組到一起,我們需要把相同key下來自於不同表的數據分開,然后做笛卡爾積 for(Text value : values){ String val=value.toString(); if(val.startsWith("station.txt")){ leftTable.add(val.replaceFirst("station.txt","")); }else if(val.startsWith("records.txt")){ rightTable.add(val.replaceFirst("records.txt","")); } } //笛卡爾積 for(String leftPart:leftTable){ for(String rightPart:rightTable){ result.set(leftPart+"\t"+rightPart); context.write(key, result); } } } } public static void main(String[] arg0) throws Exception{ Configuration conf = new Configuration(); String[] args = {"hdfs://sparks:9000/middle/reduceJoin/records.txt" ,"hdfs://sparks:9000/middle/reduceJoin/station.txt" ,"hdfs://sparks:9000/middle/reduceJoin/JoinByCartesian-out" }; String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: reducejoin <in> [<in>...] <out>"); System.exit(2); } //輸出路徑 Path mypath = new Path(otherArgs[otherArgs.length - 1]); FileSystem hdfs = mypath.getFileSystem(conf);// 創建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "ReduceJoinByCartesianProduct"); job.setJarByClass(ReduceJoinByCartesianProduct.class); job.setMapperClass(ReduceJoinByCartesianProductMapper.class); job.setReducerClass(ReduceJoinByCartesianProductReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //添加輸入路徑 for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //添加輸出路徑 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
實現方式三:分布式緩存
適用場景:一個大表和一個小表連接
分布式知識點補充
當 MapReduce 處理大型數據集間的 join 操作時,此時如果一個數據集很大而另外一個集合很小,以至於可以分發到集群中的每個節點之中。 這種情況下,我們就用到了 Hadoop 的分布式緩存機制,它能夠在任務運行過程中及時地將文件和存檔復制到任務節點以供使用。為了節約網絡寬帶,在每一個作業中, 各個文件通常只需要復制到一個節點一次。
1、用法
Hadoop 命令行選項中,有三個命令可以實現文件復制分發到任務的各個節點。
1)用戶可以使用 -files 選項指定待分發的文件,文件內包含以逗號隔開的 URL 列表。文件可以存放在本地文件系統、HDFS、或其它 Hadoop 可讀文件系統之中。 如果尚未指定文件系統,則這些文件被默認是本地的。即使默認文件系統並非本地文件系統,這也是成立的。
2)用戶可以使用 -archives 選項向自己的任務中復制存檔文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,這些文件會被解檔到任務節點。
3)用戶可以使用 -libjars 選項把 JAR 文件添加到 mapper 和 reducer 任務的類路徑中。如果作業 JAR 文件並非包含很多庫 JAR 文件,這點會很有用。
2、工作機制
當用戶啟動一個作業,Hadoop 會把由 -files、-archives、和 -libjars 等選項所指定的文件復制到分布式文件系統之中。接着,在任務運行之前, tasktracker 將文件從分布式文件系統復制到本地磁盤(緩存)使任務能夠訪問文件。此時,這些文件就被視為 “本地化” 了。從任務的角度來看, 這些文件就已經在那兒了,它並不關心這些文件是否來自 HDFS 。此外,有 -libjars 指定的文件會在任務啟動前添加到任務的類路徑(classpath)中。
3、分布式緩存 API
由於可以通過 Hadoop 命令行間接使用分布式緩存,大多數應用不需要使用分布式緩存 API。然而,一些應用程序需要用到分布式緩存的更高級的特性,這就需要直接使用 API 了。 API 包括兩部分:將數據放到緩存中的方法,以及從緩存中讀取數據的方法。
1)首先掌握數據放到緩存中的方法,以下列舉 Job 中可將數據放入到緩存中的相關方法:
public void addCacheFile(URI uri); public void addCacheArchive(URI uri);//以上兩組方法將文件或存檔添加到分布式緩存 public void setCacheFiles(URI[] files); public void setCacheArchives(URI[] archives);//以上兩組方法將一次性向分布式緩存中添加一組文件或存檔 public void addFileToClassPath(Path file); public void addArchiveToClassPath(Path archive);//以上兩組方法將文件或存檔添加到 MapReduce 任務的類路徑 public void createSymlink();
在緩存中可以存放兩類對象:文件(files)和存檔(achives)。文件被直接放置在任務節點上,而存檔則會被解檔之后再將具體文件放置在任務節點上。
2)其次掌握在 map 或者 reduce 任務中,使用 API 從緩存中讀取數據。
public Path[] getLocalCacheFiles() throws IOException; public Path[] getLocalCacheArchives() throws IOException; public Path[] getFileClassPaths(); public Path[] getArchiveClassPaths();
我們可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法獲取緩存中的文件或者存檔的引用。 當處理存檔時,將會返回一個包含解檔文件的目的目錄。相應的,用戶可以通過 getFileClassPaths()和getArchivesClassPaths()方法獲取被添加到任務的類路徑下的文件和文檔。
思路概述
小表作為緩存分發至各個節點,在reduce階段,通過讀取緩存中的小表過濾大表中一些不需要的數據和字段。
代碼實現

package com.hadoop.reducejoin.test; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Hashtable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.bloom.Key; /* * 通過分布式緩存實現Reduce Join * 適用場景:其中一個表比較小,能放入內存 */ public class ReduceJoinByDistributedCache extends Configured implements Tool { //直接輸出大表數據records.txt public static class ReduceJoinByDistributedCacheMapper extends Mapper< LongWritable, Text, Text, Text> { private Text combineValue=new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] arr = line.split("\\s+"); if (arr.length == 3) { combineValue.set(arr[1] + "\t" + arr[2]); context.write(new Text(arr[0]), combineValue); } } } //在reduce 端通過緩存文件實現join操作 public static class ReduceJoinByDistributedCacheReducer extends Reducer< Text, Text, Text, Text> { //定義Hashtable存放緩存數據 private Hashtable< String, String> table = new Hashtable< String, String>(); /** * 獲取分布式緩存文件 */ protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br; String infoAddr = null; // 返回緩存文件路徑 Path[] cacheFilesPaths = context.getLocalCacheFiles(); for (Path path : cacheFilesPaths) { String pathStr = path.toString(); br = new BufferedReader(new FileReader(pathStr)); while (null != (infoAddr = br.readLine())) { // 按行讀取並解析氣象站數據 String line = infoAddr.toString(); String[] records = line.split("\\s+"); if (null != records) // key為stationID,value為stationName table.put(records[0], records[1]); } } } public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException { //天氣記錄根據stationId 獲取stationName String stationName = table.get(key.toString()); for (Text value : values) { value.set(stationName + "\t" + value.toString()); context.write(key, value); } } } public int run(String[] arg0) throws Exception { Configuration conf = new Configuration(); String[] args = {"hdfs://sparks:9000/middle/reduceJoin/station.txt" ,"hdfs://sparks:9000/middle/reduceJoin/records.txt" ,"hdfs://sparks:9000/middle/reduceJoin/DistributedCache-out" }; String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: cache <in> [<in>...] <out>"); System.exit(2); } //輸出路徑 Path mypath = new Path(otherArgs[otherArgs.length - 1]); FileSystem hdfs = mypath.getFileSystem(conf);// 創建輸出路徑 if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = Job.getInstance(conf, "ReduceJoinByDistributedCache"); //添加緩存文件 job.addCacheFile(new Path(otherArgs[0]).toUri());//station.txt job.setJarByClass(ReduceJoinByDistributedCache.class); job.setMapperClass(ReduceJoinByDistributedCacheMapper.class); job.setReducerClass(ReduceJoinByDistributedCacheReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //添加輸入路徑 for (int i = 1; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //添加輸出路徑 FileOutputFormat.setOutputPath(job, new Path( otherArgs[otherArgs.length - 1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int ec = ToolRunner.run(new Configuration(),new ReduceJoinByDistributedCache(), args); System.exit(ec); } }
Reduce Join的不足
這里主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,是因為整體數據被分割了,每個map task只處理一部分數據而不能夠獲取到所有需要的join字段,因此我們可以充分利用mapreduce框架的特性,讓他按照join key進行分區,將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。
這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。