MapReduce Join
對兩份數據data1和data2進行關鍵詞連接是一個很通用的問題,如果數據量比較小,可以在內存中完成連接。
如果數據量比較大,在內存進行連接操會發生OOM。mapreduce join可以用來解決大數據的連接。
1 思路
1.1 reduce join
在map階段, 把關鍵字作為key輸出,並在value中標記出數據是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每一個value是來自data1還是data2,在內部分成2組,做集合的乘積。
這種方法有2個問題:
1, map階段沒有對數據瘦身,shuffle的網絡傳輸和排序性能很低。
2, reduce端對2個集合做乘積計算,很耗內存,容易導致OOM。
1.2 map join
兩份數據中,如果有一份數據比較小,小數據全部加載到內存,按關鍵字建立索引。大數據文件作為map的輸入文件,對map()函數每一對輸入,都能夠方便地和已加載到內存的小數據進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,並且連接好了的數據。
這種方法,要使用hadoop中的DistributedCache把小數據分布到各個計算節點,每個map節點都要把小數據庫加載到內存,按關鍵字建立索引。
這種方法有明顯的局限性:有一份數據比較小,在map端,能夠把它加載到內存,並進行join操作。
1.3 使用內存服務器,擴大節點的內存空間
針對map join,可以把一份數據存放到專門的內存服務器,在map()方法中,對每一個<key,value>的輸入對,根據key到內存服務器中取出數據,進行連接
1.4 使用BloomFilter過濾空連接的數據
對其中一份數據在內存中建立BloomFilter,另外一份數據在連接之前,用BloomFilter判斷它的key是否存在,如果不存在,那這個記錄是空連接,可以忽略。
1.5 使用mapreduce專為join設計的包
在mapreduce包里看到有專門為join設計的包,對這些包還沒有學習,不知道怎么使用,只是在這里記錄下來,作個提醒。
jar: mapreduce-client-core.jar
package: org.apache.hadoop.mapreduce.lib.join
2 實現map join
相對而言,map join更加普遍,下面的代碼使用DistributedCache實現map join
2.1 背景
有客戶數據customer和訂單數據orders。
customer
客戶編號 | 姓名 | 地址 | 電話 |
---|---|---|---|
1 | hanmeimei | ShangHai | 110 |
2 | leilei | BeiJing | 112 |
3 | lucy | GuangZhou | 119 |
** order**
訂單編號 | 客戶編號 | 其它字段被忽略 |
---|---|---|
1 | 1 | 50 |
2 | 1 | 200 |
3 | 3 | 15 |
4 | 3 | 350 |
5 | 3 | 58 |
6 | 1 | 42 |
7 | 1 | 352 |
8 | 2 | 1135 |
9 | 2 | 400 |
10 | 2 | 2000 |
11 | 2 | 300 |
要求對customer和orders按照客戶編號進行連接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不作要求
客戶編號 | 訂單編號 | 訂單金額 | 姓名 | 地址 | 電話 |
---|---|---|---|---|---|
1 | 1 | 50 | hanmeimei | ShangHai | 110 |
1 | 2 | 200 | hanmeimei | ShangHai | 110 |
1 | 6 | 42 | hanmeimei | ShangHai | 110 |
1 | 7 | 352 | hanmeimei | ShangHai | 110 |
2 | 8 | 1135 | leilei | BeiJing | 112 |
2 | 9 | 400 | leilei | BeiJing | 112 |
2 | 10 | 2000 | leilei | BeiJing | 112 |
2 | 11 | 300 | leilei | BeiJing | 112 |
3 | 3 | 15 | lucy | GuangZhou | 119 |
3 | 4 | 350 | lucy | GuangZhou | 119 |
3 | 5 | 58 | lucy | GuangZhou | 119 |
- 在提交job的時候,把小數據通過DistributedCache分發到各個節點。
- map端使用DistributedCache讀到數據,在內存中構建映射關系--如果使用專門的內存服務器,就把數據加載到內存服務器,map()節點可以只保留一份小緩存;如果使用BloomFilter來加速,在這里就可以構建;
- map()函數中,對每一對<key,value>,根據key到第2)步構建的映射里面中找出數據,進行連接,輸出。
2.2 程序實現
public class Join extends Configured implements Tool {
// customer文件在hdfs上的位置。
// TODO: 改用參數傳入
private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
private static class CustomerBean {
private int custId;
private String name;
private String address;
private String phone;
public CustomerBean() {}
public CustomerBean(int custId, String name, String address,
String phone) {
super();
this.custId = custId;
this.name = name;
this.address = address;
this.phone = phone;
}
public int getCustId() {
return custId;
}
public String getName() {
return name;
}
public String getAddress() {
return address;
}
public String getPhone() {
return phone;
}
}
private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
private int custId;
private int orderId;
public void set(int custId, int orderId) {
this.custId = custId;
this.orderId = orderId;
}
public int getCustId() {
return custId;
}
public int getOrderId() {
return orderId;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(custId);
out.writeInt(orderId);
}
@Override
public void readFields(DataInput in) throws IOException {
custId = in.readInt();
orderId = in.readInt();
}
@Override
public int compareTo(CustOrderMapOutKey o) {
int res = Integer.compare(custId, o.custId);
return res == 0 ? Integer.compare(orderId, o.orderId) : res;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CustOrderMapOutKey) {
CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
return custId == o.custId && orderId == o.orderId;
} else {
return false;
}
}
@Override
public String toString() {
return custId + "\t" + orderId;
}
}
private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
private final Text outputValue = new Text();
/**
* 在內存中customer數據
*/
private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 格式: 訂單編號 客戶編號 訂單金額
String[] cols = value.toString().split("\t");
if (cols.length < 3) {
return;
}
int custId = Integer.parseInt(cols[1]); // 取出客戶編號
CustomerBean customerBean = CUSTOMER_MAP.get(custId);
if (customerBean == null) { // 沒有對應的customer信息可以連接
return;
}
StringBuffer sb = new StringBuffer();
sb.append(cols[2])
.append("\t")
.append(customerBean.getName())
.append("\t")
.append(customerBean.getAddress())
.append("\t")
.append(customerBean.getPhone());
outputValue.set(sb.toString());
outputKey.set(custId, Integer.parseInt(cols[0]));
context.write(outputKey, outputValue);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
String line = null;
String[] cols = null;
// 格式:客戶編號 姓名 地址 電話
while ((line = reader.readLine()) != null) {
cols = line.split("\t");
if (cols.length < 4) { // 數據格式不匹配,忽略
continue;
}
CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
CUSTOMER_MAP.put(bean.getCustId(), bean);
}
}
}
/**
* reduce
* @author Ivan
*
*/
private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
@Override
protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 什么事都不用做,直接輸出
for (Text value : values) {
context.write(key, value);
}
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if (args.length < 2) {
new IllegalArgumentException("Usage: <inpath> <outpath>");
return;
}
ToolRunner.run(new Configuration(), new Join(), args);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, Join.class.getSimpleName());
job.setJarByClass(SecondarySortMapReduce.class);
// 添加customer cache文件
job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// map settings
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(CustOrderMapOutKey.class);
job.setMapOutputValueClass(Text.class);
// reduce settings
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(CustOrderMapOutKey.class);
job.setOutputKeyClass(Text.class);
boolean res = job.waitForCompletion(true);
return res ? 0 : 1;
}
}
運行環境
- 操作系統: Centos 6.4
- Hadoop: Apache Hadoop-2.5.0
客戶數據文件在hdfs上的位置硬編碼為
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 運行程序之前先把客戶數據上傳到這個位置。
- 程序運行結果