哈嘍~各位小伙伴們中秋快樂,好久沒更新新的文章啦,今天分享如何使用mapreduce進行join操作。
在離線計算中,我們常常不只是會對單一一個文件進行操作,進行需要進行兩個或多個文件關聯出更多數據,類似與sql中的join操作。
今天就跟大家分享一下如何在MapReduce中實現join操作
需求
現有兩張,一張是產品信息表,一張是訂單表。訂單表中只表存了產品ID,如果想要查出訂單以及產品的相關信息就必須使用關聯。
實現
根據MapReduce特性,大家都知道在reduce端,相同key的key,value對會被放到同一個reduce方法中(不設置partition的話)。
利用這個特點我們可以輕松實現join操作,請看下面示例。
產品表
ID | brand | model |
---|---|---|
p0001 | 蘋果 | iphone11 pro max |
p0002 | 華為 | p30 |
p0003 | 小米 | mate10 |
訂單表
id | name | address | produceID | num |
---|---|---|---|---|
00001 | kris | 深圳市福田區 | p0001 | 1 |
00002 | pony | 深圳市南山區 | p0001 | 2 |
00003 | jack | 深圳市坂田區 | p0001 | 3 |
假如數據量巨大,兩表的數據是以文件的形式存儲在HDFS中,需要用mapreduce程序來實現一下SQL查詢運算:
select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID
MapReduce實現思路
通過將關聯的條件(prodcueID)作為map輸出的key,將兩表滿足join條件的數據並攜帶數據所來源的文件信息,發往同一個
reduce task,在reduce中進行數據的串聯
實現方式一-reduce端join
定義一個Bean
public class RJoinInfo implements Writable{
private String customerName="";
private String customerAddr="";
private String orderID="";
private int orderNum;
private String productID="";
private String productBrand="";
private String productModel="";
// 0是產品,1是訂單
private int flag;
setter/getter
編寫Mapper
public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> {
private static Logger logger = LogManager.getLogger(RJoinMapper.class);
private RJoinInfo rJoinInfo = new RJoinInfo();
private Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 輸入方式支持很多中包括數據庫等等。這里用的是文件,因此可以直接強轉為文件切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 獲取文件名稱
String name = fileSplit.getPath().getName();
logger.info("splitPathName:"+name);
String line = value.toString();
String[] split = line.split("\t");
String productID = "";
if(name.contains("product")){
productID = split[0];
String setProductBrand = split[1];
String productModel = split[2];
rJoinInfo.setProductID(productID);
rJoinInfo.setProductBrand(setProductBrand);
rJoinInfo.setProductModel(productModel);
rJoinInfo.setFlag(0);
}else if(name.contains("orders")){
String orderID = split[0];
String customerName = split[1];
String cutsomerAddr = split[2];
productID = split[3];
String orderNum = split[4];
rJoinInfo.setProductID(productID);
rJoinInfo.setCustomerName(customerName);
rJoinInfo.setCustomerAddr(cutsomerAddr);
rJoinInfo.setOrderID(orderID);
rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
rJoinInfo.setFlag(1);
}
k.set(productID);
context.write(k,rJoinInfo);
}
}
代碼解釋,這里根據split的文件名,判斷是products還是orders,
然后根據是product還是orders獲取不同的數據,最用都以productID為Key發送給Reduce端
編寫Reducer
public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> {
private static Logger logger = LogManager.getLogger(RJoinReducer.class);
@Override
protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException {
List<RJoinInfo> orders = new ArrayList<>();
String productID = key.toString();
logger.info("productID:"+productID);
RJoinInfo rJoinInfo = new RJoinInfo();
for (RJoinInfo value : values) {
int flag = value.getFlag();
if (flag == 0) {
// 產品
try {
BeanUtils.copyProperties(rJoinInfo,value);
} catch (IllegalAccessException e) {
logger.error(e.getMessage());
} catch (InvocationTargetException e) {
logger.error(e.getMessage());
}
}else {
// 訂單
RJoinInfo orderInfo = new RJoinInfo();
try {
BeanUtils.copyProperties(orderInfo,value);
} catch (IllegalAccessException e) {
logger.error(e.getMessage());
} catch (InvocationTargetException e) {
logger.error(e.getMessage());
}
orders.add(orderInfo);
}
}
for (RJoinInfo order : orders) {
rJoinInfo.setOrderNum(order.getOrderNum());
rJoinInfo.setOrderID(order.getOrderID());
rJoinInfo.setCustomerName(order.getCustomerName());
rJoinInfo.setCustomerAddr(order.getCustomerAddr());
// 只輸出key即可,value可以使用nullwritable
context.write(rJoinInfo,NullWritable.get());
}
}
}
代碼解釋:根據productID會分為不同的組發到reduce端,reduce端拿到后一組數據后,其中有一個產品對象和多個訂單對象。
遍歷每一個對象,根據flag區分產品和訂單。保存產品對象,獲取每個訂單對象到一個集合中。當我們對每個對象都分好
類后,遍歷訂單集合將訂單和產品信息集合,然后輸出。
注意:我們這里效率雖然不是最高的,主要是想說明join的思路。
編寫Driver
public class RJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// conf.set("mapreduce.framework.name","yarn");
// conf.set("yarn.resourcemanager.hostname","server1");
// conf.set("fs.defaultFS","hdfs://server1:9000");
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
// 如果是本地運行,可以不用設置jar包的路徑,因為不用拷貝jar到其他地方
job.setJarByClass(RJoinDriver.class);
// job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar");
job.setMapperClass(RJoinMapper.class);
job.setReducerClass(RJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(RJoinInfo.class);
job.setOutputKeyClass(RJoinInfo.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output"));
boolean waitForCompletion = job.waitForCompletion(true);
System.out.println(waitForCompletion);
}
}
上面實現的這種方式有個缺點,就是join操作是在reduce階段完成的,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜
實現方式二-map端join
這種方式適用於關聯表中有小表的情形:
可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join操作並輸出結果,
可以大大提高join操作的並發度,加快處理速度。
編寫Mapper
在Mapper端我們一次性加載數據或者用Distributedbache將文件拷貝到每一個運行的maptask的節點上加載
這里我們使用第二種,在mapper類中定義好小表進行join
static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{
private static Map<String, RJoinInfo> productMap = new HashMap<>();
// 在循環調用map方法之前會先調用setup方法。因此我們可以在setup方法中,先對文件進行處理
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通過這幾句代碼可以獲取到cache file的本地絕對路徑,測試驗證用
URI[] cacheFiles = context.getCacheFiles();
System.out.println(Arrays.toString(new URI[]{cacheFiles[0]}));
// 直接指定名字,默認在工作文件夾的目錄下查找 1⃣
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){
String line;
while ((line = bufferedReader.readLine())!=null){
String[] split = line.split("\t");
String productID = split[0];
String setProductBrand = split[1];
String productModel = split[2];
RJoinInfo rJoinInfo = new RJoinInfo();
rJoinInfo.setProductID(productID);
rJoinInfo.setProductBrand(setProductBrand);
rJoinInfo.setProductModel(productModel);
rJoinInfo.setFlag(0);
productMap.put(productID, rJoinInfo);
}
}
super.setup(context);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String name = fileSplit.getPath().getName();
if (name.contains("orders")) {
String line = value.toString();
String[] split = line.split("\t");
String orderID = split[0];
String customerName = split[1];
String cutsomerAddr = split[2];
String productID = split[3];
String orderNum = split[4];
RJoinInfo rJoinInfo = productMap.get(productID);
rJoinInfo.setProductID(productID);
rJoinInfo.setCustomerName(customerName);
rJoinInfo.setCustomerAddr(cutsomerAddr);
rJoinInfo.setOrderID(orderID);
rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
rJoinInfo.setFlag(1);
context.write(rJoinInfo, NullWritable.get());
}
}
}
代碼解釋:這里我們又重寫了一個setup()方法,這個方法會在執行map()方法前先執行,因此我們可以在這個方法中事先加載好數據。
在上述代碼中,我們直接指定名字就拿到了product.txt文件,這個究竟這個文件是怎么復制在maptask的節點上的呢,還要看下面的driver
編寫Driver
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name","local");
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJarByClass(RJoinDemoInMapDriver.class);
job.setMapperClass(RjoinMapper.class);
job.setOutputKeyClass(RJoinInfo.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2"));
// 指定需要緩存一個文件到所有的maptask運行節點工作目錄
// job.addFileToClassPath(); 將普通文件緩存到task運行節點的classpath下
// job.addArchiveToClassPath();緩存jar包到task運行節點的classpath下
// job.addCacheArchive();緩存壓縮包文件到task運行節點的工作目錄
// job.addCacheFile();將普通文件 1⃣
job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt"));
// 設置reduce的數量為0
job.setNumReduceTasks(0);
boolean waitForCompletion = job.waitForCompletion(true);
System.out.println(waitForCompletion);
}
代碼解釋:上述Driver中,我們通過job.addCacheFile()指定了一個URI本地地址,運行時mapreduce就會將這個文件拷貝到maptask的運行工作目錄中。
好啦~本期分享代碼量偏多,主要是想分享如何使用mapreduce進行join操作的思路。下一篇我會再講一下 計算共同好友的思路以及代碼~
公眾號搜索:喜訊XiCent 獲取更多福利資源~~~~
本文由博客一文多發平台 OpenWrite 發布!