@
Reduce Join原理
Map
端的主要工作:為來自不同表或文件的key/value
對,打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。Reduce
端的主要工作:在Reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在Map階段已經打標志)分開,最后進行合並就ok了。- 該方法的缺點:這種方式的缺點很明顯就是會造成Map和Reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。
案例實操
需求分析
通過將關聯條件作為Map輸出的key,將兩表滿足Join條件的數據並攜帶數據所來源的文件信息,發往同一個ReduceTask,在Reduce中進行數據的串聯。
MR分析
替換的前提是: 相同pid
的數據,需要分到同一個區
0號區: 1001 01 1
01 小米
1號區: 1002 02 2
02 華為
注意:
- 分區時,以pid為條件進行分區!
- 兩種不同的數據,經過同一個Mapper的map()處理,因此需要在map()中,判斷切片數據的來源,根據來源執行不同的封裝策略
- 一個Mapper只能處理一種切片的數據,所以在Map階段無法完成join操作,需要在reduce中實現Join
- 在Map階段,封裝數據。 自定義的Bean需要能夠封裝,兩個切片中的所有的數據
- 在reduce輸出時,只需要將來自於order.txt中的數據,將pid替換為pname,而不需要輸出所有的key-value
- 在Map階段對數據打標記,標記哪些key-value屬於order.txt,哪些屬於pd.txt
order.txt---->切片(orderId,pid,amount)----JoinMapper.map()------>JoinReducer
pd.txt----->切片(pid,pname)----JoinMapper.map()
MR實現
Mapper:
keyin-valuein:
map:
keyout=valueout:
Reducer:
keyin-valuein:
reduce:
keyout=valueout:
ReduceJoin
ReduceJoin需要在Reduce階段實現Join功能,一旦數據量過大,效率低!
后面有一種方法使用MapJoin解決ReduceJoin低效的問題!
代碼實現
創建商品和訂合並后的Bean類,JoinBean.java
public class JoinBean implements Writable{
private String orderId;
private String pid;
private String pname;
private String amount;
private String source;
@Override
public String toString() {
return orderId + "\t" + pname + "\t" + amount ;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(pid);
out.writeUTF(pname);
out.writeUTF(amount);
out.writeUTF(source);
}
@Override
public void readFields(DataInput in) throws IOException {
orderId=in.readUTF();
pid=in.readUTF();
pname=in.readUTF();
amount=in.readUTF();
source=in.readUTF();
}
}
編寫Mapper類,ReduceJoinMapper.java
/*
* Map階段無法完成Join,只能封裝數據,在Reduce階段完成Join
*
* 1. order.txt: 1001 01 1
* pd.txt : 01 小米
*
* 2. Bean必須能封裝所有的數據
*
* 3. Reduce只需要輸出來自於order.txt的數據,需要在Mapper中對數據打標記,標記數據的來源
*
* 4. 在Mapper中需要獲取當前切片的來源,根據來源執行不同的封裝邏輯
*/
public class ReduceJoinMapper extends Mapper<LongWritable, Text, NullWritable, JoinBean>{
private NullWritable out_key=NullWritable.get();
private JoinBean out_value=new JoinBean();
private String source;
// setUp()在map()之前先運行,只運行一次
@Override
protected void setup(Mapper<LongWritable, Text, NullWritable, JoinBean>.Context context)
throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
FileSplit split=(FileSplit) inputSplit;
source=split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, JoinBean>.Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split("\t");
//打標記
out_value.setSource(source);
if (source.equals("order.txt")) {
out_value.setOrderId(words[0]);
out_value.setPid(words[1]);
out_value.setAmount(words[2]);
// 保證所有的屬性不為null
out_value.setPname("nodata");
}else {
out_value.setPid(words[0]);
out_value.setPname(words[1]);
// 保證所有的屬性不為null
out_value.setOrderId("nodata");
out_value.setAmount("nodata");
}
context.write(out_key, out_value);
}
}
自定義分區器,MyPartitioner.java
/*
* 1.保證pid相同的key-value分到同一個區
*/
public class MyPartitioner extends Partitioner<NullWritable, JoinBean>{
@Override
public int getPartition(NullWritable key, JoinBean value, int numPartitions) {
return (value.getPid().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
編寫Reducer類,JoinBeanReducer.java
/*
* order.txt: 1001 01 1
* pd.txt : 01 小米
* orderid,pid,amount,source,pname
* 1. (null,1001,01,1,order.txt,nodata)
* (null,nodata,01,nodata,pd.txt,小米)
*
* 2. 在輸出之前,需要把數據按照source屬性分類
* 只能在reduce中分類
*/
public class JoinBeanReducer extends Reducer<NullWritable, JoinBean, NullWritable, JoinBean>{
//分類的集合
private List<JoinBean> orderDatas=new ArrayList<>();
private Map<String, String> pdDatas=new HashMap<>();
//根據source分類
@Override
protected void reduce(NullWritable key, Iterable<JoinBean> values,
Reducer<NullWritable, JoinBean, NullWritable, JoinBean>.Context arg2)
throws IOException, InterruptedException {
for (JoinBean value : values) {
if (value.getSource().equals("order.txt")) {
// 將value對象的屬性數據取出,封裝到一個新的JoinBean中
// 因為value至始至終都是同一個對象,只不過每次迭代,屬性會隨之變化
JoinBean joinBean = new JoinBean();
try {
BeanUtils.copyProperties(joinBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderDatas.add(joinBean);
}else {
//數據來源於pd.txt
pdDatas.put(value.getPid(), value.getPname());
}
}
}
// Join數據,寫出
@Override
protected void cleanup(Reducer<NullWritable, JoinBean, NullWritable, JoinBean>.Context context)
throws IOException, InterruptedException {
//只輸出來自orderDatas的數據
for (JoinBean joinBean : orderDatas) {
// 從Map中根據pid取出pname,設置到bean的pname屬性中
joinBean.setPname(pdDatas.get(joinBean.getPid()));
context.write(NullWritable.get(), joinBean);
}
}
}
編寫驅動類,CustomIFDriver.java
public class CustomIFDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("e:/mrinput/reducejoin");
Path outputPath=new Path("e:/mroutput/reducejoin");
//作為整個Job的配置
Configuration conf = new Configuration();
//保證輸出目錄不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①創建Job
Job job = Job.getInstance(conf);
job.setJarByClass(CustomIFDriver.class);
// 為Job創建一個名字
job.setJobName("wordcount");
// ②設置Job
// 設置Job運行的Mapper,Reducer類型,Mapper,Reducer輸出的key-value類型
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(JoinBeanReducer.class);
// Job需要根據Mapper和Reducer輸出的Key-value類型准備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化
// 如果Mapper和Reducer輸出的Key-value類型一致,直接設置Job最終的輸出類型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(JoinBean.class);
// 設置輸入目錄和輸出目錄
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 設置分區器
job.setPartitionerClass(MyPartitioner.class);
//需要Join的數據量過大 order.txt 10億,pd.txt 100w,提高MR並行運行的效率
// Map階段: 修改片大小,切的片多,MapTask運行就多
// Reduce階段: 修改ReduceTask數量
//可以設置ReduceTasks的數量,默認為1,將輸出在一個文件。
//在此案例中,如果是3,則分為三個文件。如果超過三,其余文件則是空的。
//job.setNumReduceTasks(3);
// ③運行Job
job.waitForCompletion(true);
}
}
運行結果: