[大牛翻譯系列]Hadoop(3)MapReduce 連接:半連接(Semi-join)


4.1.3 半連接(Semi-join)

假設一個場景,需要連接兩個很大的數據集,例如,用戶日志和OLTP的用戶數據。任何一個數據集都不是足夠小到可以緩存在map作業的內存中。這樣看來,似乎就不能使用reduce端的連接了。盡管不是必須,可以思考以下問題:如果在數據集的連接操作中,一個數據集中有的記錄由於因為無法連接到另一個數據集的記錄,將會被移除。這樣還需要將整個數據集放到內存中嗎?在這個例子中,在用戶日志中的用戶僅僅是OLTP用戶數據中的用戶中的很小的一部分。那么就可以從OLTP用戶數據中只取出存在於用戶日志中的那部分用戶的用戶數據。然后就可以得到足夠小到可以放在內存中的數據集。這種的解決方案就叫做半連接。

圖4.6說明了在半連接中將要執行的三個MapReduce作業(Job)。

 

 

接下來介紹如何實現一個半連接。

 

技術20 實現半連接

當需要連接兩個都很大的數據集時,很容易想到要用重分區連接(利用了整個MapReduce框架的reduce端的連接)。如果這么想了,又不能夠將其中一個數據集過濾到一個較小的尺寸以便放到map端的內存中,那也就是想想而已。然而,如果能夠將一個數據集減小到一個可管理的大小,也許就用不着使用重分區連接了。

 

問題

需要連接兩個都很大的數據集,同時減少整理和排序階段的消耗。

 

解決方案

在這個技術中,將會用到三個MapReduce作業來連接兩個數據集,以此來減少reduce端連接的消耗。對於很大的數據集,這個技術非常有用。

 

討論

在這個技術中,將會用到附錄D.2中的復制連接(Replicated join)的代碼來實現MapReduce作業中的最后兩步(http://www.cnblogs.com/datacloud/p/3617078.html)。同時,在圖4.6中的三個作業將會被分開來說明。

 

作業1

第一個MapReduce作業的功能是從日志文件中提取出用戶名,用這些用戶名生成一個用戶名唯一的集合(Set)。這通過在map函數執行用戶名的投影(projection)操作來實現。然后用reduce出用戶名。為了減少在map階段和reduce階段之間傳輸的數據量,采用如下方法:在map任務中采用哈希集(HashSet)來保存用戶名,在cleanup方法中輸出哈希集的值。圖4.7說明了這個作業的流程:

 

 

作業1的map和reduce的代碼如下:

 

 1 public static class Map extends Mapper<Text, Text, Text, NullWritable> {
 2 
 3     private Set<String> keys = new HashSet<String>();
 4     
 5     @Override
 6     protected void map(Text key, Text value, Context context)
 7         throws IOException, InterruptedException {
 8         keys.add(key.toString());
 9     }
10     
11     @Override
12     protected void cleanup(Context context)
13         throws IOException, InterruptedException {
14         
15         Text outputKey = new Text();
16         
17         for(String key: keys) {
18             outputKey.set(key);
19             context.write(outputKey, NullWritable.get());
20         }
21         
22     }
23     
24 }
25 
26 public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
27 
28     @Override
29     protected void reduce(Text key, Iterable<NullWritable> values, Context context)
30         throws IOException, InterruptedException {
31         context.write(key, NullWritable.get());
32     }
33     
34 }

 

作業1的結果就是來自於日志文件中的所有用戶的集合。集合中的用戶名是唯一的。

 

作業2

作業2包含了復雜的過濾過程。目的是從全體用戶的用戶數據集中移除不存在於日志文件中的用戶。這是一個只包含map的作業。它用到了復制連接來緩存出現在日志文件中的用戶名,並把他們和全體用戶的數據集連接。由於來自於作業1的用戶唯一的數據集要遠遠小於全體用戶的數據集,就把來自作業1的用戶集放到緩存中了。圖4.8說明了這個作業的流程:

 

 

現在是個不錯的時間去熟悉一下附錄D中的復制連接框架。這個框架對KeyValueTextInputFormat和TextOutputFormat提供了內置支持,並假設 KeyValueTextInputFormat生成的鍵是連接鍵。同時,這也是數據被展開的過程。圖4.9是這個框架的類圖:

 

GenericReplicatedJoin類是執行連接的類。如圖4.9中所示,在GenericReplicatedJoin的類列表中前三個類是可擴展的,相對應的復制連接的行為也是可定制的。readFromInputFormat方法可以用於任意的輸入類型(InputFormat)。getDistributedCacheReader方法可以被重載來支持來自於分布式緩存(distributed cache)的任意文件類型。在這一步中的核心是join方法。join方法將會生成作業的輸出鍵和輸出值。在默認的實現中,兩個數據集的值將會被合並以生成最終的輸出值。這個join方法可以自定義,可以指定僅僅輸出來自於OLTP的用戶表的值,如下所示:

 

1 public class ReplicatedFilterJob extends GenericReplicatedJoin {
2 
3     @Override
4     public Pair join(Pair inputSplitPair, Pair distCachePair) {
5         return inputSplitPair;
6     }
7     
8 }

 

還需要把來自於作業1的文件放到分布式緩存中:

 

1 for(FileStatus f: fs.listStatus(uniqueUserStatus)) {
2     if(f.getPath().getName().startsWith("part")) {
3         DistributedCache.addCacheFile(f.getPath().toUri(), conf);
4     }
5 }

 

然后,在驅動(driver)代碼中,調用GenericReplicatedJoin類:

 

 1 public class ReplicatedFilterJob extends GenericReplicatedJoin {
 2 
 3     public static void runJob(Path usersPath,
 4                                 Path uniqueUsersPath,
 5                                 Path outputPath)
 6         throws Exception {
 7         
 8         Configuration conf = new Configuration();
 9         
10         for(FileStatus f: fs.listStatus(uniqueUsersPath)) {
11             if(f.getPath().getName().startsWith("part")) {
12                 DistributedCache.addCacheFile(f.getPath().toUri(), conf);
13             }
14         }
15         
16         Job job = new Job(conf);
17         job.setJarByClass(ReplicatedFilterJob.class);
18         job.setMapperClass(ReplicatedFilterJob.class);
19         job.setNumReduceTasks(0);
20         job.setInputFormatClass(KeyValueTextInputFormat.class);
21         outputPath.getFileSystem(conf).delete(outputPath, true);
22         FileInputFormat.setInputPaths(job, usersPath);
23         FileOutputFormat.setOutputPath(job, outputPath);
24         
25         if(!job.waitForCompletion(true)) {
26             throw new Exception("Job failed");
27         }
28         
29     }
30     
31     @Override
32     public Pair join(Pair inputSplitPair, Pair distCachePair) {
33         return inputSplitPair;
34     }
35     
36 }

 

作業2的輸出就是已被用戶日志數據集的用戶過濾過的用戶集了。

 

作業3

在最后一步中,需要將作業2生成的已過濾的用戶集和原始的用戶日志合並了。表面上,已過濾的用戶集是足夠小到可以放到內存中,同樣也可以放到分布式緩存中。圖4.10說明了這個作業的流程:

 

1 FileStatus usersStatus = fs.getFileStatus(usersPath);
2 
3 for(FileStatus f: fs.listStatus(usersPath)) {
4 
5     if(f.getPath().getName().startsWith("part")) {
6         DistributedCache.addCacheFile(f.getPath().toUri(), conf);
7     }
8     
9 ...

 

 

這里要再次用到復制連接框架來執行連接。但這次不用自定義join方法的行為,因為兩個數據集中的數據都要出現在最后的輸出中。

執行這個代碼,觀察前述步驟生成的輸出。

 

$ bin/run.sh com.manning.hip.ch4.joins.semijoin.Main users.txt user-logs.txt output

$ hadoop fs -ls output
/user/aholmes/output/filtered
/user/aholmes/output/result
/user/aholmes/output/unique

$ hadoop fs -cat output/unique/part*
bob
jim
marie
mike

$ hadoop fs -cat output/filtered/part*
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA

$ hadoop fs -cat output/result/part*
jim logout 93.24.237.12 21 OR
mike new_tweet 87.124.79.252 69 VA
bob new_tweet 58.133.120.100 71 CA
mike logout 55.237.104.36 69 VA
jim new_tweet 93.24.237.12 21 OR
marie view_user 122.158.130.90 27 OR
jim login 198.184.237.49 21 OR
marie login 58.133.120.100 27 OR

 

這些輸出說明了在半連接的作業中的邏輯進程和最終連接的輸出。

 

小結

在這個技術中說明了如何使用半連接來合並兩個數據集。半連接的創建包括了比其他連接類型更多的步驟。但它確實是一個處理大的數據集的map端連接的強大的工具。當然,這些很大的數據集要能夠被減小到能夠放到內存中。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM