4.1.3 半連接(Semi-join)
假設一個場景,需要連接兩個很大的數據集,例如,用戶日志和OLTP的用戶數據。任何一個數據集都不是足夠小到可以緩存在map作業的內存中。這樣看來,似乎就不能使用reduce端的連接了。盡管不是必須,可以思考以下問題:如果在數據集的連接操作中,一個數據集中有的記錄由於因為無法連接到另一個數據集的記錄,將會被移除。這樣還需要將整個數據集放到內存中嗎?在這個例子中,在用戶日志中的用戶僅僅是OLTP用戶數據中的用戶中的很小的一部分。那么就可以從OLTP用戶數據中只取出存在於用戶日志中的那部分用戶的用戶數據。然后就可以得到足夠小到可以放在內存中的數據集。這種的解決方案就叫做半連接。
圖4.6說明了在半連接中將要執行的三個MapReduce作業(Job)。
接下來介紹如何實現一個半連接。
技術20 實現半連接
當需要連接兩個都很大的數據集時,很容易想到要用重分區連接(利用了整個MapReduce框架的reduce端的連接)。如果這么想了,又不能夠將其中一個數據集過濾到一個較小的尺寸以便放到map端的內存中,那也就是想想而已。然而,如果能夠將一個數據集減小到一個可管理的大小,也許就用不着使用重分區連接了。
問題
需要連接兩個都很大的數據集,同時減少整理和排序階段的消耗。
解決方案
在這個技術中,將會用到三個MapReduce作業來連接兩個數據集,以此來減少reduce端連接的消耗。對於很大的數據集,這個技術非常有用。
討論
在這個技術中,將會用到附錄D.2中的復制連接(Replicated join)的代碼來實現MapReduce作業中的最后兩步(http://www.cnblogs.com/datacloud/p/3617078.html)。同時,在圖4.6中的三個作業將會被分開來說明。
作業1
第一個MapReduce作業的功能是從日志文件中提取出用戶名,用這些用戶名生成一個用戶名唯一的集合(Set)。這通過在map函數執行用戶名的投影(projection)操作來實現。然后用reduce出用戶名。為了減少在map階段和reduce階段之間傳輸的數據量,采用如下方法:在map任務中采用哈希集(HashSet)來保存用戶名,在cleanup方法中輸出哈希集的值。圖4.7說明了這個作業的流程:
作業1的map和reduce的代碼如下:
1 public static class Map extends Mapper<Text, Text, Text, NullWritable> { 2 3 private Set<String> keys = new HashSet<String>(); 4 5 @Override 6 protected void map(Text key, Text value, Context context) 7 throws IOException, InterruptedException { 8 keys.add(key.toString()); 9 } 10 11 @Override 12 protected void cleanup(Context context) 13 throws IOException, InterruptedException { 14 15 Text outputKey = new Text(); 16 17 for(String key: keys) { 18 outputKey.set(key); 19 context.write(outputKey, NullWritable.get()); 20 } 21 22 } 23 24 } 25 26 public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> { 27 28 @Override 29 protected void reduce(Text key, Iterable<NullWritable> values, Context context) 30 throws IOException, InterruptedException { 31 context.write(key, NullWritable.get()); 32 } 33 34 }
作業1的結果就是來自於日志文件中的所有用戶的集合。集合中的用戶名是唯一的。
作業2
作業2包含了復雜的過濾過程。目的是從全體用戶的用戶數據集中移除不存在於日志文件中的用戶。這是一個只包含map的作業。它用到了復制連接來緩存出現在日志文件中的用戶名,並把他們和全體用戶的數據集連接。由於來自於作業1的用戶唯一的數據集要遠遠小於全體用戶的數據集,就把來自作業1的用戶集放到緩存中了。圖4.8說明了這個作業的流程:
現在是個不錯的時間去熟悉一下附錄D中的復制連接框架。這個框架對KeyValueTextInputFormat和TextOutputFormat提供了內置支持,並假設 KeyValueTextInputFormat生成的鍵是連接鍵。同時,這也是數據被展開的過程。圖4.9是這個框架的類圖:
GenericReplicatedJoin類是執行連接的類。如圖4.9中所示,在GenericReplicatedJoin的類列表中前三個類是可擴展的,相對應的復制連接的行為也是可定制的。readFromInputFormat方法可以用於任意的輸入類型(InputFormat)。getDistributedCacheReader方法可以被重載來支持來自於分布式緩存(distributed cache)的任意文件類型。在這一步中的核心是join方法。join方法將會生成作業的輸出鍵和輸出值。在默認的實現中,兩個數據集的值將會被合並以生成最終的輸出值。這個join方法可以自定義,可以指定僅僅輸出來自於OLTP的用戶表的值,如下所示:
1 public class ReplicatedFilterJob extends GenericReplicatedJoin { 2 3 @Override 4 public Pair join(Pair inputSplitPair, Pair distCachePair) { 5 return inputSplitPair; 6 } 7 8 }
還需要把來自於作業1的文件放到分布式緩存中:
1 for(FileStatus f: fs.listStatus(uniqueUserStatus)) { 2 if(f.getPath().getName().startsWith("part")) { 3 DistributedCache.addCacheFile(f.getPath().toUri(), conf); 4 } 5 }
然后,在驅動(driver)代碼中,調用GenericReplicatedJoin類:
1 public class ReplicatedFilterJob extends GenericReplicatedJoin { 2 3 public static void runJob(Path usersPath, 4 Path uniqueUsersPath, 5 Path outputPath) 6 throws Exception { 7 8 Configuration conf = new Configuration(); 9 10 for(FileStatus f: fs.listStatus(uniqueUsersPath)) { 11 if(f.getPath().getName().startsWith("part")) { 12 DistributedCache.addCacheFile(f.getPath().toUri(), conf); 13 } 14 } 15 16 Job job = new Job(conf); 17 job.setJarByClass(ReplicatedFilterJob.class); 18 job.setMapperClass(ReplicatedFilterJob.class); 19 job.setNumReduceTasks(0); 20 job.setInputFormatClass(KeyValueTextInputFormat.class); 21 outputPath.getFileSystem(conf).delete(outputPath, true); 22 FileInputFormat.setInputPaths(job, usersPath); 23 FileOutputFormat.setOutputPath(job, outputPath); 24 25 if(!job.waitForCompletion(true)) { 26 throw new Exception("Job failed"); 27 } 28 29 } 30 31 @Override 32 public Pair join(Pair inputSplitPair, Pair distCachePair) { 33 return inputSplitPair; 34 } 35 36 }
作業2的輸出就是已被用戶日志數據集的用戶過濾過的用戶集了。
作業3
在最后一步中,需要將作業2生成的已過濾的用戶集和原始的用戶日志合並了。表面上,已過濾的用戶集是足夠小到可以放到內存中,同樣也可以放到分布式緩存中。圖4.10說明了這個作業的流程:
1 FileStatus usersStatus = fs.getFileStatus(usersPath); 2 3 for(FileStatus f: fs.listStatus(usersPath)) { 4 5 if(f.getPath().getName().startsWith("part")) { 6 DistributedCache.addCacheFile(f.getPath().toUri(), conf); 7 } 8 9 ...
這里要再次用到復制連接框架來執行連接。但這次不用自定義join方法的行為,因為兩個數據集中的數據都要出現在最后的輸出中。
執行這個代碼,觀察前述步驟生成的輸出。
$ bin/run.sh com.manning.hip.ch4.joins.semijoin.Main users.txt user-logs.txt output $ hadoop fs -ls output /user/aholmes/output/filtered /user/aholmes/output/result /user/aholmes/output/unique $ hadoop fs -cat output/unique/part* bob jim marie mike $ hadoop fs -cat output/filtered/part* mike 69 VA marie 27 OR jim 21 OR bob 71 CA $ hadoop fs -cat output/result/part* jim logout 93.24.237.12 21 OR mike new_tweet 87.124.79.252 69 VA bob new_tweet 58.133.120.100 71 CA mike logout 55.237.104.36 69 VA jim new_tweet 93.24.237.12 21 OR marie view_user 122.158.130.90 27 OR jim login 198.184.237.49 21 OR marie login 58.133.120.100 27 OR
這些輸出說明了在半連接的作業中的邏輯進程和最終連接的輸出。
小結
在這個技術中說明了如何使用半連接來合並兩個數據集。半連接的創建包括了比其他連接類型更多的步驟。但它確實是一個處理大的數據集的map端連接的強大的工具。當然,這些很大的數據集要能夠被減小到能夠放到內存中。