MapReduce實現兩表的Join--原理及python和java代碼實現


用Hive一句話搞定的,可是有時必需要用mapreduce

方法介紹

1. 概述

在傳統數據庫(如:MYSQL)中,JOIN操作是很常見且很耗時的。而在HADOOP中進行JOIN操作。相同常見且耗時,因為Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。
本文首先介紹了Hadoop上通常的JOIN實現方法。然后給出了幾種針對不同輸入數據集的優化方法。

2. 常見的join方法介紹

如果要進行join的數據分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想例如以下:
在map階段,map函數同一時候讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對。對每條數據打一個標簽(tag),比方:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件里的數據打標簽。
在reduce階段。reduce函數獲取key同樣的來自File1和File2文件的value list。 然后對於同一個key。對File1和File2中的數據進行join(笛卡爾乘積)。

即:reduce階段進行實際的連接操作。


2.2 map side join

之所以存在reduce side join。是由於在map階段不能獲取全部須要的join字段,即:同一個key相應的字段可能位於不同map中。Reduce side join是很低效的,由於shuffle階段要進行大量的傳輸數據。


Map side join是針對下面場景進行的優化:兩個待連接表中。有一個表很大。而還有一個表很小。以至於小表能夠直接存放到內存中。

這樣,我們能夠將小表復制多份。讓每一個map task內存中存在一份(比方存放到hash table中)。然后僅僅掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有同樣的key的記錄,假設有,則連接后輸出就可以。


為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法例如以下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(假設是HDFS上的文件。能夠這樣:hdfs://namenode:9000/home/XXX/file,當中9000是自己配置的NameNodeport號)。JobTracker在作業啟動之前會獲取這個URI列表,並將對應的文件復制到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件文件夾。並使用標准的文件讀寫API讀取對應的文件。

2.3 SemiJoin

SemiJoin,也叫半連接。是從分布式數據庫中借鑒過來的方法。

它的產生動機是:對於reduce side join,跨機器的傳輸數據量很大,這成了join操作的一個瓶頸。假設可以在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。
實現方法非常easy:選取一個小表。如果是File1。將其參與join的key抽取出來,保存到文件File3中。File3文件一般非常小,能夠放到內存中。

在map階段。使用DistributedCache將File3拷貝到各個TaskTracker上,然后將File2中不在File3中的key相應的記錄過濾掉。剩下的reduce階段的工作與reduce side join同樣。
很多其它關於半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

2.4 reduce side join + BloomFilter

在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候能夠使用BloomFiler以節省空間。
BloomFilter最常見的作用是:推斷某個元素是否在一個集合里面。

它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:假設contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:假設contains()返回true。則該元素可能在集合中。


因而可將小表中的key保存到BloomFilter中。在map階段過濾大表。可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄一定不會過濾掉),這沒關系,僅僅只是添加了少量的網絡IO而已。
很多其它關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默認情況下是依照key進行排序,假設要依照value進行排序怎么辦?即:對於同一個key,reduce函數接收到的value list是依照value排序的。

這樣的應用需求在join操作中非經常見,比方,希望同樣的key中。小表相應的value排在前面。
有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort。主要思想是:在reduce()函數中,將某個key相應的全部value保存下來。然后進行排序。

這樣的方法最大的缺點是:可能會造成out of memory。
對於value-to-key conversion。主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果便是先按key排序。后按value排序的結果。須要注意的是,用戶須要自己實現Paritioner。以便僅僅依照key進行數據划分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法。可用於設置排序group的key值,

reduce-side-join python代碼

hadoop有個工具叫做steaming,可以支持python、shell、C++、PHP等其它不論什么支持標准輸入stdin及標准輸出stdout的語言。其執行原理可以通過和標准java的map-reduce程序對照來說明:

使用原生java語言實現Map-reduce程序
  1. hadoop准備好數據后,將數據傳送給java的map程序
  2. java的map程序將數據處理后,輸出O1
  3. hadoop將O1打散、排序,然后傳給不同的reduce機器
  4. 每一個reduce機器將傳來的數據傳給reduce程序
  5. reduce程序將數據處理,輸出終於數據O2
借助hadoop streaming使用python語言實現Map-reduce程序
  1. hadoop准備好數據后,將數據傳送給java的map程序
  2. java的map程序將數據處理成“鍵/值”對,並傳送給python的map程序
  3. python的map程序將數據處理后,將結果傳回給java的map程序
  4. java的map程序將數據輸出為O1
  5. hadoop將O1打散、排序。然后傳給不同的reduce機器
  6. 每一個reduce機器將傳來的數據處理成“鍵/值”對。並傳送給python的reduce程序
  7. python的reduce程序將數據處理后,將結果返回給java的reduce程序
  8. java的reduce程序將數據處理。輸出終於數據O2

上面紅色表示map的對照,藍色表示reduce的對照,能夠看出streaming程序多了一步中間處理。這樣說來steaming程序的效率和性能應該低於java版的程序。然而python的開發效率、執行性能有時候會大於java。這就是streaming的優勢所在。

hadoop之實現集合join的需求

hadoop是用來做數據分析的,大都是對集合進行操作。因此該過程中將集合join起來使得一個集合能得到還有一個集合相應的信息的需求很常見。

比方下面這個需求。有兩份數據:學生信息(學號,姓名)和學生成績(學號、課程、成績),特點是有個共同的主鍵“學號”,如今須要將兩者結合起來得到數據(學號,姓名,課程,成績),計算公式:

學號。姓名) join (學號,課程,成績)= (學號。姓名。課程。成績)

數據事例1-學生信息:

學號sno 姓名name
01 name1
02 name2
03 name3
04 name4

數據事例2:-學生成績:

學號sno 課程號courseno 成績grade
01 01 80
01 02 90
02 01 82
02 02 95

期待的終於輸出:

學號sno 姓名name 課程courseno 成績grade
01 name1 01 80
01 name1 02 90
02 name2 01 82
02 name2 02 95

實現join的注意點和易踩坑總結

假設你想寫一個完好健壯的map reduce程序。我建議你首先弄清楚輸入數據的格式、輸出數據的格式,然后自己手動構建輸入數據並手動計算出輸出數據,這個過程中你會發現一些敲代碼中須要特別處理的地方:

  1. 實現join的key是哪個。是1個字段還是2個字段,本例中key是sno,1個字段
  2. 每一個集合中key能否夠反復,本例中數據1不可反復,數據2的key能夠反復
  3. 每一個集合中key的相應值能否夠不存在,本例中有學生會沒成績,所以數據2的key能夠為空

第1條會影響到hadoop啟動腳本中key.fields和partition的配置,第2條會影響到map-reduce程序中詳細的代碼實現方式。第3條相同影響代碼編寫方式。

hadoop實現join操作的思路

詳細思路是給每一個數據源加上一個數字標記label。這樣hadoop對其排序后同一個字段的數據排在一起而且依照label排好序了,於是直接將相鄰同樣key的數據合並在一起輸出就得到了結果。

1、 map階段:給表1和表2加標記,事實上就是多輸出一個字段,比方表一加標記為0,表2加標記為2;

2、 partion階段:依據學號key為第一主鍵,標記label為第二主鍵進行排序和分區

3、 reduce階段:因為已經依照第一主鍵、第二主鍵排好了序,將相鄰同樣key數據合並輸出

hadoop使用python實現join的map和reduce代碼

mapper.py的代碼:

reducer的代碼:

使用shell腳本啟動hadoop程序的方法:

能夠自己手工構造輸入輸出數據進行測試。本程序是驗證過的。

很多其它須要注意的地方

hadoop的join操作能夠分為非常多類型,各種類型腳本的編寫有所不同,其分類是依照key字段數目、value字段數目、key是否可反復來划分的。下面是一個個人總結的對比表,表示會影響的地方:

影響類型 影響的范圍
key字段數目 1、啟動腳本中num.key.fields.for.partition的配置2、啟動腳本中stream.num.map.output.key.fields的配置

3、map和reduce腳本中key的獲取

4、map和reduce腳本中每一條數據和上一條數據比較的方法key是否可反復假設數據源1可反復,標記為M。數據源2可反復標記為N,那么join能夠分為:1*1、M*1、M*N類型

1*1類型:reduce中先記錄第一個value。然后在下一條直接合並輸出。

M*1類型:將類型1作為標記小的輸出,然后每次遇見label=1就記錄value,每遇見一次label=2就輸出一次終於結果;

M*N類型:遇見類型1,就用數組記錄value值。遇見label=2就將將記錄的數組值所有連同該行value輸出。value字段數目影響每次label=1時記錄的數據個數,須要將value都記錄下來


reduce-side-join java代碼

數據准備

首先是准備好數據。這個倒已經是一個熟練的過程。所要做的是把演示樣例數據准備好,記住路徑和字段分隔符。
准備好以下兩張表:
(1)m_ys_lab_jointest_a(下面簡稱表A)
建表語句為:
[sql]  view plain  copy
 print ?

  1. create table if not exists m_ys_lab_jointest_a (  
  2.      id bigint,  
  3.      name string  
  4. )  
  5. row format delimited  
  6. fields terminated by '9'  
  7. lines terminated by '10'  
  8. stored as textfile;  
數據:
id     name
1     北京
2     天津
3     河北
4     山西
5     內蒙古
6     遼寧
7     吉林
8     黑龍江

(2)m_ys_lab_jointest_b(下面簡稱表B)
建表語句為:
[sql]  view plain  copy
 print ?
  1. create table if not exists m_ys_lab_jointest_b (  
  2.      id bigint,  
  3.      statyear bigint,  
  4.      num bigint  
  5. )  
  6. row format delimited  
  7. fields terminated by '9'  
  8. lines terminated by '10'  
  9. stored as textfile;  
數據:
id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

我們的目的是。以id為key做join操作。得到下面表:
m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

計算模型

整個計算過程是:
(1)在map階段。把全部記錄標記成<key, value>的形式,當中key是id,value則依據來源不同取不同的形式:來源於表A的記錄,value的值為"a#"+name。來源於表B的記錄,value的值為"b#"+score。

(2)在reduce階段,先把每一個key下的value列表拆分為分別來自表A和表B的兩部分。分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積。形成一條條終於結果。
例如以下圖所看到的:

代碼

代碼例如以下:
[java]  view plain  copy
 print ?
  1. import java.io.IOException;  
  2. import java.util.HashMap;  
  3. import java.util.Iterator;  
  4. import java.util.Vector;  
  5.   
  6. import org.apache.hadoop.io.LongWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.Writable;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.MapReduceBase;  
  12. import org.apache.hadoop.mapred.Mapper;  
  13. import org.apache.hadoop.mapred.OutputCollector;  
  14. import org.apache.hadoop.mapred.RecordWriter;  
  15. import org.apache.hadoop.mapred.Reducer;  
  16. import org.apache.hadoop.mapred.Reporter;  
  17.   
  18. /** 
  19.  * MapReduce實現Join操作 
  20.  */  
  21. public class MapRedJoin {  
  22.     public static final String DELIMITER = "\u0009"// 字段分隔符  
  23.       
  24.     // map過程  
  25.     public static class MapClass extends MapReduceBase implements  
  26.             Mapper<LongWritable, Text, Text, Text> {  
  27.                           
  28.         public void configure(JobConf job) {  
  29.             super.configure(job);  
  30.         }  
  31.           
  32.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
  33.                 Reporter reporter) throws IOException, ClassCastException {  
  34.             // 獲取輸入文件的全路徑和名稱  
  35.             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
  36.             // 獲取記錄字符串  
  37.             String line = value.toString();  
  38.             // 拋棄空記錄  
  39.             if (line == null || line.equals("")) return;   
  40.               
  41.             // 處理來自表A的記錄  
  42.             if (filePath.contains("m_ys_lab_jointest_a")) {  
  43.                 String[] values = line.split(DELIMITER); // 按分隔符切割出字段  
  44.                 if (values.length < 2return;  
  45.                   
  46.                 String id = values[0]; // id  
  47.                 String name = values[1]; // name  
  48.                   
  49.                 output.collect(new Text(id), new Text("a#"+name));  
  50.             }  
  51.             // 處理來自表B的記錄  
  52.             else if (filePath.contains("m_ys_lab_jointest_b")) {  
  53.                 String[] values = line.split(DELIMITER); // 按分隔符切割出字段  
  54.                 if (values.length < 3return;  
  55.                   
  56.                 String id = values[0]; // id  
  57.                 String statyear = values[1]; // statyear  
  58.                 String num = values[2]; //num  
  59.                   
  60.                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
  61.             }  
  62.         }  
  63.     }  
  64.       
  65.     // reduce過程  
  66.     public static class Reduce extends MapReduceBase  
  67.             implements Reducer<Text, Text, Text, Text> {  
  68.         public void reduce(Text key, Iterator<Text> values,  
  69.                 OutputCollector<Text, Text> output, Reporter reporter)  
  70.                 throws IOException {  
  71.                       
  72.             Vector<String> vecA = new Vector<String>(); // 存放來自表A的值  
  73.             Vector<String> vecB = new Vector<String>(); // 存放來自表B的值  
  74.               
  75.             while (values.hasNext()) {  
  76.                 String value = values.next().toString();  
  77.                 if (value.startsWith("a#")) {  
  78.                     vecA.add(value.substring(2));  
  79.                 } else if (value.startsWith("b#")) {  
  80.                     vecB.add(value.substring(2));  
  81.                 }  
  82.             }  
  83.               
  84.             int sizeA = vecA.size();  
  85.             int sizeB = vecB.size();  
  86.               
  87.             // 遍歷兩個向量  
  88.             int i, j;  
  89.             for (i = 0; i < sizeA; i ++) {  
  90.                 for (j = 0; j < sizeB; j ++) {  
  91.                     output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
  92.                 }  
  93.             }     
  94.         }  
  95.     }  
  96.       
  97.     protected void configJob(JobConf conf) {  
  98.         conf.setMapOutputKeyClass(Text.class);  
  99.         conf.setMapOutputValueClass(Text.class);  
  100.         conf.setOutputKeyClass(Text.class);  
  101.         conf.setOutputValueClass(Text.class);  
  102.         conf.setOutputFormat(ReportOutFormat.class);  
  103.     }  
  104. }  

技術細節

以下說一下當中的若干技術細節:
(1)因為輸入數據涉及兩張表。我們須要推斷當前處理的記錄是來自表A還是來自表B。

Reporter類getInputSplit()方法能夠獲取輸入數據的路徑,詳細代碼例如以下:

String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2)map的輸出的結果,同id的全部記錄(無論來自表A還是表B)都在同一個key下保存在同一個列表中,在reduce階段須要將其拆開,保存為相當於笛卡爾積的m x n條記錄。

因為事先不知道m、n是多少。這里使用了兩個向量(可增長數組)來分別保存來自表A和表B的記錄,再用一個兩層嵌套循環組織出我們須要的終於結果。

(3)在MapReduce中能夠使用System.out.println()方法輸出,以方便調試。

只是System.out.println()的內容不會在終端顯示,而是輸出到了stdout和stderr這兩個文件里,這兩個文件位於logs/userlogs/attempt_xxx文件夾下。

能夠通過web端的歷史job查看中的“Analyse This Job”來查看stdout和stderr的內容。


全部方法的java代碼(巨長)


從別人那轉來

1、在Reudce端進行連接。
在Reudce端進行連接是MapReduce框架進行表之間join操作最為常見的模式,其詳細的實現原理例如以下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以差別不同來源的記錄。

然后用連接字段作為key。其余部分和新加的標志作為value。最后進行輸出。

reduce端的主要工作:在reduce端以連接字段作為key的分組已經完畢,我們僅僅須要在每個分組其中將那些來源於不同文件的記錄(在map階段已經打標志)分開,最后進行笛卡爾僅僅就ok了。原理很easy,以下來看一個實例:
(1)自己定義一個value返回類型:
  1. package com.mr.reduceSizeJoin;   
  2. import java.io.DataInput;   
  3. import java.io.DataOutput;   
  4. import java.io.IOException;   
  5. import org.apache.hadoop.io.Text;   
  6. import org.apache.hadoop.io.WritableComparable;   
  7. public class CombineValues implements WritableComparable{   
  8.     //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);   
  9.     private Text joinKey;//鏈接keyword   
  10.     private Text flag;//文件來源標志   
  11.     private Text secondPart;//除了鏈接鍵外的其它部分   
  12.     public void setJoinKey(Text joinKey) {   
  13.         this.joinKey = joinKey;   
  14.     }   
  15.     public void setFlag(Text flag) {   
  16.         this.flag = flag;   
  17.     }   
  18.     public void setSecondPart(Text secondPart) {   
  19.         this.secondPart = secondPart;   
  20.     }   
  21.     public Text getFlag() {   
  22.         return flag;   
  23.     }   
  24.     public Text getSecondPart() {   
  25.         return secondPart;   
  26.     }   
  27.     public Text getJoinKey() {   
  28.         return joinKey;   
  29.     }   
  30.     public CombineValues() {   
  31.         this.joinKey =  new Text();   
  32.         this.flag = new Text();   
  33.         this.secondPart = new Text();   
  34.     }

  35.     @Override 
  36.     public void write(DataOutput out) throws IOException {   
  37.         this.joinKey.write(out);   
  38.         this.flag.write(out);   
  39.         this.secondPart.write(out);   
  40.     }   
  41.     @Override 
  42.     public void readFields(DataInput in) throws IOException {   
  43.         this.joinKey.readFields(in);   
  44.         this.flag.readFields(in);   
  45.         this.secondPart.readFields(in);   
  46.     }   
  47.     @Override 
  48.     public int compareTo(CombineValues o) {   
  49.         return this.joinKey.compareTo(o.getJoinKey());   
  50.     }   
  51.     @Override 
  52.     public String toString() {   
  53.         // TODO Auto-generated method stub   
  54.         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
  55.     }   
  56. }


(2)map、reduce主體代碼
  1. package com.mr.reduceSizeJoin;   
  2. import java.io.IOException;   
  3. import java.util.ArrayList;   
  4. import org.apache.hadoop.conf.Configuration;   
  5. import org.apache.hadoop.conf.Configured;   
  6. import org.apache.hadoop.fs.Path;   
  7. import org.apache.hadoop.io.Text;   
  8. import org.apache.hadoop.mapreduce.Job;   
  9. import org.apache.hadoop.mapreduce.Mapper;   
  10. import org.apache.hadoop.mapreduce.Reducer;   
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  16. import org.apache.hadoop.util.Tool;   
  17. import org.apache.hadoop.util.ToolRunner;   
  18. import org.slf4j.Logger;   
  19. import org.slf4j.LoggerFactory;   
  20. /**   
  21. * @author zengzhaozheng   
  22. * 用途說明:   
  23. * reudce side join中的left outer join   
  24. * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段   
  25. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
  26. * tb_dim_city.dat文件內容,分隔符為"|":   
  27. * id     name  orderid  city_code  is_show   
  28. * 0       其它        9999     9999         0   
  29. * 1       長春        1        901          1   
  30. * 2       吉林        2        902          1   
  31. * 3       四平        3        903          1   
  32. * 4       松原        4        904          1   
  33. * 5       通化        5        905          1   
  34. * 6       遼源        6        906          1   
  35. * 7       白城        7        907          1   
  36. * 8       白山        8        908          1   
  37. * 9       延吉        9        909          1   
  38. * -------------------------風騷的切割線-------------------------------   
  39. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  40. * tb_user_profiles.dat文件內容,分隔符為"|":   
  41. * userID   network     flow    cityID   
  42. * 1           2G       123      1   
  43. * 2           3G       333      2   
  44. * 3           3G       555      1   
  45. * 4           2G       777      3   
  46. * 5           3G       666      4   
  47. *   
  48. * -------------------------風騷的切割線-------------------------------   
  49. *  結果:   
  50. *  1   長春  1   901 1   1   2G  123   
  51. *  1   長春  1   901 1   3   3G  555   
  52. *  2   吉林  2   902 1   2   3G  333   
  53. *  3   四平  3   903 1   4   2G  777   
  54. *  4   松原  4   904 1   5   3G  666   
  55. */ 
  56. public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
  57.     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
  58.     public static class LeftOutJoinMapper extends Mapper {   
  59.         private CombineValues combineValues = new CombineValues();   
  60.         private Text flag = new Text();   
  61.         private Text joinKey = new Text();   
  62.         private Text secondPart = new Text();   
  63.         @Override 
  64.         protected void map(Object key, Text value, Context context)   
  65.                 throws IOException, InterruptedException {   
  66.             //獲得文件輸入路徑   
  67.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
  68.             //數據來自tb_dim_city.dat文件,標志即為"0"   
  69.             if(pathName.endsWith("tb_dim_city.dat")){   
  70.                 String[] valueItems = value.toString().split("\\|");   
  71.                 //過濾格式錯誤的記錄   
  72.                 if(valueItems.length != 5){   
  73.                     return;   
  74.                 }   
  75.                 flag.set("0");   
  76.                 joinKey.set(valueItems[0]);   
  77.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
  78.                 combineValues.setFlag(flag);   
  79.                 combineValues.setJoinKey(joinKey);   
  80.                 combineValues.setSecondPart(secondPart);   
  81.                 context.write(combineValues.getJoinKey(), combineValues);

  82.                 }//數據來自於tb_user_profiles.dat。標志即為"1"   
  83.             else if(pathName.endsWith("tb_user_profiles.dat")){   
  84.                 String[] valueItems = value.toString().split("\\|");   
  85.                 //過濾格式錯誤的記錄   
  86.                 if(valueItems.length != 4){   
  87.                     return;   
  88.                 }   
  89.                 flag.set("1");   
  90.                 joinKey.set(valueItems[3]);   
  91.                 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
  92.                 combineValues.setFlag(flag);   
  93.                 combineValues.setJoinKey(joinKey);   
  94.                 combineValues.setSecondPart(secondPart);   
  95.                 context.write(combineValues.getJoinKey(), combineValues);   
  96.             }   
  97.         }   
  98.     }   
  99.     public static class LeftOutJoinReducer extends Reducer {   
  100.         //存儲一個分組中的左表信息   
  101.         private ArrayList leftTable = new ArrayList();   
  102.         //存儲一個分組中的右表信息   
  103.         private ArrayList rightTable = new ArrayList();   
  104.         private Text secondPar = null;   
  105.         private Text output = new Text();   
  106.         /**   
  107.          * 一個分組調用一次reduce函數   
  108.          */ 
  109.         @Override 
  110.         protected void reduce(Text key, Iterable value, Context context)   
  111.                 throws IOException, InterruptedException {   
  112.             leftTable.clear();   
  113.             rightTable.clear();   
  114.             /**   
  115.              * 將分組中的元素依照文件分別進行存放   
  116.              * 這樣的方法要注意的問題:   
  117.              * 假設一個分組內的元素太多的話,可能會導致在reduce階段出現OOM,   
  118.              * 在處理分布式問題之前最好先了解數據的分布情況。依據不同的分布採取最   
  119.              * 適當的處理方法,這樣能夠有效的防止導致OOM和數據過度傾斜問題。   
  120.              */ 
  121.             for(CombineValues cv : value){   
  122.                 secondPar = new Text(cv.getSecondPart().toString());   
  123.                 //左表tb_dim_city   
  124.                 if("0".equals(cv.getFlag().toString().trim())){   
  125.                     leftTable.add(secondPar);   
  126.                 }   
  127.                 //右表tb_user_profiles   
  128.                 else if("1".equals(cv.getFlag().toString().trim())){   
  129.                     rightTable.add(secondPar);   
  130.                 }   
  131.             }   
  132.             logger.info("tb_dim_city:"+leftTable.toString());   
  133.             logger.info("tb_user_profiles:"+rightTable.toString());   
  134.             for(Text leftPart : leftTable){   
  135.                 for(Text rightPart : rightTable){   
  136.                     output.set(leftPart+ "\t" + rightPart);   
  137.                     context.write(key, output);   
  138.                 }   
  139.             }   
  140.         }   
  141.     }   
  142.     @Override 
  143.     public int run(String[] args) throws Exception {   
  144.           Configuration conf=getConf(); //獲得配置文件對象   
  145.             Job job=new Job(conf,"LeftOutJoinMR");   
  146.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  147.             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
  148.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
  149.             job.setMapperClass(LeftOutJoinMapper.class);   
  150.             job.setReducerClass(LeftOutJoinReducer.class);
  151.             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
  152.             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格格式

  153.             //設置map的輸出key和value類型   
  154.             job.setMapOutputKeyClass(Text.class);   
  155.             job.setMapOutputValueClass(CombineValues.class);

  156.             //設置reduce的輸出key和value類型   
  157.             job.setOutputKeyClass(Text.class);   
  158.             job.setOutputValueClass(Text.class);   
  159.             job.waitForCompletion(true);   
  160.             return job.isSuccessful()?0:1;   
  161.     }   
  162.     public static void main(String[] args) throws IOException,   
  163.             ClassNotFoundException, InterruptedException {   
  164.         try {   
  165.             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
  166.             System.exit(returnCode);   
  167.         } catch (Exception e) {   
  168.             // TODO Auto-generated catch block   
  169.             logger.error(e.getMessage());   
  170.         }   
  171.     }   
  172. }

當中詳細的分析以及數據的輸出輸入請看代碼中的凝視已經寫得比較清楚了,這里主要分析一下reduce join的一些不足。之所以會存在reduce join這樣的方式,我們可以非常明顯的看出原:由於總體數據被切割了。每一個map task僅僅處理一部分數據而不可以獲取到全部須要的join字段,因此我們須要在講join key作為reduce端的分組將全部join key同樣的記錄集中起來進行處理。所以reduce join這樣的方式就出現了。這樣的方式的缺點非常明顯就是會造成map和reduce端也就是shuffle階段出現大量的傳輸數據,效率非常低。

2、在Map端進行連接。

使用場景:一張表十分小、一張表非常大。
使用方法:在提交作業的時候先將小表文件放到該作業的DistributedCache中,然后從DistributeCache中取出該小表進行join key / value解釋切割放到內存中(可以放大Hash Map等等容器中)。

然后掃描大表,看大表中的每條記錄的join key /value值是否可以在內存中找到同樣join key的記錄。假設有則直接輸出結果。

直接上代碼。比較簡單:
  1. package com.mr.mapSideJoin;   
  2. import java.io.BufferedReader;   
  3. import java.io.FileReader;   
  4. import java.io.IOException;   
  5. import java.util.HashMap;   
  6. import org.apache.hadoop.conf.Configuration;   
  7. import org.apache.hadoop.conf.Configured;   
  8. import org.apache.hadoop.filecache.DistributedCache;   
  9. import org.apache.hadoop.fs.Path;   
  10. import org.apache.hadoop.io.Text;   
  11. import org.apache.hadoop.mapreduce.Job;   
  12. import org.apache.hadoop.mapreduce.Mapper;   
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  17. import org.apache.hadoop.util.Tool;   
  18. import org.apache.hadoop.util.ToolRunner;   
  19. import org.slf4j.Logger;   
  20. import org.slf4j.LoggerFactory;   
  21. /**   
  22. * @author zengzhaozheng   
  23. *   
  24. * 用途說明:   
  25. * Map side join中的left outer join   
  26. * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段   
  27. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   
  28. * 如果tb_dim_city文件記錄數非常少。tb_dim_city.dat文件內容,分隔符為"|":   
  29. * id     name  orderid  city_code  is_show   
  30. * 0       其它        9999     9999         0   
  31. * 1       長春        1        901          1   
  32. * 2       吉林        2        902          1   
  33. * 3       四平        3        903          1   
  34. * 4       松原        4        904          1   
  35. * 5       通化        5        905          1   
  36. * 6       遼源        6        906          1   
  37. * 7       白城        7        907          1   
  38. * 8       白山        8        908          1   
  39. * 9       延吉        9        909          1   
  40. * -------------------------風騷的切割線-------------------------------   
  41. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  42. * tb_user_profiles.dat文件內容,分隔符為"|":   
  43. * userID   network     flow    cityID   
  44. * 1           2G       123      1   
  45. * 2           3G       333      2   
  46. * 3           3G       555      1   
  47. * 4           2G       777      3   
  48. * 5           3G       666      4   
  49. * -------------------------風騷的切割線-------------------------------   
  50. *  結果:   
  51. *  1   長春  1   901 1   1   2G  123   
  52. *  1   長春  1   901 1   3   3G  555   
  53. *  2   吉林  2   902 1   2   3G  333   
  54. *  3   四平  3   903 1   4   2G  777   
  55. *  4   松原  4   904 1   5   3G  666   
  56. */ 
  57. public class MapSideJoinMain extends Configured implements Tool{   
  58.     private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
  59.     public static class LeftOutJoinMapper extends Mapper {

  60.         private HashMap city_info = new HashMap();   
  61.         private Text outPutKey = new Text();   
  62.         private Text outPutValue = new Text();   
  63.         private String mapInputStr = null;   
  64.         private String mapInputSpit[] = null;   
  65.         private String city_secondPart = null;   
  66.         /**   
  67.          * 此方法在每一個task開始之前運行,這里主要用作從DistributedCache   
  68.          * 中取到tb_dim_city文件。並將里邊記錄取出放到內存中。

       

  69.          */ 
  70.         @Override 
  71.         protected void setup(Context context)   
  72.                 throws IOException, InterruptedException {   
  73.             BufferedReader br = null;   
  74.             //獲得當前作業的DistributedCache相關文件   
  75.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
  76.             String cityInfo = null;   
  77.             for(Path p : distributePaths){   
  78.                 if(p.toString().endsWith("tb_dim_city.dat")){   
  79.                     //讀緩存文件,並放到mem中   
  80.                     br = new BufferedReader(new FileReader(p.toString()));   
  81.                     while(null!=(cityInfo=br.readLine())){   
  82.                         String[] cityPart = cityInfo.split("\\|",5);   
  83.                         if(cityPart.length ==5){   
  84.                             city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
  85.                         }   
  86.                     }   
  87.                 }   
  88.             }   
  89.         }

  90.         /**   
  91.          * Map端的實現相當簡單,直接推斷tb_user_profiles.dat中的   
  92.          * cityID是否存在我的map中就ok了,這樣就能夠實現Map Join了   
  93.          */ 
  94.         @Override 
  95.         protected void map(Object key, Text value, Context context)   
  96.                 throws IOException, InterruptedException {   
  97.             //排掉空行   
  98.             if(value == null || value.toString().equals("")){   
  99.                 return;   
  100.             }   
  101.             mapInputStr = value.toString();   
  102.             mapInputSpit = mapInputStr.split("\\|",4);   
  103.             //過濾非法記錄   
  104.             if(mapInputSpit.length != 4){   
  105.                 return;   
  106.             }   
  107.             //推斷鏈接字段是否在map中存在   
  108.             city_secondPart = city_info.get(mapInputSpit[3]);   
  109.             if(city_secondPart != null){   
  110.                 this.outPutKey.set(mapInputSpit[3]);   
  111.                 this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
  112.                 context.write(outPutKey, outPutValue);   
  113.             }   
  114.         }   
  115.     }   
  116.     @Override 
  117.     public int run(String[] args) throws Exception {   
  118.             Configuration conf=getConf(); //獲得配置文件對象   
  119.             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//為該job加入緩存文件   
  120.             Job job=new Job(conf,"MapJoinMR");   
  121.             job.setNumReduceTasks(0);

  122.             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
  123.             FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑

  124.             job.setJarByClass(MapSideJoinMain.class);   
  125.             job.setMapperClass(LeftOutJoinMapper.class);

  126.             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
  127.             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式

  128.             //設置map的輸出key和value類型   
  129.             job.setMapOutputKeyClass(Text.class);

  130.             //設置reduce的輸出key和value類型   
  131.             job.setOutputKeyClass(Text.class);   
  132.             job.setOutputValueClass(Text.class);   
  133.             job.waitForCompletion(true);   
  134.             return job.isSuccessful()?0:1;   
  135.     }   
  136.     public static void main(String[] args) throws IOException,   
  137.             ClassNotFoundException, InterruptedException {   
  138.         try {   
  139.             int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
  140.             System.exit(returnCode);   
  141.         } catch (Exception e) {   
  142.             // TODO Auto-generated catch block   
  143.             logger.error(e.getMessage());   
  144.         }   
  145.     }   
  146. }

這里說說DistributedCache。DistributedCache是分布式緩存的一種實現。它在整個MapReduce框架中起着相當關鍵的數據,他能夠支撐我們寫一些相當復雜高效的分布式程序。說回到這里,JobTracker在作業啟動之前會獲取到DistributedCache的資源uri列表,並將相應的文件分發到各個涉及到該作業的任務的TaskTracker上。另外。關於DistributedCache和作業的關系。比方權限、存儲路徑區分、public和private等屬性。接下來實用再整理研究一下寫一篇blog,這里就不具體說了。

另外另一種比較變態的Map Join方式,就是結合HBase來做Map Join操作。

這樣的方式全然能夠突破內存的控制,使你毫無忌憚的使用Map Join。並且效率也很不錯。


3、SemiJoin。
SemiJoin就是所謂的半連接,事實上細致一看就是reduce join的一個變種。就是在map端過濾掉一些數據,在網絡中僅僅傳輸參與連接的數據不參與連接的數據不必在網絡中進行傳輸,從而降低了shuffle的網絡傳輸量。使總體效率得到提高。其它思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發到相關節點,然后將其取出放到內存中(能夠放到HashSet中),在map階段掃描連接表,將join key不在內存HashSet中的記錄過濾掉。讓那些參與join的記錄通過shuffle傳輸到reduce端進行join操作。其它的和reduce join都是一樣的。看代碼:
  1. package com.mr.SemiJoin;   
  2. import java.io.BufferedReader;   
  3. import java.io.FileReader;   
  4. import java.io.IOException;   
  5. import java.util.ArrayList;   
  6. import java.util.HashSet;   
  7. import org.apache.hadoop.conf.Configuration;   
  8. import org.apache.hadoop.conf.Configured;   
  9. import org.apache.hadoop.filecache.DistributedCache;   
  10. import org.apache.hadoop.fs.Path;   
  11. import org.apache.hadoop.io.Text;   
  12. import org.apache.hadoop.mapreduce.Job;   
  13. import org.apache.hadoop.mapreduce.Mapper;   
  14. import org.apache.hadoop.mapreduce.Reducer;   
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  16. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  17. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  20. import org.apache.hadoop.util.Tool;   
  21. import org.apache.hadoop.util.ToolRunner;   
  22. import org.slf4j.Logger;   
  23. import org.slf4j.LoggerFactory;   
  24. /**   
  25. * @author zengzhaozheng   
  26. *   
  27. * 用途說明:   
  28. * reudce side join中的left outer join   
  29. * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段   
  30. * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
  31. * tb_dim_city.dat文件內容,分隔符為"|":   
  32. * id     name  orderid  city_code  is_show   
  33. * 0       其它        9999     9999         0   
  34. * 1       長春        1        901          1   
  35. * 2       吉林        2        902          1   
  36. * 3       四平        3        903          1   
  37. * 4       松原        4        904          1   
  38. * 5       通化        5        905          1   
  39. * 6       遼源        6        906          1   
  40. * 7       白城        7        907          1   
  41. * 8       白山        8        908          1   
  42. * 9       延吉        9        909          1   
  43. * -------------------------風騷的切割線-------------------------------   
  44. * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
  45. * tb_user_profiles.dat文件內容,分隔符為"|":   
  46. * userID   network     flow    cityID   
  47. * 1           2G       123      1   
  48. * 2           3G       333      2   
  49. * 3           3G       555      1   
  50. * 4           2G       777      3   
  51. * 5           3G       666      4   
  52. * -------------------------風騷的切割線-------------------------------   
  53. * joinKey.dat內容:   
  54. * city_code   
  55. * 1   
  56. * 2   
  57. * 3   
  58. * 4   
  59. * -------------------------風騷的切割線-------------------------------   
  60. *  結果:   
  61. *  1   長春  1   901 1   1   2G  123   
  62. *  1   長春  1   901 1   3   3G  555   
  63. *  2   吉林  2   902 1   2   3G  333   
  64. *  3   四平  3   903 1   4   2G  777   
  65. *  4   松原  4   904 1   5   3G  666   
  66. */ 
  67. public class SemiJoin extends Configured implements Tool{   
  68.     private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
  69.     public static class SemiJoinMapper extends Mapper {   
  70.         private CombineValues combineValues = new CombineValues();   
  71.         private HashSet joinKeySet = new HashSet();   
  72.         private Text flag = new Text();   
  73.         private Text joinKey = new Text();   
  74.         private Text secondPart = new Text();   
  75.         /**   
  76.          * 將參加join的key從DistributedCache取出放到內存中,以便在map端將要參加join的key過濾出來。b   
  77.          */ 
  78.         @Override 
  79.         protected void setup(Context context)   
  80.                 throws IOException, InterruptedException {   
  81.             BufferedReader br = null;   
  82.             //獲得當前作業的DistributedCache相關文件   
  83.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
  84.             String joinKeyStr = null;   
  85.             for(Path p : distributePaths){   
  86.                 if(p.toString().endsWith("joinKey.dat")){   
  87.                     //讀緩存文件,並放到mem中   
  88.                     br = new BufferedReader(new FileReader(p.toString()));   
  89.                     while(null!=(joinKeyStr=br.readLine())){   
  90.                         joinKeySet.add(joinKeyStr);   
  91.                     }   
  92.                 }   
  93.             }   
  94.         }   
  95.         @Override 
  96.         protected void map(Object key, Text value, Context context)   
  97.                 throws IOException, InterruptedException {   
  98.             //獲得文件輸入路徑   
  99.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
  100.             //數據來自tb_dim_city.dat文件,標志即為"0"   
  101.             if(pathName.endsWith("tb_dim_city.dat")){   
  102.                 String[] valueItems = value.toString().split("\\|");   
  103.                 //過濾格式錯誤的記錄   
  104.                 if(valueItems.length != 5){   
  105.                     return;   
  106.                 }   
  107.                 //過濾掉不須要參加join的記錄   
  108.                 if(joinKeySet.contains(valueItems[0])){   
  109.                     flag.set("0");   
  110.                     joinKey.set(valueItems[0]);   
  111.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
  112.                     combineValues.setFlag(flag);   
  113.                     combineValues.setJoinKey(joinKey);   
  114.                     combineValues.setSecondPart(secondPart);   
  115.                     context.write(combineValues.getJoinKey(), combineValues);   
  116.                 }else{   
  117.                     return ;   
  118.                 }   
  119.             }//數據來自於tb_user_profiles.dat,標志即為"1"   
  120.             else if(pathName.endsWith("tb_user_profiles.dat")){   
  121.                 String[] valueItems = value.toString().split("\\|");   
  122.                 //過濾格式錯誤的記錄   
  123.                 if(valueItems.length != 4){   
  124.                     return;   
  125.                 }   
  126.                 //過濾掉不須要參加join的記錄   
  127.                 if(joinKeySet.contains(valueItems[3])){   
  128.                     flag.set("1");   
  129.                     joinKey.set(valueItems[3]);   
  130.                     secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
  131.                     combineValues.setFlag(flag);   
  132.                     combineValues.setJoinKey(joinKey);   
  133.                     combineValues.setSecondPart(secondPart);   
  134.                     context.write(combineValues.getJoinKey(), combineValues);   
  135.                 }else{   
  136.                     return ;   
  137.                 }   
  138.             }   
  139.         }   
  140.     }   
  141.     public static class SemiJoinReducer extends Reducer {   
  142.         //存儲一個分組中的左表信息   
  143.         private ArrayList leftTable = new ArrayList();   
  144.         //存儲一個分組中的右表信息   
  145.         private ArrayList rightTable = new ArrayList();   
  146.         private Text secondPar = null;   
  147.         private Text output = new Text();   
  148.         /**   
  149.          * 一個分組調用一次reduce函數   
  150.          */ 
  151.         @Override 
  152.         protected void reduce(Text key, Iterable value, Context context)   
  153.                 throws IOException, InterruptedException {   
  154.             leftTable.clear();   
  155.             rightTable.clear();   
  156.             /**   
  157.              * 將分組中的元素依照文件分別進行存放   
  158.              * 這樣的方法要注意的問題:   
  159.              * 假設一個分組內的元素太多的話,可能會導致在reduce階段出現OOM。   
  160.              * 在處理分布式問題之前最好先了解數據的分布情況,依據不同的分布採取最   
  161.              * 適當的處理方法,這樣能夠有效的防止導致OOM和數據過度傾斜問題。   
  162.              */ 
  163.             for(CombineValues cv : value){   
  164.                 secondPar = new Text(cv.getSecondPart().toString());   
  165.                 //左表tb_dim_city   
  166.                 if("0".equals(cv.getFlag().toString().trim())){   
  167.                     leftTable.add(secondPar);   
  168.                 }   
  169.                 //右表tb_user_profiles   
  170.                 else if("1".equals(cv.getFlag().toString().trim())){   
  171.                     rightTable.add(secondPar);   
  172.                 }   
  173.             }   
  174.             logger.info("tb_dim_city:"+leftTable.toString());   
  175.             logger.info("tb_user_profiles:"+rightTable.toString());   
  176.             for(Text leftPart : leftTable){   
  177.                 for(Text rightPart : rightTable){   
  178.                     output.set(leftPart+ "\t" + rightPart);   
  179.                     context.write(key, output);   
  180.                 }   
  181.             }   
  182.         }   
  183.     }   
  184.     @Override 
  185.     public int run(String[] args) throws Exception {   
  186.             Configuration conf=getConf(); //獲得配置文件對象   
  187.             DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
  188.             Job job=new Job(conf,"LeftOutJoinMR");   
  189.             job.setJarByClass(SemiJoin.class);

  190.             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑   
  191.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑

  192.             job.setMapperClass(SemiJoinMapper.class);   
  193.             job.setReducerClass(SemiJoinReducer.class);

  194.             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式   
  195.             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式

  196.             //設置map的輸出key和value類型   
  197.             job.setMapOutputKeyClass(Text.class);   
  198.             job.setMapOutputValueClass(CombineValues.class);

  199.             //設置reduce的輸出key和value類型   
  200.             job.setOutputKeyClass(Text.class);   
  201.             job.setOutputValueClass(Text.class);   
  202.             job.waitForCompletion(true);   
  203.             return job.isSuccessful()?0:1;   
  204.     }   
  205.     public static void main(String[] args) throws IOException,   
  206.             ClassNotFoundException, InterruptedException {   
  207.         try {   
  208.             int returnCode =  ToolRunner.run(new SemiJoin(),args);   
  209.             System.exit(returnCode);   
  210.         } catch (Exception e) {   
  211.             logger.error(e.getMessage());   
  212.         }   
  213.     }   
  214. }

這里還說說SemiJoin也是有一定的適用范圍的。其抽取出來進行join的key是要放到內存中的,所以不可以太大。easy在Map端造成OOM。

總結
blog介紹了三種join方式。這三種join方式適用於不同的場景。其處理效率上的相差還是蠻大的。當中主要導致因素是網絡傳輸。

Map join效率最高。其次是SemiJoin。最低的是reduce join。另外,寫分布式大數據處理程序的時最好要對總體要處理的數據分布情況作一個了解,這能夠提高我們代碼的效率,使數據的傾斜度降到最低,使我們的代碼傾向性更好。



      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM