要解決什么問題?
解決的都是同一個問題,即將兩張“表‘進行join操作。更廣義地來講,就是不同數據源數據的合並問題。
reduce join是在map階段完成數據的標記,在reduce階段完成數據的合並
map join是直接在map階段完成數據的合並,沒有reduce階段
比如有如下問題:
這是訂單表。
這是商品表。
現在需要將商品表中的商品名稱填充到訂單表中。得到如下的聯合表:
Reduce Join
map:
將輸入數據統一封裝為一個Bean,此Bean包含了商品表和訂單表的所有公共和非公共屬性,相當於進行了全外連接,並新增加一個屬性——文件名稱,以區分數據是來自與商品表還是訂單表,便於在reduce階段數據的 處理;map輸出的key是pid,value是bean
shuffle:
根據pid對bean進行排序,所有pid相同的數據都會被聚合到同一個key下,發往同一個reducetask
reduce:
對同一個pid下所有的bean,首先要區分出它們是來自於哪個表,是商品表還是訂單表。如果是商品表,數據只有一條,保存其中的pname屬性;如果是訂單表,數據有多條,用保存的pname屬性替換pid屬性,並輸出
map join
沒有reduce過程,所有的工作都在map階段完成,極大減少了網絡傳輸和io的代價。如何實現:
上述的join過程可以看作外表與內表的連接過程,外表是訂單表,外表大,內表是商品表,內表小。所以可以把內表事先緩存於各個maptask結點,然后等到外表的數據傳輸過來以后,直接用外表的數據連接內表的數據並輸出即可
1)在driver中設置加載緩存文件,這樣每個maptask就可以獲取到該文件;設置reducetask個數為0,去除reduce階段
2)在setup方法中讀取緩存文件,並將結果以kv的形式存入hashmap,k是pid,v是pname
3)在map方法中,根據pid,通過hashmap找到pname,替換pid,寫出結果;
完整的示例代碼如下:
mapper
package mapjoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private Map<String,String> produs = new HashMap<>(); Text text = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //獲取緩存文件的輸入流 URI[] uris=context.getCacheFiles(); String path=uris[0].getPath().toString(); BufferedReader br=new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8")); //將小表中的數據,封裝進HashMap中 String line = null; while(StringUtils.isNotEmpty((line=br.readLine()))) { String[] fields = line.split("\t"); produs.put(fields[0], fields[1]); } } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //這里的記錄,只來源於order表,也就是大表,直接切割並封裝 String[] fields = value.toString().split("\t"); String pname = produs.get(fields[1]); pname = (pname==null)?"":pname; String s = fields[0]+"\t"+pname+"\t"+fields[2]; text.set(s); context.write(text, NullWritable.get()); } }
driver
package mapjoin; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // TODO Auto-generated method stub // 0 根據自己電腦路徑重新配置 args = new String[]{"e:/input/table/order.txt", "e:/output/table2"}; // 1 獲取job信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 設置加載jar包路徑 job.setJarByClass(MapJoinDriver.class); // 3 關聯map job.setMapperClass(MapJoinMapper.class); // 4 設置最終輸出數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 5 設置輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 加載緩存數據 job.addCacheFile(new URI("file:///e:/input/inputcache/pro.txt")); // 7 Map端Join的邏輯不需要Reduce階段,設置reduceTask數量為0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }