原理
MapReduce提供了表連接操作其中包括Map端join、Reduce端join還有單表連接,現在我們要討論的是Map端join,Map端join是指數據到達map處理函數之前進行合並的,效率要遠遠高於Reduce端join,因為Reduce端join是把所有的數據都經過Shuffle,非常消耗資源。
1.Map端join的使用場景:一張表數據十分小、一張表數據很大。
Map端join是針對以上場景進行的優化:將小表中的數據全部加載到內存,按關鍵字建立索引。大表中的數據作為map的輸入,對map()函數每一對<key,value>輸入,都能夠方便地和已加載到內存的小數據進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組並且連接好了的數據。
為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。
(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標准的文件讀寫API讀取相應的文件。
2.本實驗Map端Join的執行流程
(1)首先在提交作業的時候先將小表文件放到該作業的DistributedCache中,然后從DistributeCache中取出該小表進行join連接的 <key ,value>鍵值對,將其解釋分割放到內存中(可以放大Hash Map等等容器中)。
(2)要重寫MyMapper類下面的setup()方法,因為這個方法是先於map方法執行的,將較小表先讀入到一個HashMap中。
(3)重寫map函數,一行行讀入大表的內容,逐一的與HashMap中的內容進行比較,若Key相同,則對數據進行格式化處理,然后直接輸出。
(4)map函數輸出的<key,value >鍵值對首先經過一個suffle把key值相同的所有value放到一個迭代器中形成values,然后將<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接復制給輸出的key,輸入的values通過增強版for循環遍歷逐一輸出,循環的次數決定了<key,value>輸出的次數。
環境
Linux Ubuntu 14.04
jdk-7u75-linux-x64
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
內容
某電商平台,需要對訂單數據進行分析,已知訂單數據包括兩個文件,分別為訂單表orders1和訂單明細表order_items1,orders1表記錄了用戶購買商品的下單數據,order_items1表記錄了商品id,訂單id以及明細id,它們的表結構以及關系如下圖所示:
它們的數據內容是以"\t"鍵分割,數據內容如下:
orders1表
-
訂單ID 訂單號 用戶ID 下單日期
-
52304 111215052630 176474 2011-12-15 04:58:21
-
52303 111215052629 178350 2011-12-15 04:45:31
-
52302 111215052628 172296 2011-12-15 03:12:23
-
52301 111215052627 178348 2011-12-15 02:37:32
-
52300 111215052626 174893 2011-12-15 02:18:56
-
52299 111215052625 169471 2011-12-15 01:33:46
-
52298 111215052624 178345 2011-12-15 01:04:41
-
52297 111215052623 176369 2011-12-15 01:02:20
-
52296 111215052622 178343 2011-12-15 00:38:02
-
52295 111215052621 178342 2011-12-15 00:18:43
-
52294 111215052620 178341 2011-12-15 00:14:37
-
52293 111215052619 178338 2011-12-15 00:13:07
order_items1表
-
明細ID 訂單ID 商品ID
-
252578 52293 1016840
-
252579 52293 1014040
-
252580 52294 1014200
-
252581 52294 1001012
-
252582 52294 1022245
-
252583 52294 1014724
-
252584 52294 1010731
-
252586 52295 1023399
-
252587 52295 1016840
-
252592 52296 1021134
-
252593 52296 1021133
-
252585 52295 1021840
-
252588 52295 1014040
-
252589 52296 1014040
-
252590 52296 1019043
要求用Map端Join來進行多表連接,查詢在2011-12-15日該電商都有哪些用戶購買了什么商品。這里我們假設orders1文件記錄數很少,order_items1文件記錄數很多。
結果數據如下:
-
訂單ID 用戶ID 下單日期 商品ID
-
52293 178338 2011-12-15 00:13:07 1016840
-
52293 178338 2011-12-15 00:13:07 1014040
-
52294 178341 2011-12-15 00:14:37 1010731
-
52294 178341 2011-12-15 00:14:37 1014724
-
52294 178341 2011-12-15 00:14:37 1022245
-
52294 178341 2011-12-15 00:14:37 1014200
-
52294 178341 2011-12-15 00:14:37 1001012
-
52295 178342 2011-12-15 00:18:43 1023399
-
52295 178342 2011-12-15 00:18:43 1014040
-
52295 178342 2011-12-15 00:18:43 1021840
-
52295 178342 2011-12-15 00:18:43 1016840
-
52296 178343 2011-12-15 00:38:02 1021134
-
52296 178343 2011-12-15 00:38:02 1021133
-
52296 178343 2011-12-15 00:38:02 1014040
-
52296 178343 2011-12-15 00:38:02 1019043
實驗步驟
1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop。
-
cd /apps/hadoop/sbin
-
./start-all.sh
2.在Linux本地新建/data/mapreduce5目錄。
-
mkdir -p /data/mapreduce5
3.在Linux中切換到/data/mapreduce5目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce5/orders1和http://192.168.1.100:60000/allfiles/mapreduce5/order_items1網址上下載文本文件orders1,order_items1。
-
cd /data/mapreduce5
-
wget http://192.168.1.100:60000/allfiles/mapreduce5/orders1
-
wget http://192.168.1.100:60000/allfiles/mapreduce5/order_items1
后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce5/hadoop2lib.tar.gz網址上下載項目用到的依賴包。
-
wget http://192.168.1.100:60000/allfiles/mapreduce5/hadoop2lib.tar.gz
將hadoop2lib.tar.gz解壓到當前目錄下。
-
tar zxvf hadoop2lib.tar.gz
4.首先在HDFS上新建/mymapreduce5/in目錄,然后將Linux本地/data/mapreduce5目錄下的orders1和order_items1文件導入到HDFS的/mymapreduce5/in目錄中。
-
hadoop fs -mkdir -p /mymapreduce5/in
-
hadoop fs -put /data/mapreduce5/orders1 /mymapreduce5/in
-
hadoop fs -put /data/mapreduce5/order_items1 /mymapreduce5/in
5.新建Java Project項目,項目名為mapreduce5。
在mapreduce5項目下新建包,包名為mapduce。
在mapreduce包下新建類,類名為MapJoin。
6.添加項目所需依賴的jar包,右鍵項目,新建一個文件夾,命名為hadoop2lib,用於存放項目所需的jar包。
將/data/mapreduce5目錄下,hadoop2lib目錄中的jar包,拷貝到eclipse中mapreduce5項目的hadoop2lib目錄下。
選中所有項目hadoop2lib目錄下所有jar包,並添加到Build Path中。
7.編寫程序代碼,並描述其設計思路
Map端join適用於一個表記錄數很少(100條),另一表記錄數很多(像幾億條)的情況,我們把小表數據加載到內存中,然后掃描大表,看大表中記錄的每條join key/value是否能在內存中找到相同的join key記錄,如果有則輸出結果。這樣避免了一種數據傾斜問題。Mapreduce的Java代碼分為兩個部分:Mapper部分,Reduce部分。
Mapper代碼
-
public static class MyMapper extends Mapper<Object, Text, Text, Text>{
-
private Map<String, String> dict = new HashMap<>();
-
-
@Override
-
protected void setup(Context context) throws IOException,
-
InterruptedException {
-
String fileName = context.getLocalCacheFiles()[0].getName();
-
System.out.println(fileName);
-
BufferedReader reader = new BufferedReader(new FileReader(fileName));
-
String codeandname = null;
-
while (null != ( codeandname = reader.readLine() ) ) {
-
String str[]=codeandname.split("\t");
-
dict.put(str[0], str[2]+"\t"+str[3]);
-
}
-
reader.close();
-
}
-
@Override
-
protected void map(Object key, Text value, Context context)
-
throws IOException, InterruptedException {
-
String[] kv = value.toString().split("\t");
-
if (dict.containsKey(kv[1])) {
-
context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));
-
}
-
}
-
}
該部分分為setup方法與map方法。在setup方法中首先用getName()獲取當前文件名為orders1的文件並賦值給fileName,然后用bufferedReader讀取內存中緩存文件。在讀文件時用readLine()方法讀取每行記錄,把該記錄用split("\t")方法截取,與order_items文件中相同的字段str[0]作為key值放到map集合dict中,選取所要展現的字段作為value。map函數接收order_items文件數據,並用split("\t")截取數據存放到數組kv[]中(其中kv[1]與str[0]代表的字段相同),用if判斷,如果內存中dict集合的key值包含kv[1],則用context的write()方法輸出key2/value2值,其中kv[1]作為key2,其他dict.get(kv[1])+"\t"+kv[2]作為value2。
Reduce代碼
-
public static class MyReducer extends Reducer<Text, Text, Text, Text>{
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context)
-
throws IOException, InterruptedException {
-
for (Text text : values) {
-
context.write(key, text);
-
}
-
}
-
}
map函數輸出的<key,value >鍵值對首先經過一個suffle把key值相同的所有value放到一個迭代器中形成values,然后將<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接復制給輸出的key,輸入的values通過增強版for循環遍歷逐一輸出。
完整代碼
-
package mapreduce;
-
import java.io.BufferedReader;
-
import java.io.FileReader;
-
import java.io.IOException;
-
import java.net.URI;
-
import java.net.URISyntaxException;
-
import java.util.HashMap;
-
import java.util.Map;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
public class MapJoin {
-
-
public static class MyMapper extends Mapper<Object, Text, Text, Text>{
-
private Map<String, String> dict = new HashMap<>();
-
-
@Override
-
protected void setup(Context context) throws IOException,
-
InterruptedException {
-
String fileName = context.getLocalCacheFiles()[0].getName();
-
//System.out.println(fileName);
-
BufferedReader reader = new BufferedReader(new FileReader(fileName));
-
String codeandname = null;
-
while (null != ( codeandname = reader.readLine() ) ) {
-
String str[]=codeandname.split("\t");
-
dict.put(str[0], str[2]+"\t"+str[3]);
-
}
-
reader.close();
-
}
-
@Override
-
protected void map(Object key, Text value, Context context)
-
throws IOException, InterruptedException {
-
String[] kv = value.toString().split("\t");
-
if (dict.containsKey(kv[1])) {
-
context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));
-
}
-
}
-
}
-
public static class MyReducer extends Reducer<Text, Text, Text, Text>{
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context)
-
throws IOException, InterruptedException {
-
for (Text text : values) {
-
context.write(key, text);
-
}
-
}
-
}
-
-
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
-
Job job = Job.getInstance();
-
job.setJobName("mapjoin");
-
job.setJarByClass(MapJoin.class);
-
-
job.setMapperClass(MyMapper.class);
-
job.setReducerClass(MyReducer.class);
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
-
Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");
-
Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");
-
FileInputFormat.addInputPath(job, in);
-
FileOutputFormat.setOutputPath(job, out);
-
-
URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");
-
job.addCacheFile(uri);
-
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
8.在MapJoin類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。
9.待執行完畢后,進入命令模式下,在HDFS上/mymapreduce5/out中查看實驗結果。
-
hadoop fs -ls /mymapreduce5/out
-
hadoop fs -cat /mymapreduce5/out/part-r-00000













