Mapreduce實例——Map端join


原理

MapReduce提供了表連接操作其中包括MapjoinReducejoin還有單表連接,現在我們要討論的是MapjoinMapjoin是指數據到達map處理函數之前進行合並的,效率要遠遠高於Reducejoin,因為Reducejoin是把所有的數據都經過Shuffle,非常消耗資源。

1.Mapjoin的使用場景:一張表數據十分小、一張表數據很大。

Mapjoin是針對以上場景進行的優化:將小表中的數據全部加載到內存,按關鍵字建立索引。大表中的數據作為map的輸入,對map()函數每一對<key,value>輸入,都能夠方便地和已加載到內存的小數據進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組並且連接好了的數據。

為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。

2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標准的文件讀寫API讀取相應的文件。

2.本實驗MapJoin的執行流程

1)首先在提交作業的時候先將小表文件放到該作業的DistributedCache中,然后從DistributeCache中取出該小表進行join連接的 <key ,value>鍵值對,將其解釋分割放到內存中(可以放大Hash Map等等容器中)。

2)要重寫MyMapper類下面的setup()方法,因為這個方法是先於map方法執行的,將較小表先讀入到一個HashMap中。

3)重寫map函數,一行行讀入大表的內容,逐一的與HashMap中的內容進行比較,若Key相同,則對數據進行格式化處理,然后直接輸出。

4map函數輸出的<key,value >鍵值對首先經過一個sufflekey值相同的所有value放到一個迭代器中形成values,然后將<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接復制給輸出的key,輸入的values通過增強版for循環遍歷逐一輸出,循環的次數決定了<key,value>輸出的次數。

環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

內容

某電商平台,需要對訂單數據進行分析,已知訂單數據包括兩個文件,分別為訂單表orders1和訂單明細表order_items1orders1表記錄了用戶購買商品的下單數據,order_items1表記錄了商品id,訂單id以及明細id,它們的表結構以及關系如下圖所示:

它們的數據內容是以"\t"鍵分割,數據內容如下:

orders1

  1. 訂單ID   訂單號          用戶ID    下單日期  
  2. 52304   111215052630    176474  2011-12-15 04:58:21  
  3. 52303   111215052629    178350  2011-12-15 04:45:31  
  4. 52302   111215052628    172296  2011-12-15 03:12:23  
  5. 52301   111215052627    178348  2011-12-15 02:37:32  
  6. 52300   111215052626    174893  2011-12-15 02:18:56  
  7. 52299   111215052625    169471  2011-12-15 01:33:46  
  8. 52298   111215052624    178345  2011-12-15 01:04:41  
  9. 52297   111215052623    176369  2011-12-15 01:02:20  
  10. 52296   111215052622    178343  2011-12-15 00:38:02  
  11. 52295   111215052621    178342  2011-12-15 00:18:43  
  12. 52294   111215052620    178341  2011-12-15 00:14:37  
  13. 52293   111215052619    178338  2011-12-15 00:13:07  

order_items1

  1. 明細ID  訂單ID   商品ID  
  2. 252578  52293   1016840  
  3. 252579  52293   1014040  
  4. 252580  52294   1014200  
  5. 252581  52294   1001012  
  6. 252582  52294   1022245  
  7. 252583  52294   1014724  
  8. 252584  52294   1010731  
  9. 252586  52295   1023399  
  10. 252587  52295   1016840  
  11. 252592  52296   1021134  
  12. 252593  52296   1021133  
  13. 252585  52295   1021840  
  14. 252588  52295   1014040  
  15. 252589  52296   1014040  
  16. 252590  52296   1019043  

要求用MapJoin來進行多表連接,查詢在2011-12-15日該電商都有哪些用戶購買了什么商品。這里我們假設orders1文件記錄數很少,order_items1文件記錄數很多。

結果數據如下:

  1. 訂單ID  用戶ID   下單日期             商品ID  
  2. 52293   178338  2011-12-15 00:13:07 1016840  
  3. 52293   178338  2011-12-15 00:13:07 1014040  
  4. 52294   178341  2011-12-15 00:14:37 1010731  
  5. 52294   178341  2011-12-15 00:14:37 1014724  
  6. 52294   178341  2011-12-15 00:14:37 1022245  
  7. 52294   178341  2011-12-15 00:14:37 1014200  
  8. 52294   178341  2011-12-15 00:14:37 1001012  
  9. 52295   178342  2011-12-15 00:18:43 1023399  
  10. 52295   178342  2011-12-15 00:18:43 1014040  
  11. 52295   178342  2011-12-15 00:18:43 1021840  
  12. 52295   178342  2011-12-15 00:18:43 1016840  
  13. 52296   178343  2011-12-15 00:38:02 1021134  
  14. 52296   178343  2011-12-15 00:38:02 1021133  
  15. 52296   178343  2011-12-15 00:38:02 1014040  
  16. 52296   178343  2011-12-15 00:38:02 1019043  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.Linux本地新建/data/mapreduce5目錄。

  1. mkdir -p /data/mapreduce5  

3.Linux中切換到/data/mapreduce5目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce5/orders1http://192.168.1.100:60000/allfiles/mapreduce5/order_items1網址上下載文本文件orders1order_items1

  1. cd /data/mapreduce5  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce5/orders1  
  3. wget http://192.168.1.100:60000/allfiles/mapreduce5/order_items1  

后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce5/hadoop2lib.tar.gz網址上下載項目用到的依賴包。

  1. wget http://192.168.1.100:60000/allfiles/mapreduce5/hadoop2lib.tar.gz  

hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce5/in目錄,然后將Linux本地/data/mapreduce5目錄下的orders1order_items1文件導入到HDFS/mymapreduce5/in目錄中。

  1. hadoop fs -mkdir -p /mymapreduce5/in  
  2. hadoop fs -put /data/mapreduce5/orders1 /mymapreduce5/in  
  3. hadoop fs -put /data/mapreduce5/order_items1 /mymapreduce5/in  

5.新建Java Project項目,項目名為mapreduce5

mapreduce5項目下新建包,包名為mapduce

mapreduce包下新建類,類名為MapJoin

6.添加項目所需依賴的jar包,右鍵項目,新建一個文件夾,命名為hadoop2lib,用於存放項目所需的jar包。

/data/mapreduce5目錄下,hadoop2lib目錄中的jar包,拷貝到eclipsemapreduce5項目的hadoop2lib目錄下。

選中所有項目hadoop2lib目錄下所有jar包,並添加到Build Path中。

7.編寫程序代碼,並描述其設計思路

Mapjoin適用於一個表記錄數很少(100條),另一表記錄數很多(像幾億條)的情況,我們把小表數據加載到內存中,然后掃描大表,看大表中記錄的每條join key/value是否能在內存中找到相同的join key記錄,如果有則輸出結果。這樣避免了一種數據傾斜問題。MapreduceJava代碼分為兩個部分:Mapper部分,Reduce部分。

Mapper代碼

  1. public static class MyMapper extends Mapper<Object, Text, Text, Text>{  
  2.         private Map<String, String> dict = new HashMap<>();  
  3.     
  4.         @Override  
  5.         protected void setup(Context context) throws IOException,  
  6.                 InterruptedException {  
  7.             String fileName = context.getLocalCacheFiles()[0].getName();  
  8.             System.out.println(fileName);  
  9.             BufferedReader reader = new BufferedReader(new FileReader(fileName));  
  10.             String codeandname = null;  
  11.             while (null != ( codeandname = reader.readLine() ) ) {  
  12.                 String str[]=codeandname.split("\t");  
  13.                 dict.put(str[0], str[2]+"\t"+str[3]);  
  14.             }  
  15.             reader.close();  
  16.         }  
  17.         @Override  
  18.         protected void map(Object key, Text value, Context context)  
  19.                 throws IOException, InterruptedException {  
  20.             String[] kv = value.toString().split("\t");  
  21.             if (dict.containsKey(kv[1])) {  
  22.                 context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));  
  23.             }  
  24.         }  
  25.     }  

該部分分為setup方法與map方法。在setup方法中首先用getName()獲取當前文件名為orders1的文件並賦值給fileName,然后用bufferedReader讀取內存中緩存文件。在讀文件時用readLine()方法讀取每行記錄,把該記錄用split("\t")方法截取,與order_items文件中相同的字段str[0]作為key值放到map集合dict中,選取所要展現的字段作為valuemap函數接收order_items文件數據,並用split("\t")截取數據存放到數組kv[]中(其中kv[1]str[0]代表的字段相同),用if判斷,如果內存中dict集合的key值包含kv[1],則用contextwrite()方法輸出key2/value2值,其中kv[1]作為key2,其他dict.get(kv[1])+"\t"+kv[2]作為value2

Reduce代碼

  1. public static class MyReducer extends Reducer<Text, Text, Text, Text>{  
  2.         @Override  
  3.         protected void reduce(Text key, Iterable<Text> values, Context context)  
  4.     throws IOException, InterruptedException {  
  5.     for (Text text : values) {  
  6.     context.write(key, text);  
  7.     }  
  8.     }  
  9.     }  

map函數輸出的<key,value >鍵值對首先經過一個sufflekey值相同的所有value放到一個迭代器中形成values,然后將<key,values>鍵值對傳遞給reduce函數,reduce函數輸入的key直接復制給輸出的key,輸入的values通過增強版for循環遍歷逐一輸出。

完整代碼

  1. package mapreduce;  
  2. import java.io.BufferedReader;  
  3. import java.io.FileReader;  
  4. import java.io.IOException;  
  5. import java.net.URI;  
  6. import java.net.URISyntaxException;  
  7. import java.util.HashMap;  
  8. import java.util.Map;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. public class MapJoin {  
  17.     
  18.     public static class MyMapper extends Mapper<Object, Text, Text, Text>{  
  19.         private Map<String, String> dict = new HashMap<>();  
  20.     
  21.         @Override  
  22.         protected void setup(Context context) throws IOException,  
  23.                 InterruptedException {  
  24.             String fileName = context.getLocalCacheFiles()[0].getName();  
  25.             //System.out.println(fileName);  
  26.             BufferedReader reader = new BufferedReader(new FileReader(fileName));  
  27.             String codeandname = null;  
  28.             while (null != ( codeandname = reader.readLine() ) ) {  
  29.                 String str[]=codeandname.split("\t");  
  30.                 dict.put(str[0], str[2]+"\t"+str[3]);  
  31.             }  
  32.             reader.close();  
  33.         }  
  34.         @Override  
  35.         protected void map(Object key, Text value, Context context)  
  36.                 throws IOException, InterruptedException {  
  37.             String[] kv = value.toString().split("\t");  
  38.             if (dict.containsKey(kv[1])) {  
  39.                 context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));  
  40.             }  
  41.         }  
  42.     }  
  43.     public static class MyReducer extends Reducer<Text, Text, Text, Text>{  
  44.         @Override  
  45.         protected void reduce(Text key, Iterable<Text> values, Context context)  
  46.     throws IOException, InterruptedException {  
  47.     for (Text text : values) {  
  48.     context.write(key, text);  
  49.     }  
  50.     }  
  51.     }  
  52.     
  53.     public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {  
  54.     Job job = Job.getInstance();  
  55.     job.setJobName("mapjoin");  
  56.     job.setJarByClass(MapJoin.class);  
  57.     
  58.     job.setMapperClass(MyMapper.class);  
  59.     job.setReducerClass(MyReducer.class);  
  60.     
  61.     job.setOutputKeyClass(Text.class);  
  62.     job.setOutputValueClass(Text.class);  
  63.     
  64.     Path in = new Path("hdfs://localhost:9000/mymapreduce5/in/order_items1");  
  65.     Path out = new Path("hdfs://localhost:9000/mymapreduce5/out");  
  66.     FileInputFormat.addInputPath(job, in);  
  67.     FileOutputFormat.setOutputPath(job, out);  
  68.     
  69.     URI uri = new URI("hdfs://localhost:9000/mymapreduce5/in/orders1");  
  70.     job.addCacheFile(uri);  
  71.     
  72.     System.exit(job.waitForCompletion(true) ? 0 : 1);  
  73.     }  
  74.     }  

8.MapJoin類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢后,進入命令模式下,在HDFS/mymapreduce5/out中查看實驗結果。

  1. hadoop fs -ls /mymapreduce5/out  
  2. hadoop fs -cat /mymapreduce5/out/part-r-00000  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM