-
概念:
所謂的分布式緩存指的是 hadoop框架可以把用戶指定的小文件發送到各個maptask運行的機器上,進行緩存,便於maptask讀取該文件內容 進行關聯查詢操作,這就是所謂的map端join。
-
適用場合:
-
通常適用於大文件關聯小文件,把小文件進行分布式緩存。
-
-
舉例說明:
訂單數據 商品數據 把商品進行分布式緩存 通過maptask每處理一個訂單 關聯一次商品數據
-
如何高效的利用分布式緩存。
-
hadoop DistributedCache可以把小文件分發到每個maptask那里,但是總不能每處理一條記錄去讀取一次這個緩存的小文件把?
- 既然文件不大,就可以把小文件加載的maptask運行的內存中,也就是創建數據集合 保存該小文件。
-
在什么時間把小文件加載到內存中?
-
protected void setup(Context context) throws IOException, InterruptedException { // NOTHING }
1protected void setup(Context context) throws IOException, InterruptedException {
2// NOTHING
3}
重寫父類的setup方法 該方法會在map方法調用之前調用一次,且調用一次,初始化方法。
在該方法中完成針對分布式小文件的緩存,加載到內存中,便於后續的map方法調用處理。
-
如何使用分布式緩存
-
添加緩存文件: // job.addArchiveToClassPath(archive);緩存 jar 包到 task 運行節點的 classpath 中 // job.addCacheArchive(uri);緩存壓縮包到 task 運行節點的工作目錄 // job.addFileToClassPath(file);//緩存普通文件到 task 運行節點的 classpath 中
441添加緩存文件:
2// job.addArchiveToClassPath(archive);緩存 jar 包到 task 運行節點的 classpath 中
3// job.addCacheArchive(uri);緩存壓縮包到 task 運行節點的工作目錄
4// job.addFileToClassPath(file);//緩存普通文件到 task 運行節點的 classpath 中
- 如果涉及大文件也需要進行分布式緩存執行?
- 調整maptask中默認可以使用內存大小的上限
mapreduce.map.memory.mb
11mapreduce.map.memory.mb
- 緩存數據到nosql數據庫,比如redis mogodb。 內存數據庫。
代碼舉例:
MapSideJoin
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapSideJoin {
static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
//用來緩存小文件(商品文件中的數據)
Map<String, String> produceMap = new HashMap<String,String>();
Text k = new Text();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//將商品文件中的數據寫到緩存中 千萬別寫成/ pdts.txt否則會提示找不到該文件
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
String line = null;
while((line=br.readLine())!=null){
//一行數據格式為P0001,xiaomi(商品id,商品名稱)
String[] fields = line.split(",");
produceMap.put(fields[0], fields[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//一行訂單數據 格式為 1001,20170710,P0001,1(訂單id,創建時間,商品id,購買商品數量)
String line = value.toString();
String[] fields = line.split(",");
//根據訂單數據中商品id在緩存中找出來對應商品信息(商品名稱),進行串接
String productName = produceMap.get(fields[2]);
k.set(line+"\t"+productName);
context.write(k, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//jar包位置
job.setJarByClass(MapSideJoin.class);
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//設置最終輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//指定需要緩存一個文件到所有的maptask運行節點工作目錄
// job.addArchiveToClassPath(archive);緩存jar包到task運行節點的classpath中
// job.addCacheArchive(uri);緩存壓縮包到task運行節點的工作目錄
// job.addFileToClassPath(file);//緩存普通文件到task運行節點的classpath中
//將產品表文件緩存到task工作節點的工作目錄中去
//緩存普通文件到task運行節點的工作目錄(hadoop幫我們完成)
job.addCacheFile(new URI("/mapjoincache/pdts.txt"));
//不需要reduce,那么也就沒有了shuffle過程
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path("/test/mapjoininput/"));
FileOutputFormat.setOutputPath(job, new Path("/test/mapjoinoutput"));
boolean ex = job.waitForCompletion(true);
System.exit(ex?0:1);
}
}
x
1
import org.apache.hadoop.conf.Configuration;
2
import org.apache.hadoop.fs.Path;
3
import org.apache.hadoop.io.LongWritable;
4
import org.apache.hadoop.io.NullWritable;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Job;
7
import org.apache.hadoop.mapreduce.Mapper;
8
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
9
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
10
import java.io.BufferedReader;
11
import java.io.FileInputStream;
12
import java.io.IOException;
13
import java.io.InputStreamReader;
14
import java.net.URI;
15
import java.util.HashMap;
16
import java.util.Map;
17
18
public class MapSideJoin {
19
static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
20
//用來緩存小文件(商品文件中的數據)
21
Map<String, String> produceMap = new HashMap<String,String>();
22
Text k = new Text();
23
24
25
protected void setup(Context context)
26
throws IOException, InterruptedException {
27
//將商品文件中的數據寫到緩存中 千萬別寫成/ pdts.txt否則會提示找不到該文件
28
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
29
String line = null;
30
while((line=br.readLine())!=null){
31
//一行數據格式為P0001,xiaomi(商品id,商品名稱)
32
String[] fields = line.split(",");
33
produceMap.put(fields[0], fields[1]);
34
}
35
}
36
37
protected void map(LongWritable key, Text value, Context context)
38
throws IOException, InterruptedException {
39
//一行訂單數據 格式為 1001,20170710,P0001,1(訂單id,創建時間,商品id,購買商品數量)
40
String line = value.toString();
41
String[] fields = line.split(",");
42
//根據訂單數據中商品id在緩存中找出來對應商品信息(商品名稱),進行串接
43
String productName = produceMap.get(fields[2]);
44
k.set(line+"\t"+productName);
45
context.write(k, NullWritable.get());
46
}
47
}
48
public static void main(String[] args) throws Exception {
49
Configuration conf = new Configuration();
50
Job job = Job.getInstance(conf);
51
//jar包位置
52
job.setJarByClass(MapSideJoin.class);
53
54
job.setMapperClass(MapJoinMapper.class);
55
job.setMapOutputKeyClass(Text.class);
56
job.setMapOutputValueClass(NullWritable.class);
57
//設置最終輸出類型
58
job.setOutputKeyClass(Text.class);
59
job.setOutputValueClass(NullWritable.class);
60
//指定需要緩存一個文件到所有的maptask運行節點工作目錄
61
// job.addArchiveToClassPath(archive);緩存jar包到task運行節點的classpath中
62
// job.addCacheArchive(uri);緩存壓縮包到task運行節點的工作目錄
63
// job.addFileToClassPath(file);//緩存普通文件到task運行節點的classpath中
64
65
//將產品表文件緩存到task工作節點的工作目錄中去
66
//緩存普通文件到task運行節點的工作目錄(hadoop幫我們完成)
67
job.addCacheFile(new URI("/mapjoincache/pdts.txt"));
68
69
//不需要reduce,那么也就沒有了shuffle過程
70
job.setNumReduceTasks(0);
71
72
FileInputFormat.setInputPaths(job, new Path("/test/mapjoininput/"));
73
FileOutputFormat.setOutputPath(job, new Path("/test/mapjoinoutput"));
74
75
boolean ex = job.waitForCompletion(true);
76
System.exit(ex?0:1);
77
}
78
}
79