用Hive一句話搞定的,可是有時必需要用mapreduce
方法介紹
1. 概述
在傳統數據庫(如:MYSQL)中,JOIN操作是很常見且很耗時的。而在HADOOP中進行JOIN操作。相同常見且耗時,因為Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。本文首先介紹了Hadoop上通常的JOIN實現方法。然后給出了幾種針對不同輸入數據集的優化方法。
2. 常見的join方法介紹
如果要進行join的數據分別來自File1和File2.2.1 reduce side join
reduce side join是一種最簡單的join方式,其主要思想例如以下:在map階段,map函數同一時候讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對。對每條數據打一個標簽(tag),比方:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件里的數據打標簽。
在reduce階段。reduce函數獲取key同樣的來自File1和File2文件的value list。 然后對於同一個key。對File1和File2中的數據進行join(笛卡爾乘積)。
即:reduce階段進行實際的連接操作。
2.2 map side join
之所以存在reduce side join。是由於在map階段不能獲取全部須要的join字段,即:同一個key相應的字段可能位於不同map中。Reduce side join是很低效的,由於shuffle階段要進行大量的傳輸數據。
Map side join是針對下面場景進行的優化:兩個待連接表中。有一個表很大。而還有一個表很小。以至於小表能夠直接存放到內存中。
這樣,我們能夠將小表復制多份。讓每一個map task內存中存在一份(比方存放到hash table中)。然后僅僅掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有同樣的key的記錄,假設有,則連接后輸出就可以。
為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法例如以下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(假設是HDFS上的文件。能夠這樣:hdfs://namenode:9000/home/XXX/file,當中9000是自己配置的NameNodeport號)。JobTracker在作業啟動之前會獲取這個URI列表,並將對應的文件復制到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件文件夾。並使用標准的文件讀寫API讀取對應的文件。
2.3 SemiJoin
SemiJoin,也叫半連接。是從分布式數據庫中借鑒過來的方法。它的產生動機是:對於reduce side join,跨機器的傳輸數據量很大,這成了join操作的一個瓶頸。假設可以在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。
實現方法非常easy:選取一個小表。如果是File1。將其參與join的key抽取出來,保存到文件File3中。File3文件一般非常小,能夠放到內存中。
在map階段。使用DistributedCache將File3拷貝到各個TaskTracker上,然后將File2中不在File3中的key相應的記錄過濾掉。剩下的reduce階段的工作與reduce side join同樣。
很多其它關於半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
2.4 reduce side join + BloomFilter
在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候能夠使用BloomFiler以節省空間。BloomFilter最常見的作用是:推斷某個元素是否在一個集合里面。
它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:假設contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:假設contains()返回true。則該元素可能在集合中。
因而可將小表中的key保存到BloomFilter中。在map階段過濾大表。可能有一些不在小表中的記錄沒有過濾掉(可是在小表中的記錄一定不會過濾掉),這沒關系,僅僅只是添加了少量的網絡IO而已。
很多其它關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500
3. 二次排序
在Hadoop中,默認情況下是依照key進行排序,假設要依照value進行排序怎么辦?即:對於同一個key,reduce函數接收到的value list是依照value排序的。這樣的應用需求在join操作中非經常見,比方,希望同樣的key中。小表相應的value排在前面。
有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort。主要思想是:在reduce()函數中,將某個key相應的全部value保存下來。然后進行排序。
這樣的方法最大的缺點是:可能會造成out of memory。
對於value-to-key conversion。主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果便是先按key排序。后按value排序的結果。須要注意的是,用戶須要自己實現Paritioner。以便僅僅依照key進行數據划分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法。可用於設置排序group的key值,
reduce-side-join python代碼
hadoop有個工具叫做steaming,可以支持python、shell、C++、PHP等其它不論什么支持標准輸入stdin及標准輸出stdout的語言。其執行原理可以通過和標准java的map-reduce程序對照來說明:
使用原生java語言實現Map-reduce程序
- hadoop准備好數據后,將數據傳送給java的map程序
- java的map程序將數據處理后,輸出O1
- hadoop將O1打散、排序,然后傳給不同的reduce機器
- 每一個reduce機器將傳來的數據傳給reduce程序
- reduce程序將數據處理,輸出終於數據O2
借助hadoop streaming使用python語言實現Map-reduce程序
- hadoop准備好數據后,將數據傳送給java的map程序
- java的map程序將數據處理成“鍵/值”對,並傳送給python的map程序
- python的map程序將數據處理后,將結果傳回給java的map程序
- java的map程序將數據輸出為O1
- hadoop將O1打散、排序。然后傳給不同的reduce機器
- 每一個reduce機器將傳來的數據處理成“鍵/值”對。並傳送給python的reduce程序
- python的reduce程序將數據處理后,將結果返回給java的reduce程序
- java的reduce程序將數據處理。輸出終於數據O2
上面紅色表示map的對照,藍色表示reduce的對照,能夠看出streaming程序多了一步中間處理。這樣說來steaming程序的效率和性能應該低於java版的程序。然而python的開發效率、執行性能有時候會大於java。這就是streaming的優勢所在。
hadoop之實現集合join的需求
hadoop是用來做數據分析的,大都是對集合進行操作。因此該過程中將集合join起來使得一個集合能得到還有一個集合相應的信息的需求很常見。
比方下面這個需求。有兩份數據:學生信息(學號,姓名)和學生成績(學號、課程、成績),特點是有個共同的主鍵“學號”,如今須要將兩者結合起來得到數據(學號,姓名,課程,成績),計算公式:
(學號。姓名) join (學號,課程,成績)= (學號。姓名。課程。成績)
數據事例1-學生信息:
學號sno | 姓名name |
01 | name1 |
02 | name2 |
03 | name3 |
04 | name4 |
數據事例2:-學生成績:
學號sno | 課程號courseno | 成績grade |
01 | 01 | 80 |
01 | 02 | 90 |
02 | 01 | 82 |
02 | 02 | 95 |
期待的終於輸出:
學號sno | 姓名name | 課程courseno | 成績grade |
01 | name1 | 01 | 80 |
01 | name1 | 02 | 90 |
02 | name2 | 01 | 82 |
02 | name2 | 02 | 95 |
實現join的注意點和易踩坑總結
假設你想寫一個完好健壯的map reduce程序。我建議你首先弄清楚輸入數據的格式、輸出數據的格式,然后自己手動構建輸入數據並手動計算出輸出數據,這個過程中你會發現一些敲代碼中須要特別處理的地方:
- 實現join的key是哪個。是1個字段還是2個字段,本例中key是sno,1個字段
- 每一個集合中key能否夠反復,本例中數據1不可反復,數據2的key能夠反復
- 每一個集合中key的相應值能否夠不存在,本例中有學生會沒成績,所以數據2的key能夠為空
第1條會影響到hadoop啟動腳本中key.fields和partition的配置,第2條會影響到map-reduce程序中詳細的代碼實現方式。第3條相同影響代碼編寫方式。
hadoop實現join操作的思路
詳細思路是給每一個數據源加上一個數字標記label。這樣hadoop對其排序后同一個字段的數據排在一起而且依照label排好序了,於是直接將相鄰同樣key的數據合並在一起輸出就得到了結果。
1、 map階段:給表1和表2加標記,事實上就是多輸出一個字段,比方表一加標記為0,表2加標記為2;
2、 partion階段:依據學號key為第一主鍵,標記label為第二主鍵進行排序和分區
3、 reduce階段:因為已經依照第一主鍵、第二主鍵排好了序,將相鄰同樣key數據合並輸出
hadoop使用python實現join的map和reduce代碼
mapper.py的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# -*- coding: utf-8 -*-
#Mapper.py
import
os
import
sys
#mapper腳本
def
mapper
(
)
:
#獲取當前正在處理的文件的名字。這里我們有兩個輸入文件
#所以要加以區分
filepath
=
os
.
environ
[
"map_input_file"
]
filename
=
os
.
path
.
split
(
filepath
)
[
-
1
]
for
line
in
sys
.
stdin
:
if
line
.
strip
(
)
==
""
:
continue
fields
=
line
[
:
-
1
]
.
split
(
"\t"
)
sno
=
fields
[
0
]
#以下推斷filename的目的是不同的文件有不同的字段。而且需加上不同的標記
if
filename
==
'data_info'
:
name
=
fields
[
1
]
#以下的數字'0'就是為數據源1加上的統一標記
print
'\t'
.
join
(
(
sno
,
'0'
,
name
)
)
elif
filename
==
'data_grade'
:
courseno
=
fields
[
1
]
grade
=
fields
[
2
]
#以下的數字'1'就是為數據源1加上的統一標記
print
'\t'
.
join
(
(
sno
,
'1'
,
courseno
,
grade
)
)
if
__name__
==
'__main__'
:
mapper
(
)
|
reducer的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# -*- coding: utf-8 -*-
#reducer.py
import
sys
def
reducer
(
)
:
#為了記錄和上一個記錄的差別,用lastsno記錄上個sno
lastsno
=
""
for
line
in
sys
.
stdin
:
if
line
.
strip
(
)
==
""
:
continue
fields
=
line
[
:
-
1
]
.
split
(
"\t"
)
sno
=
fields
[
0
]
''
'
處理思路:
遇見當前key與上一條key不同而且label=0,就記錄下來name值,
當前key與上一條key同樣而且label==1,則將本條數據的courseno、
grade聯通上一條記錄的name一起輸出成終於結果
'
''
if
sno
!=
lastsno
:
name
=
""
#這里沒有推斷label==1的情況,
#由於sno!=lastno,而且label=1表示該條key沒有數據源1的數據
if
fields
[
1
]
==
"0"
:
name
=
fields
[
2
]
elif
sno
==
lastno
:
#這里沒有推斷label==0的情況,
#由於sno==lastno而且label==0表示該條key沒有數據源2的數據
if
fields
[
2
]
==
"1"
:
courseno
=
fields
[
2
]
grade
=
fields
[
3
]
if
name
:
print
'\t'
.
join
(
(
lastsno
,
name
,
courseno
,
grade
)
)
lastsno
=
sno
if
__name__
==
'__main__'
:
reducer
(
)
|
使用shell腳本啟動hadoop程序的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
#先刪除輸出文件夾
~
/
hadoop
-
client
/
hadoop
/
bin
/
hadoop
fs
-
rmr
/
hdfs
/
jointest
/
output
#注意,以下配置中的環境值每一個人機器不一樣
~
/
hadoop
-
client
/
hadoop
/
bin
/
hadoop
streaming
\
-
D
mapred
.
map
.
tasks
=
10
\
-
D
mapred
.
reduce
.
tasks
=
5
\
-
D
mapred
.
job
.
map
.
capacity
=
10
\
-
D
mapred
.
job
.
reduce
.
capacity
=
5
\
-
D
mapred
.
job
.
name
=
"join--sno_name-sno_courseno_grade"
\
-
D
num
.
key
.
fields
.
for
.
partition
=
1
\
-
D
stream
.
num
.
map
.
output
.
key
.
fields
=
2
\
-
partitioner
org
.
apache
.
hadoop
.
mapred
.
lib
.
KeyFieldBasedPartitioner
\
-
input
"/hdfs/jointest/input/*"
\
-
output
"/hdfs/jointest/output"
\
-
mapper
"python26/bin/python26.sh mapper.py"
\
-
reducer
"python26/bin/python26.sh reducer.py"
\
-
file
"mapper.py"
\
-
file
"reducer.py"
\
-
cacheArchive
"/share/python26.tar.gz#python26"
#看看執行成功沒。若輸出0則表示成功了
echo
$
?
|
能夠自己手工構造輸入輸出數據進行測試。本程序是驗證過的。
很多其它須要注意的地方
hadoop的join操作能夠分為非常多類型,各種類型腳本的編寫有所不同,其分類是依照key字段數目、value字段數目、key是否可反復來划分的。下面是一個個人總結的對比表,表示會影響的地方:
影響類型 | 影響的范圍 |
key字段數目 | 1、啟動腳本中num.key.fields.for.partition的配置2、啟動腳本中stream.num.map.output.key.fields的配置 |
3、map和reduce腳本中key的獲取
4、map和reduce腳本中每一條數據和上一條數據比較的方法key是否可反復假設數據源1可反復,標記為M。數據源2可反復標記為N,那么join能夠分為:1*1、M*1、M*N類型
1*1類型:reduce中先記錄第一個value。然后在下一條直接合並輸出。
M*1類型:將類型1作為標記小的輸出,然后每次遇見label=1就記錄value,每遇見一次label=2就輸出一次終於結果;
M*N類型:遇見類型1,就用數組記錄value值。遇見label=2就將將記錄的數組值所有連同該行value輸出。value字段數目影響每次label=1時記錄的數據個數,須要將value都記錄下來
reduce-side-join java代碼
數據准備
id name 1 北京 2 天津 3 河北 4 山西 5 內蒙古 6 遼寧 7 吉林 8 黑龍江 |
id statyear num 1 2010 1962 1 2011 2019 2 2010 1299 2 2011 1355 4 2010 3574 4 2011 3593 9 2010 2303 9 2011 2347 |
我們的目的是。以id為key做join操作。得到下面表:
id name statyear num 1 北京 2011 2019 1 北京 2010 1962 2 天津 2011 1355 2 天津 2010 1299 4 山西 2011 3593 4 山西 2010 3574 |

技術細節
Reporter類getInputSplit()方法能夠獲取輸入數據的路徑,詳細代碼例如以下:
因為事先不知道m、n是多少。這里使用了兩個向量(可增長數組)來分別保存來自表A和表B的記錄,再用一個兩層嵌套循環組織出我們須要的終於結果。
只是System.out.println()的內容不會在終端顯示,而是輸出到了stdout和stderr這兩個文件里,這兩個文件位於logs/userlogs/attempt_xxx文件夾下。
能夠通過web端的歷史job查看中的“Analyse This Job”來查看stdout和stderr的內容。
全部方法的java代碼(巨長)
從別人那轉來
然后用連接字段作為key。其余部分和新加的標志作為value。最后進行輸出。
- package com.mr.reduceSizeJoin;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- public class CombineValues implements WritableComparable{
- //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
- private Text joinKey;//鏈接keyword
- private Text flag;//文件來源標志
- private Text secondPart;//除了鏈接鍵外的其它部分
- public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey;
- }
- public void setFlag(Text flag) {
- this.flag = flag;
- }
- public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart;
- }
- public Text getFlag() {
- return flag;
- }
- public Text getSecondPart() {
- return secondPart;
- }
- public Text getJoinKey() {
- return joinKey;
- }
- public CombineValues() {
- this.joinKey = new Text();
- this.flag = new Text();
- this.secondPart = new Text();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.joinKey.write(out);
- this.flag.write(out);
- this.secondPart.write(out);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.joinKey.readFields(in);
- this.flag.readFields(in);
- this.secondPart.readFields(in);
- }
- @Override
- public int compareTo(CombineValues o) {
- return this.joinKey.compareTo(o.getJoinKey());
- }
- @Override
- public String toString() {
- // TODO Auto-generated method stub
- return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
- }
- }
- package com.mr.reduceSizeJoin;
- import java.io.IOException;
- import java.util.ArrayList;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- * 用途說明:
- * reudce side join中的left outer join
- * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
- * tb_dim_city.dat文件內容,分隔符為"|":
- * id name orderid city_code is_show
- * 0 其它 9999 9999 0
- * 1 長春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 遼源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------風騷的切割線-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件內容,分隔符為"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- *
- * -------------------------風騷的切割線-------------------------------
- * 結果:
- * 1 長春 1 901 1 1 2G 123
- * 1 長春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
- public static class LeftOutJoinMapper extends Mapper {
- private CombineValues combineValues = new CombineValues();
- private Text flag = new Text();
- private Text joinKey = new Text();
- private Text secondPart = new Text();
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //獲得文件輸入路徑
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- //數據來自tb_dim_city.dat文件,標志即為"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 5){
- return;
- }
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
-
- }//數據來自於tb_user_profiles.dat。標志即為"1"
- else if(pathName.endsWith("tb_user_profiles.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 4){
- return;
- }
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }
- }
- }
- public static class LeftOutJoinReducer extends Reducer {
- //存儲一個分組中的左表信息
- private ArrayList leftTable = new ArrayList();
- //存儲一個分組中的右表信息
- private ArrayList rightTable = new ArrayList();
- private Text secondPar = null;
- private Text output = new Text();
- /**
- * 一個分組調用一次reduce函數
- */
- @Override
- protected void reduce(Text key, Iterable value, Context context)
- throws IOException, InterruptedException {
- leftTable.clear();
- rightTable.clear();
- /**
- * 將分組中的元素依照文件分別進行存放
- * 這樣的方法要注意的問題:
- * 假設一個分組內的元素太多的話,可能會導致在reduce階段出現OOM,
- * 在處理分布式問題之前最好先了解數據的分布情況。依據不同的分布採取最
- * 適當的處理方法,這樣能夠有效的防止導致OOM和數據過度傾斜問題。
- */
- for(CombineValues cv : value){
- secondPar = new Text(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles
- else if("1".equals(cv.getFlag().toString().trim())){
- rightTable.add(secondPar);
- }
- }
- logger.info("tb_dim_city:"+leftTable.toString());
- logger.info("tb_user_profiles:"+rightTable.toString());
- for(Text leftPart : leftTable){
- for(Text rightPart : rightTable){
- output.set(leftPart+ "\t" + rightPart);
- context.write(key, output);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //獲得配置文件對象
- Job job=new Job(conf,"LeftOutJoinMR");
- job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
- FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
- job.setMapperClass(LeftOutJoinMapper.class);
- job.setReducerClass(LeftOutJoinReducer.class);
- job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格格式
-
- //設置map的輸出key和value類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineValues.class);
-
- //設置reduce的輸出key和value類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
2、在Map端進行連接。
然后掃描大表,看大表中的每條記錄的join key /value值是否可以在內存中找到同樣join key的記錄。假設有則直接輸出結果。
- package com.mr.mapSideJoin;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.HashMap;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- *
- * 用途說明:
- * Map side join中的left outer join
- * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
- * 如果tb_dim_city文件記錄數非常少。tb_dim_city.dat文件內容,分隔符為"|":
- * id name orderid city_code is_show
- * 0 其它 9999 9999 0
- * 1 長春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 遼源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------風騷的切割線-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件內容,分隔符為"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- * -------------------------風騷的切割線-------------------------------
- * 結果:
- * 1 長春 1 901 1 1 2G 123
- * 1 長春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class MapSideJoinMain extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
- public static class LeftOutJoinMapper extends Mapper {
-
- private HashMap city_info = new HashMap();
- private Text outPutKey = new Text();
- private Text outPutValue = new Text();
- private String mapInputStr = null;
- private String mapInputSpit[] = null;
- private String city_secondPart = null;
- /**
- * 此方法在每一個task開始之前運行,這里主要用作從DistributedCache
- * 中取到tb_dim_city文件。並將里邊記錄取出放到內存中。
- */
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- BufferedReader br = null;
- //獲得當前作業的DistributedCache相關文件
- Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- String cityInfo = null;
- for(Path p : distributePaths){
- if(p.toString().endsWith("tb_dim_city.dat")){
- //讀緩存文件,並放到mem中
- br = new BufferedReader(new FileReader(p.toString()));
- while(null!=(cityInfo=br.readLine())){
- String[] cityPart = cityInfo.split("\\|",5);
- if(cityPart.length ==5){
- city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
- }
- }
- }
- }
- }
-
- /**
- * Map端的實現相當簡單,直接推斷tb_user_profiles.dat中的
- * cityID是否存在我的map中就ok了,這樣就能夠實現Map Join了
- */
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //排掉空行
- if(value == null || value.toString().equals("")){
- return;
- }
- mapInputStr = value.toString();
- mapInputSpit = mapInputStr.split("\\|",4);
- //過濾非法記錄
- if(mapInputSpit.length != 4){
- return;
- }
- //推斷鏈接字段是否在map中存在
- city_secondPart = city_info.get(mapInputSpit[3]);
- if(city_secondPart != null){
- this.outPutKey.set(mapInputSpit[3]);
- this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
- context.write(outPutKey, outPutValue);
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //獲得配置文件對象
- DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//為該job加入緩存文件
- Job job=new Job(conf,"MapJoinMR");
- job.setNumReduceTasks(0);
-
- FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑
-
- job.setJarByClass(MapSideJoinMain.class);
- job.setMapperClass(LeftOutJoinMapper.class);
-
- job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
-
- //設置map的輸出key和value類型
- job.setMapOutputKeyClass(Text.class);
-
- //設置reduce的輸出key和value類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new MapSideJoinMain(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
這樣的方式全然能夠突破內存的控制,使你毫無忌憚的使用Map Join。並且效率也很不錯。
3、SemiJoin。
- package com.mr.SemiJoin;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashSet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author zengzhaozheng
- *
- * 用途說明:
- * reudce side join中的left outer join
- * 左連接。兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
- * tb_dim_city.dat文件內容,分隔符為"|":
- * id name orderid city_code is_show
- * 0 其它 9999 9999 0
- * 1 長春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 遼源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * -------------------------風騷的切割線-------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat文件內容,分隔符為"|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- * -------------------------風騷的切割線-------------------------------
- * joinKey.dat內容:
- * city_code
- * 1
- * 2
- * 3
- * 4
- * -------------------------風騷的切割線-------------------------------
- * 結果:
- * 1 長春 1 901 1 1 2G 123
- * 1 長春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class SemiJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);
- public static class SemiJoinMapper extends Mapper {
- private CombineValues combineValues = new CombineValues();
- private HashSet joinKeySet = new HashSet();
- private Text flag = new Text();
- private Text joinKey = new Text();
- private Text secondPart = new Text();
- /**
- * 將參加join的key從DistributedCache取出放到內存中,以便在map端將要參加join的key過濾出來。b
- */
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- BufferedReader br = null;
- //獲得當前作業的DistributedCache相關文件
- Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- String joinKeyStr = null;
- for(Path p : distributePaths){
- if(p.toString().endsWith("joinKey.dat")){
- //讀緩存文件,並放到mem中
- br = new BufferedReader(new FileReader(p.toString()));
- while(null!=(joinKeyStr=br.readLine())){
- joinKeySet.add(joinKeyStr);
- }
- }
- }
- }
- @Override
- protected void map(Object key, Text value, Context context)
- throws IOException, InterruptedException {
- //獲得文件輸入路徑
- String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
- //數據來自tb_dim_city.dat文件,標志即為"0"
- if(pathName.endsWith("tb_dim_city.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 5){
- return;
- }
- //過濾掉不須要參加join的記錄
- if(joinKeySet.contains(valueItems[0])){
- flag.set("0");
- joinKey.set(valueItems[0]);
- secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }else{
- return ;
- }
- }//數據來自於tb_user_profiles.dat,標志即為"1"
- else if(pathName.endsWith("tb_user_profiles.dat")){
- String[] valueItems = value.toString().split("\\|");
- //過濾格式錯誤的記錄
- if(valueItems.length != 4){
- return;
- }
- //過濾掉不須要參加join的記錄
- if(joinKeySet.contains(valueItems[3])){
- flag.set("1");
- joinKey.set(valueItems[3]);
- secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
- combineValues.setFlag(flag);
- combineValues.setJoinKey(joinKey);
- combineValues.setSecondPart(secondPart);
- context.write(combineValues.getJoinKey(), combineValues);
- }else{
- return ;
- }
- }
- }
- }
- public static class SemiJoinReducer extends Reducer {
- //存儲一個分組中的左表信息
- private ArrayList leftTable = new ArrayList();
- //存儲一個分組中的右表信息
- private ArrayList rightTable = new ArrayList();
- private Text secondPar = null;
- private Text output = new Text();
- /**
- * 一個分組調用一次reduce函數
- */
- @Override
- protected void reduce(Text key, Iterable value, Context context)
- throws IOException, InterruptedException {
- leftTable.clear();
- rightTable.clear();
- /**
- * 將分組中的元素依照文件分別進行存放
- * 這樣的方法要注意的問題:
- * 假設一個分組內的元素太多的話,可能會導致在reduce階段出現OOM。
- * 在處理分布式問題之前最好先了解數據的分布情況,依據不同的分布採取最
- * 適當的處理方法,這樣能夠有效的防止導致OOM和數據過度傾斜問題。
- */
- for(CombineValues cv : value){
- secondPar = new Text(cv.getSecondPart().toString());
- //左表tb_dim_city
- if("0".equals(cv.getFlag().toString().trim())){
- leftTable.add(secondPar);
- }
- //右表tb_user_profiles
- else if("1".equals(cv.getFlag().toString().trim())){
- rightTable.add(secondPar);
- }
- }
- logger.info("tb_dim_city:"+leftTable.toString());
- logger.info("tb_user_profiles:"+rightTable.toString());
- for(Text leftPart : leftTable){
- for(Text rightPart : rightTable){
- output.set(leftPart+ "\t" + rightPart);
- context.write(key, output);
- }
- }
- }
- }
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf=getConf(); //獲得配置文件對象
- DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
- Job job=new Job(conf,"LeftOutJoinMR");
- job.setJarByClass(SemiJoin.class);
-
- FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
-
- job.setMapperClass(SemiJoinMapper.class);
- job.setReducerClass(SemiJoinReducer.class);
-
- job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
- job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
-
- //設置map的輸出key和value類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(CombineValues.class);
-
- //設置reduce的輸出key和value類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful()?0:1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException, InterruptedException {
- try {
- int returnCode = ToolRunner.run(new SemiJoin(),args);
- System.exit(returnCode);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
總結
Map join效率最高。其次是SemiJoin。最低的是reduce join。另外,寫分布式大數據處理程序的時最好要對總體要處理的數據分布情況作一個了解,這能夠提高我們代碼的效率,使數據的傾斜度降到最低,使我們的代碼傾向性更好。