4.1 連接(Join)
連接是關系運算,可以用於合並關系(relation)。對於數據庫中的表連接操作,可能已經廣為人知了。在MapReduce中,連接可以用於合並兩個或多個數據集。例如,用戶基本信息和用戶活動詳情信息。用戶基本信息來自於OLTP數據庫。用戶活動詳情信息來自於日志文件。
MapReduce的連接操作可以用於以下場景:
- 用戶的人口統計信息的聚合操作(例如:青少年和中年人的習慣差異)。
- 當用戶超過一定時間沒有使用網站后,發郵件提醒他們。(這個一定時間的閾值是用戶自己預定義的)
- 分析用戶的瀏覽習慣。讓系統可以基於這個分析提示用戶有哪些網站特性還沒有使用到。進而形成一個反饋循環。
所有這些場景都要求將多個數據集連接起來。
最常用的兩個連接類型是內連接(inner join)和外連接(outer join)。如下圖所示,內連接比較兩個關系中所有的元組,判斷是否滿足連接條件,然后生成一個滿足連接條件的結果集。與內連接相反的是,外連接並不需要兩個關系的元組都滿足連接條件。在連接條件不滿足的時候,外連接可以將其中一方的數據保留在結果集中。
為了實現內連接和外連接,MapReduce中有三種連接策略,如下所示。這三種連接策略有的在map階段,有的在reduce階段。它們都針對MapReduce的排序-合並(sort-merge)的架構進行了優化。
- 重分區連接(Repartition join)—— reduce端連接。使用場景:連接兩個或多個大型數據集。
- 復制連接(Replication join)—— map端連接。使用場景:待連接的數據集中有一個數據集足夠小到可以完全放在緩存中。
- 半連接(Semi-join)—— 另一個map端連接。使用場景:待連接的數據集中有一個數據集非常大,但同時這個數據集可以被過濾成小到可以放在緩存中。
在介紹完這些連接策略之后,還會介紹另一個策略:決策樹。可以根據實際情況選擇最優策略。
4.1.1 重分區連接(Repartition join)
重分區連接是reduce端連接。它利用MapReduce的排序-合並機制來分組數據。它只使用一個單獨的MapReduce任務,並支持多路連接(N-way join)。多路指的是多個數據集。
Map階段負責從多個數據集中讀取數據,決定每個數據的連接值,將連接值作為輸出鍵(output key)。輸出值(output value)則包含將在reduce階段被合並的值。
Reduce階段,一個reduce接收map函數傳來的每一個輸出鍵的所有輸出值,並將數據分為多個分區。在此之后,reduce對所有的分區進行笛卡爾積(Cartersian product)連接運算,並生成全部的結果集。
以上MapReduce過程如圖4.2所示:
注:過濾(filtering)和投影(projection) 在MapReduce重分區連接中,最好能夠減少map階段傳輸到reduce階段的數據量。因為通過網絡在這兩個階段中排序和傳輸數據會產生很高的成本。如果不能避免reduce端的工作,那么一個最佳實踐就是盡可能在map階段多過濾數據和投影。過濾指的是將map極端的輸入數據中不需要的部分丟棄。投影是關系代數的概念。投影用於減少發送給reduce的字段。例如:在分析用戶數據的時候,如果只關注用戶的年齡,那么在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。 |
技術19:優化重分區連接
《Hadoop in Action》給出了一個例子,說明如何使用Hadoop的社區包(contrib package)org.apache.hadoop.contrib.utils.join實現重分區連接。這個貢獻包打包了所有的處理細節,僅僅需要實現一個非常簡單的方法。
然而,這個社區包對重分區的實現方法的空間效率低下。它需要將待連接的所有輸出值都讀取到內存中,然后進行多路連接(multiway join)。實際上,如果僅僅將小數據集讀取到內存中,然后用小數據集遍歷大數據集來進行連接,這樣將更加高效。
問題
需要在MapReduce中進行重分區連接,但是不希望在reduce階段將所有的數據都放到緩存中。
解決方案
這個技術運用了優化后的重分區框架。它僅僅將一個待連接的數據集放在緩存中,減少了reduce需要放在緩存中的數據。
討論
附錄D.1(http://www.cnblogs.com/datacloud/p/3617079.html)中介紹了優化后的重分區框架的實現。這個實現是根據org.apache.hadoop.contrib.utils.join社區包進行建模。這個優化后的框架僅僅緩存兩個數據集中比較小的那一個,以減少內存消耗。圖4.3是優化后的重分區連接的流程圖:
圖4.4是實現的類圖。類圖中包含兩個部分,一個通用框架和一些類的實現樣例。
使用這個連接框架需要實現抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。
例如,需要連接用戶詳情數據和用戶活動日志。第一步,判斷兩個數據集中那一個比較小。對於一般的網站來說,用戶詳情數據會比較小,用戶活動日志會比較大。
在如下示例中,用戶數據中有用戶姓名,年齡和所在州
$ cat test-data/ch4/users.txt
anne 22 NY
joe 39 CO
alison 35 NY
mike 69 VA
marie 27 OR
jim 21 OR
bob 71 CA
mary 53 NY
dave 36 VA
dude 50 CA
用戶活動日志中有用戶姓名,進行的動作,來源IP。這個文件一般都要比用戶數據要大得多。
$ cat test-data/ch4/user-logs.txt
jim logout 93.24.237.12
mike new_tweet 87.124.79.252
bob new_tweet 58.133.120.100
mike logout 55.237.104.36
jim new_tweet 93.24.237.12
marie view_user 122.158.130.90
首先,必須實現抽象類OptimizedDataJoinMapperBase。這個將在map端被調用。這個類將創建map的輸出鍵和輸出值。同時,它還將提示整個框架,當前處理的文件是不是比較小的那個。
1 public class SampleMap extends OptimizedDataJoinMapperBase { 2 3 private boolean smaller; 4 5 @Override 6 protected Text generateInputTag(String inputFile) { 7 // tag the row with input file name (data source) 8 smaller = inputFile.contains("users.txt"); 9 return new Text(inputFile); 10 } 11 12 @Override 13 protected String genGroupKey(Object key, OutputValue output) { 14 return key.toString(); 15 } 16 17 @Override 18 protected boolean isInputSmaller(String inputFile) { 19 return smaller; 20 } 21 22 @Override 23 protected OutputValue genMapOutputValue(Object o) { 24 return new TextTaggedOutputValue((Text) o); 25 } 26 }
下一步,你需要實現抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調用。在這個類中,將從map端傳入不同數據集的輸出鍵和輸出值,然后返回reduce端的輸出數組。
1 public class SampleReduce extends OptimizedDataJoinReducerBase { 2 3 private TextTaggedOutputValue output = new TextTaggedOutputValue(); 4 private Text textOutput = new Text(); 5 6 @Override 7 protected OutputValue combine(String key, 8 OutputValue smallValue, 9 OutputValue largeValue) { 10 if(smallValue == null || largeValue == null) { 11 return null; 12 } 13 Object[] values = { 14 smallValue.getData(), largeValue.getData() 15 }; 16 textOutput.set(StringUtils.join(values, "\t")); 17 output.setData(textOutput); 18 return output; 19 }
最后,任務的主代碼(driver code)需要指明InputFormat類,並設置次排序(Secondary sort)。
1 job.setInputFormat(KeyValueTextInputFormat.class); 2 3 job.setMapOutputKeyClass(CompositeKey.class); 4 job.setMapOutputValueClass(TextTaggedOutputValue.class); 5 job.setOutputKeyClass(Text.class); 6 job.setOutputValueClass(Text.class); 7 8 job.setPartitionerClass(CompositeKeyPartitioner.class); 9 job.setOutputKeyComparatorClass(CompositeKeyComparator.class); 10 job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);
現在連接的准備工作就做完了,可以開始運行連接:
$ hadoop fs -put test-data/ch4/users.txt users.txt $ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt $ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output $ hadoop fs -cat output/part*
bob 71 CA new_tweet 58.133.120.100
jim 21 OR logout 93.24.237.12
jim 21 OR new_tweet 93.24.237.12
jim 21 OR login 198.184.237.49
marie 27 OR login 58.133.120.100
marie 27 OR view_user 122.158.130.90
mike 69 VA new_tweet 87.124.79.252
mike 69 VA logout 55.237.104.36
如果和連接的源文件相對比,可以看到因為實現了一個內連接,輸出中不包括用戶anne,alison等不存在於日志文件中的記錄。
小結:
這個連接的實現通過只緩存比較小的數據集來提高來Hadoop社區包的效率。但是,當數據從map階段傳輸到reduce階段的時候,仍然產生了很高的網絡成本。
此外,Hadoop社區包支持多路連接,這里的實現只支持二路連接。
如果要更多地減少reduce端連接的內存足跡(memory footprint),一個簡單的機制是在map函數中更多地進行投影操作。投影減少了map階段的輸出中的字段。例如:在分析用戶數據的時候,如果只關注用戶的年齡,那么在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。這樣就減少了map和reduce之間的網絡負擔,也減少了reduce在連接時的內存消耗。
和原始的社區包一樣,這里的重分區的實現也支持過濾和投影。通過允許genMapOutputValue方法返回空值,就可以支持過濾。通過在genMapOutputValue方法中定義輸出值的內容,就可以支持投影。
如果你既想輸出所有的數據到reduce,又想避免排序的損耗,就需要考慮另外兩種連接策略,復制連接和半連接。