摘要:在排序和reducer 階段,reduce 側連接過程會產生巨大的網絡I/O 流量,在這個階段,相同鍵的值被聚集在一起。
本文分享自華為雲社區《MapReduce 示例:減少 Hadoop MapReduce 中的側連接》,作者:Donglian Lin。
在這篇博客中,將使用 MapReduce 示例向您解釋如何在 Hadoop MapReduce 中執行縮減側連接。在這里,我假設您已經熟悉 MapReduce 框架並知道如何編寫基本的 MapReduce 程序。本博客中討論的主題如下:
- 什么是加入?
- MapReduce 中的連接
- 什么是 Reduce 側連接?
- 減少側連接的 MapReduce 示例
- 結論
什么是聯接?
join操作用於基於外鍵將兩個或多個數據庫表合並。通常,公司在其數據庫中為客戶和交易 記錄維護單獨的表 。而且,很多時候這些公司需要使用這些單獨表格中的數據生成分析報告。因此,他們使用公共列(外鍵)(如客戶 ID 等)對這些單獨的表執行連接操作,以生成組合表。然后,他們分析這個組合表以獲得所需的分析報告。
MapReduce 中的連接
就像 SQL join 一樣,我們也可以在 MapReduce 中對不同的數據集進行 join 操作。MapReduce 中有兩種類型的連接操作:
- Map Side Join:顧名思義,join操作是在map階段本身進行的。因此,在 map side join 中,mapper 執行 join 並且每個 map 的輸入都必須根據鍵進行分區和排序。
- 減少副加入:顧名思義,在減少側加入,減速是 負責執行連接操作。由於排序和改組階段將具有相同鍵的值發送到同一個 reducer,因此它比 map side join 相對簡單和容易實現,因此,默認情況下,數據是為我們組織的。
現在,讓我們詳細了解reduce side join。
什么是減少側連接?
如前所述,reduce side join 是在reducer 階段執行join 操作的過程。基本上,reduce side join 以下列方式發生:
- Mapper 根據公共列或連接鍵讀取要組合的輸入數據。
- 映射器處理輸入並向輸入添加標簽以區分屬於不同來源或數據集或數據庫的輸入。
- 映射器輸出中間鍵值對,其中鍵只是連接鍵。
- 在排序和改組階段之后,會為減速器生成一個鍵和值列表。
- 現在,reducer 將列表中存在的值與鍵連接起來,以給出最終的聚合輸出。
減少邊連接的 MapReduce 示例
假設我有兩個單獨的運動場數據集:
- cust_details: 它包含客戶的詳細信息。
- transaction_details: 包含客戶的交易記錄。
使用這兩個數據集,我想知道每個客戶的生命周期價值。在 這樣做時,我將需要以下東西:
- 此人的姓名以及該人訪問的頻率。
- 他/她購買設備所花費的總金額。
上圖只是向您展示了我們將對其執行reduce side join 操作的兩個數據集的schema。單擊下面的按鈕下載包含此 MapReduce 示例的源代碼和輸入文件的整個項目:
在將上面的 MapReduce 示例項目在 reduce 端加入 Eclipse 時,請記住以下幾點:
- 輸入文件位於項目的 input_files 目錄中。將這些加載到您的 HDFS 中。
- 不要忘記根據您的系統或VM構建Hadoop Reference Jars的路徑(存在於reduce side join項目lib目錄中)。
現在,讓我們了解在這個 MapReduce 示例中的 map 和 reduce 階段內部發生了什么關於reduce side join:
1. 地圖階段:
我將為兩個數據集中的每一個設置一個單獨的映射器,即一個映射器用於 cust_details 輸入,另一個用於 transaction_details 輸入。
cust_details 的映射器:
public static class CustsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[0]), new Text("cust " + parts[1])); } }
- 我將一次讀取一個元組的輸入。
- 然后,我將令牌化在元組的每個字並用的名字一起取卡斯特ID個人Ø ñ 。
- Ť ħ È Ç烏斯ID將是我的鍵值對鍵,我的映射器將最終生成。
- 我還將添加一個標簽“ Ç烏斯” ,以表明該輸入元組是cust_details類型。
- 因此,我的 cust_details 映射器將生成以下中間鍵值對:
鍵 - 值對:[客戶 ID,客戶名稱]
例如:[4000001,Ç烏斯 克里斯蒂娜],[4000002,卡斯特佩奇]等
transaction_details 的映射器:
-
public static class TxnsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[2]), new Text("tnxn " + parts[3])); } }
- 我將獲取金額值而不是人名。
- 在這種情況下,我們將使用“tnxn”作為標簽。
- 因此,客戶 ID 將是映射器最終生成的鍵值對的我的鍵。
- 最后,transaction_details 映射器的輸出將采用以下格式:
鍵值對:[客戶 ID,tnxn 金額]
示例: [4000001, tnxn 40.33]、[4000002, tnxn 198.44] 等。
2. 排序和洗牌階段
排序和改組階段將生成與每個鍵對應的值的數組列表。換句話說,它將中間鍵值對中每個唯一鍵對應的所有值放在一起。排序和改組階段的輸出將采用以下格式:
鍵 - 值列表:
- {cust ID1 – [(cust name1), (tnxn amount1), (tnxn amount2), (tnxn amount3),.....]}
- {客戶 ID2 – [(客戶名稱 2), (tnxn amount1), (tnxn amount2), (tnxn amount3),.....]}
- ……
例子:
- {4000001 – [(cust kristina), (tnxn 40.33), (tnxn 47.05),…]};
- {4000002 – [(cust paige), (tnxn 198.44), (tnxn 5.58),…]};
- ……
現在,框架將為每個唯一的連接鍵(cust id)和相應的值列表調用 reduce() 方法(reduce(Text key, Iterable<Text> values, Context context))。 然后,reducer 將對相應值列表中存在的值執行連接操作,以最終計算所需的輸出。因此,執行的reducer 任務的數量將等於唯一客戶ID 的數量。
現在讓我們了解在這個 MapReduce 示例中,reducer 如何執行連接操作。
3.減速器階段
如果您還記得,執行這種減少側連接操作的主要目標是找出特定客戶訪問綜合體育館的次數以及該客戶在不同運動上花費的總金額。因此,我的最終輸出應采用以下格式:
Key – Value 對:[客戶姓名] (Key) – [總金額,訪問頻率] (Value)
減速機代碼:
public static class ReduceJoinReducer extends Reducer <Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String name = ""; double total = 0.0; int count = 0; for (Text t : values) { String parts[] = t.toString().split(" "); if (parts[0].equals("tnxn")) { count++; total += Float.parseFloat(parts[1]); } else if (parts[0].equals("cust")) { name = parts[1]; } } String str = String.format("%d %f", count, total); context.write(new Text(name), new Text(str)); } }
因此,將在每個減速器中采取以下步驟來實現所需的輸出:
- 在每個減速器中,我都會有一個鍵和值列表,其中鍵只是客戶 ID。值列表將具有來自兩個數據集的輸入,即來自 transaction_details 的金額和來自 cust_details 的名稱。
- 現在,我將遍歷 reducer 中的值列表中存在的值。
- 然后,我將拆分值列表並檢查該值是 transaction_details 類型還是 cust_details 類型。
- 如果是transaction_details類型,我將執行以下步驟:
- 我將計數器值加一來計算這個人的訪問頻率。
- 我將累積更新金額值以計算該人花費的總金額。
- 另一方面,如果值是 cust_details 類型,我會將它存儲在一個字符串變量中。稍后,我會將名稱指定為我的輸出鍵值對中的鍵。
- 最后,我將在我的 HDFS 的輸出文件夾中寫入輸出鍵值對。
因此,我的減速器將生成的最終輸出如下:
克里斯蒂娜,651.05 8
佩奇,706.97 6
…..
而且,我們上面所做的整個過程在 MapReduce 中稱為Reduce Side Join。
源代碼:
上面的減少側連接的 MapReduce 示例的源代碼如下:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReduceJoin { public static class CustsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[0]), new Text("cust " + parts[1])); } } public static class TxnsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[2]), new Text("tnxn " + parts[3])); } } public static class ReduceJoinReducer extends Reducer <Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String name = ""; double total = 0.0; int count = 0; for (Text t : values) { String parts[] = t.toString().split(" "); if (parts[0].equals("tnxn")) { count++; total += Float.parseFloat(parts[1]); } else if (parts[0].equals("cust")) { name = parts[1]; } } String str = String.format("%d %f", count, total); context.write(new Text(name), new Text(str)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "Reduce-side join"); job.setJarByClass(ReduceJoin.class); job.setReducerClass(ReduceJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CustsMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, TxnsMapper.class); Path outputPath = new Path(args[2]); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
運行這個程序
最后,在reduce side join上運行上述MapReduce示例程序的命令 如下:
hadoop jar reducejoin.jar ReduceJoin /sample/input/cust_details /sample/input/transaction_details /sample/output
結論:
在排序和reducer 階段,reduce 側連接過程會產生巨大的網絡I/O 流量,在這個階段,相同鍵的值被聚集在一起。因此,如果您有大量具有數百萬個值的不同數據集,您很可能會遇到 OutOfMemory 異常,即您的 RAM 已滿,因此溢出。在我看來,使用reduce side join的優點是:
- 這很容易實現,因為我們利用 MapReduce 框架中的內置排序和改組算法,該算法組合相同鍵的值並將其發送到同一個減速器。
- 在reduce side join 中,您的輸入不需要遵循任何嚴格的格式,因此您也可以對非結構化數據執行連接操作。
一般來說,人們更喜歡 Apache Hive,它是 Hadoop 生態系統的一部分,來執行連接操作。因此,如果您來自 SQL 背景,則無需擔心編寫 MapReduce Java 代碼來執行連接操作。您可以使用 Hive 作為替代方案。