MapReduce中多表合並案例
一.案例需求
訂單數據表t_order:
id |
pid |
amount |
1001 |
01 |
1 |
1002 |
02 |
2 |
1003 |
03 |
3 |
訂單數據order.txt
1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6
商品信息表t_product
pid |
pname |
01 |
小米 |
02 |
華為 |
03 |
格力 |
商品數據pd.txt
01 小米 02 華為 03 格力
將商品信息表中數據根據商品pid合並到訂單數據表中。
最終數據形式:
id |
pname |
amount |
1001 |
小米 |
1 |
1004 |
小米 |
4 |
1002 |
華為 |
2 |
1005 |
華為 |
5 |
1003 |
格力 |
3 |
1006 |
格力 |
6 |
二.reduce端表合並(數據傾斜)
通過將關聯條件(pid)作為map輸出的key,將兩表中滿足join條件的數據連同其來源文件的標記信息一起發往同一個reduce task,在reduce中完成數據的串聯。
1)創建商品和訂單合並后的bean類
package com.xyg.mapreduce.table;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Joined record carrying fields from both the order table and the product
 * table. The {@code flag} field marks which table a record originated from
 * ("0" = order table, "1" = product table) so the reducer can tell them apart.
 */
public class TableBean implements Writable {

    private String order_id; // order id
    private String p_id;     // product id
    private int amount;      // quantity ordered
    private String pname;    // product name
    private String flag;     // source-table marker: "0" = order, "1" = product

    /** No-arg constructor required by the Hadoop serialization framework. */
    public TableBean() {
        super();
    }

    /** Fully-populating constructor. */
    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
        super();
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * Serializes all five fields. The field order here MUST match
     * {@link #readFields(DataInput)} exactly.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    /** Deserializes the fields in the same order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    /** Output format used by the final result file: id, pname, amount. */
    @Override
    public String toString() {
        return order_id + "\t" + pname + "\t" + amount + "\t" ;
    }
}
2)編寫TableMapper程序
package com.xyg.mapreduce.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Reduce-side-join mapper: tags each input record with its source table
 * (decided by the input file name) and emits it keyed by pid, so that all
 * records sharing a pid meet in the same reduce call.
 *
 * NOTE(review): records are split on "," here, while the map-side-join
 * example splits pd.txt on "\t" — confirm the actual file delimiters.
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    // Reused across map() calls to avoid per-record allocation.
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Determine which table this record belongs to from the split's file name.
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();

        String[] cols = value.toString().split(",");

        if (fileName.startsWith("order")) {
            // Order record: id, pid, amount. Tag with flag "0".
            bean.setOrder_id(cols[0]);
            bean.setP_id(cols[1]);
            bean.setAmount(Integer.parseInt(cols[2]));
            bean.setPname("");
            bean.setFlag("0");
            k.set(cols[1]);
        } else {
            // Product record: pid, pname. Tag with flag "1";
            // blank out the order-only fields since the bean is reused.
            bean.setP_id(cols[0]);
            bean.setPname(cols[1]);
            bean.setFlag("1");
            bean.setAmount(0);
            bean.setOrder_id("");
            k.set(cols[0]);
        }

        // Emit keyed by pid so both tables' rows co-locate at one reducer.
        context.write(k, bean);
    }
}
3)編寫TableReducer程序
package com.xyg.mapreduce.table;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce-side join: for each pid key, collects every order record and the
 * matching product record, then stamps the product name onto each order
 * before writing it out.
 */
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {

        // 1. Collect all order records for this pid.
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // 2. Holder for the matching product record.
        TableBean pdBean = new TableBean();

        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) { // order-table record
                // Hadoop reuses the value object across iterations,
                // so each order must be deep-copied before storing.
                // FIX: removed a stray back-tick after this statement that
                // made the original fail to compile.
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else { // product-table record
                try {
                    // Copy the product record so its pname survives iteration.
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // 3. Join: copy the product name onto every order record.
        for (TableBean bean : orderBeans) {
            // FIX: original called bean.getPname(pdBean.getPname()) — a
            // zero-arg getter invoked with an argument, which does not
            // compile and would not assign anything; the join requires
            // the setter.
            bean.setPname(pdBean.getPname());
            // 4. Emit the joined record (TableBean.toString formats it).
            context.write(bean, NullWritable.get());
        }
    }
}
4)編寫TableDriver程序
package com.xyg.mapreduce.table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TableDriver { public static void main(String[] args) throws Exception { // 1 獲取配置信息,或者job對象實例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路徑 job.setJarByClass(TableDriver.class); // 3 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); // 4 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); // 5 指定最終輸出的數據的kv類型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); // 6 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
5)運行程序查看結果
1001 小米 1 1001 小米 1 1002 華為 2 1002 華為 2 1003 格力 3 1003 格力 3
缺點:這種方式中,合並的操作是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜
解決方案: map端實現數據合並
三.map端表合並(Distributedcache)
1.分析
適用於關聯表中有小表的情形;
可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行合並並輸出最終結果,可以大大提高合並操作的並發度,加快處理速度。
2.實操案例
(1)先在驅動模塊中添加緩存文件
package test;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the map-side join job: distributes the small product table
 * (pd.txt) to every map task via the distributed cache and disables the
 * reduce phase entirely.
 *
 * <p>Usage: DistributedCacheDriver &lt;input dir&gt; &lt;output dir&gt;
 */
public class DistributedCacheDriver {

    public static void main(String[] args) throws Exception {
        // Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar containing this driver class.
        job.setJarByClass(DistributedCacheDriver.class);

        // Map-side join needs only a mapper.
        job.setMapperClass(DistributedCacheMapper.class);

        // Final output key/value types (mapper output == job output here).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Ship the small table to every map task via the distributed cache.
        job.addCacheFile(new URI("file:///e:/inputcache/pd.txt"));

        // The join happens entirely in map(); no reduce tasks are needed.
        job.setNumReduceTasks(0);

        // Submit and block until completion; exit status reflects success.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(2)讀取緩存的文件數據
package test;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map-side join mapper: loads the small product table (pd.txt, shipped via
 * the distributed cache) into an in-memory map during setup(), then appends
 * the product name to every order line in map(). No reduce phase is needed.
 */
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // pid -> pname lookup built from the cached pd.txt.
    Map<String, String> pdMap = new HashMap<>();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // Read the cached product file into memory once per task.
        // FIX: try-with-resources — the original leaked the reader if any
        // exception was thrown before the explicit close() call.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"))) {
            String line;
            // FIX: the original guard StringUtils.isNotEmpty(line = readLine())
            // silently stops at the first blank line, dropping the rest of the
            // file; read to EOF instead and just skip blank lines.
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                // Cache line format: pid \t pname
                String[] fields = line.split("\t");
                pdMap.put(fields[0], fields[1]);
            }
        }
    }

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Order line format: id \t pid \t amount — TODO confirm the delimiter
        // matches the actual input file.
        String line = value.toString();
        String[] fields = line.split("\t");

        // Look up the product name by pid (null is appended as "null" when
        // the pid is missing from pd.txt — unchanged from the original).
        String pId = fields[1];
        String pdName = pdMap.get(pId);

        // Emit the original line with the product name appended.
        k.set(line + "\t" + pdName);
        context.write(k, NullWritable.get());
    }
}