一、MapJoin Example
1. Requirement: there are two input files, an order table and a product table.
The order table has three fields: order time, product id, and order id (it represents the large table).
The product table has two fields: product id and product name (it represents the small table, which is loaded into memory).
The result file should be the order table with the product name that matches each line's product id appended to the end of that line.
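For concreteness, the files might look like this (hypothetical sample values, tab-separated):
order.txt (order table):
201903  01  1001
201903  02  1002
pd.txt (product table):
01  apple
02  banana
Expected result:
201903  01  1001  apple
201903  02  1002  banana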
2. Approach:
Load the product table into memory in setup(), then in the map() method append the product name that matches each order line's product id to the end of that line. No Reducer is needed; in the Driver class call addCacheFile to distribute the product file and call setNumReduceTasks(0).
3. The code:
public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
HashMap<String, String> pdMap = new HashMap<>();
//1. Load the product table into memory before any map() calls
@Override
protected void setup(Context context) throws IOException {
//Read the cached product file from the task's working directory
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
String line;
while(StringUtils.isNotEmpty(line = br.readLine()) ) {
//Split into product id and product name
String[] fields = line.split("\t");
//Cache the mapping: product id -> product name
pdMap.put(fields[0], fields[1]);
}
br.close();
}
//2. map(): join each order line against the in-memory product map
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
//Get the line
String line = value.toString();
//Split it
String[] fields = line.split("\t");
//Product id of this order line
String pid = fields[1];
//Look up the product name by product id
String pName = pdMap.get(pid);
//Append the product name to the end of the line
line = line + "\t" + pName;
//Emit the joined line as the key with an empty value
context.write(new Text(line), NullWritable.get());
}
}
public class CacheDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the jar by the driver class
job.setJarByClass(CacheDriver.class);
// 3. Set the custom Mapper (no Reducer is needed for a map-side join)
job.setMapperClass(CacheMapper.class);
// 4. Set the output key/value types (the map output is the final output here)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5. Input path and output path
FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));
// 6. Add the product file to the distributed cache
job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));
// 7. No reduce tasks: map output is written directly
job.setNumReduceTasks(0);
// 8. Submit the job
boolean rs = job.waitForCompletion(true);
System.out.println(rs ? 0 : 1);
}
}
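Note that the Mapper opens "pd.txt" by its bare file name; this relies on Hadoop linking the file added through job.addCacheFile(...) into the task's working directory. As a minimal sketch (assuming the Hadoop 2.x mapreduce API and that only this one file was added to the cache), setup() could look the name up from the context instead of hard-coding it:
// Sketch only: resolve the cached file name from the job context (assumes a single cache file)
URI[] cacheFiles = context.getCacheFiles();
String cachedName = new Path(cacheFiles[0]).getName(); // would be "pd.txt" here
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cachedName), "UTF-8"));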
二、ReduceJoin Example
1. Requirement: using the same two data files as above, replace the product id in the order table with the corresponding product name.
2. Approach: define a TableBean class with the fields time, product id, order id, product name, and flag (flag records which table a record came from).
A single Mapper reads both tables: it gets the input split from the context, uses the split's file name to decide which table the current line belongs to, wraps the line in a TableBean, and emits the product id as the key and the TableBean as the value.
The Reducer then uses flag to tell the tables apart. Because all values in one reduce call share the same key (the same product id), the one product-table record is kept in a single TableBean while the order-table records are collected into an ArrayList<TableBean>; finally the product name is filled into every bean in the list (so the output shows the product name in place of the product id) and each bean is written out as the key.
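For example (hypothetical values): for key 01 the reduce call might receive one product-table bean (flag=1, productName=apple) and two order-table beans (flag=0, order ids 1001 and 1003); after the join it writes two lines, each containing the order time, apple, and the order id.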
3. The code:
/**
* @author: PrincessHug
* @date: 2019/3/30, 2:37
* @Blog: https://www.cnblogs.com/HelloBigTable/
*/
public class TableBean implements Writable {
private String timeStamp;
private String productId;
private String orderId;
private String productName;
private String flag; //"0" = order-table record, "1" = product-table record
public TableBean() {
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public void write(DataOutput out) throws IOException {
//writeUTF cannot serialize null, so unused fields are filled with "" instead of being left null
out.writeUTF(timeStamp);
out.writeUTF(productId);
out.writeUTF(orderId);
out.writeUTF(productName);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
timeStamp = in.readUTF();
productId = in.readUTF();
orderId = in.readUTF();
productName = in.readUTF();
flag = in.readUTF();
}
@Override
public String toString() {
return timeStamp + "\t" + productName + "\t" + orderId;
}
}
public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Get the split this record came from and its file name
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
//Read the line and prepare a TableBean and the output key
String line = value.toString();
TableBean tb = new TableBean();
Text t = new Text();
//Decide which table the line belongs to by file name
if (name.contains("order.txt")){
String[] fields = line.split("\t");
tb.setTimeStamp(fields[0]);
tb.setProductId(fields[1]);
tb.setOrderId(fields[2]);
tb.setProductName("");
tb.setFlag("0");
t.set(fields[1]);
}else {
String[] fields = line.split("\t");
tb.setTimeStamp("");
tb.setProductId(fields[0]);
tb.setOrderId("");
tb.setProductName(fields[1]);
tb.setFlag("1");
t.set(fields[0]);
}
context.write(t,tb);
}
}
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//A list for the order-table records and a single bean for the product-table record
ArrayList<TableBean> orderBean = new ArrayList<>();
TableBean productBean = new TableBean();
//Iterate over the values; flag tells order records from the product record.
//Hadoop reuses the value object during iteration, so each order record is copied into a fresh bean.
for (TableBean v:values){
if (v.getFlag().equals("0")){
TableBean tableBean = new TableBean();
try {
BeanUtils.copyProperties(tableBean,v);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBean.add(tableBean);
}else {
try {
BeanUtils.copyProperties(productBean,v);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//Join: fill the product name into every order record and emit it
for (TableBean ob:orderBean) {
ob.setProductName(productBean.getProductName());
context.write(ob,NullWritable.get());
}
}
}
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//Set the jar by the driver class
job.setJarByClass(TableDriver.class);
//Mapper and Reducer classes
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
//Map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
//Final output key/value types (from the Reducer)
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
//Input and output paths
FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\reducejoin\\in"));
FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\reducejoin\\out"));
//Submit the job
if (job.waitForCompletion(true)){
System.out.println("Job completed successfully!");
}else {
System.out.println("Job failed!");
}
}
}
