MapReduce實現的Join


MapReduce Join

對兩份數據data1和data2進行關鍵詞連接是一個很通用的問題,如果數據量比較小,可以在內存中完成連接。

如果數據量比較大,在內存進行連接操會發生OOM。mapreduce join可以用來解決大數據的連接。

1 思路

1.1 reduce join

在map階段, 把關鍵字作為key輸出,並在value中標記出數據是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每一個value是來自data1還是data2,在內部分成2組,做集合的乘積。

這種方法有2個問題:

1, map階段沒有對數據瘦身,shuffle的網絡傳輸和排序性能很低。

2, reduce端對2個集合做乘積計算,很耗內存,容易導致OOM。

1.2 map join

兩份數據中,如果有一份數據比較小,小數據全部加載到內存,按關鍵字建立索引。大數據文件作為map的輸入文件,對map()函數每一對輸入,都能夠方便地和已加載到內存的小數據進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,並且連接好了的數據。

這種方法,要使用hadoop中的DistributedCache把小數據分布到各個計算節點,每個map節點都要把小數據庫加載到內存,按關鍵字建立索引。

這種方法有明顯的局限性:有一份數據比較小,在map端,能夠把它加載到內存,並進行join操作。

1.3 使用內存服務器,擴大節點的內存空間

針對map join,可以把一份數據存放到專門的內存服務器,在map()方法中,對每一個<key,value>的輸入對,根據key到內存服務器中取出數據,進行連接

1.4 使用BloomFilter過濾空連接的數據

對其中一份數據在內存中建立BloomFilter,另外一份數據在連接之前,用BloomFilter判斷它的key是否存在,如果不存在,那這個記錄是空連接,可以忽略。

1.5 使用mapreduce專為join設計的包

在mapreduce包里看到有專門為join設計的包,對這些包還沒有學習,不知道怎么使用,只是在這里記錄下來,作個提醒。

jar: mapreduce-client-core.jar

package: org.apache.hadoop.mapreduce.lib.join

2 實現map join

相對而言,map join更加普遍,下面的代碼使用DistributedCache實現map join

2.1 背景

有客戶數據customer和訂單數據orders。

customer

客戶編號 姓名 地址 電話
1 hanmeimei ShangHai 110
2 leilei BeiJing 112
3 lucy GuangZhou 119

** order**

訂單編號 客戶編號 其它字段被忽略
1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300

要求對customer和orders按照客戶編號進行連接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不作要求

客戶編號 訂單編號 訂單金額 姓名 地址 電話
1 1 50 hanmeimei ShangHai 110
1 2 200 hanmeimei ShangHai 110
1 6 42 hanmeimei ShangHai 110
1 7 352 hanmeimei ShangHai 110
2 8 1135 leilei BeiJing 112
2 9 400 leilei BeiJing 112
2 10 2000 leilei BeiJing 112
2 11 300 leilei BeiJing 112
3 3 15 lucy GuangZhou 119
3 4 350 lucy GuangZhou 119
3 5 58 lucy GuangZhou 119
  1. 在提交job的時候,把小數據通過DistributedCache分發到各個節點。
  2. map端使用DistributedCache讀到數據,在內存中構建映射關系--如果使用專門的內存服務器,就把數據加載到內存服務器,map()節點可以只保留一份小緩存;如果使用BloomFilter來加速,在這里就可以構建;
  3. map()函數中,對每一對<key,value>,根據key到第2)步構建的映射里面中找出數據,進行連接,輸出。

2.2 程序實現

public class Join extends Configured implements Tool {
	// customer文件在hdfs上的位置。
	// TODO: 改用參數傳入
	private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
	private static class CustomerBean {		
		private int custId;
		private String name;
		private String address;
		private String phone;
		
		public CustomerBean() {}
		
		public CustomerBean(int custId, String name, String address,
				String phone) {
			super();
			this.custId = custId;
			this.name = name;
			this.address = address;
			this.phone = phone;
		}

		
		
		public int getCustId() {
			return custId;
		}

		public String getName() {
			return name;
		}

		public String getAddress() {
			return address;
		}

		public String getPhone() {
			return phone;
		}
	}
	
	private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
		private int custId;
		private int orderId;

		public void set(int custId, int orderId) {
			this.custId = custId;
			this.orderId = orderId;
		}
		
		public int getCustId() {
			return custId;
		}
		
		public int getOrderId() {
			return orderId;
		}
		
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeInt(custId);
			out.writeInt(orderId);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			custId = in.readInt();
			orderId = in.readInt();
		}

		@Override
		public int compareTo(CustOrderMapOutKey o) {
			int res = Integer.compare(custId, o.custId);
			return res == 0 ? Integer.compare(orderId, o.orderId) : res;
		}
		
		@Override
		public boolean equals(Object obj) {
			if (obj instanceof CustOrderMapOutKey) {
				CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
				return custId == o.custId && orderId == o.orderId;
			} else {
				return false;
			}
		}
		
		@Override
		public String toString() {
			return custId + "\t" + orderId;
		}
	}
	
	private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
		private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
		private final Text outputValue = new Text();
		
		/**
		 * 在內存中customer數據
		 */
		private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			// 格式: 訂單編號	客戶編號	訂單金額
			String[] cols = value.toString().split("\t");			
			if (cols.length < 3) {
				return;
			}
			
			int custId = Integer.parseInt(cols[1]);		// 取出客戶編號
			CustomerBean customerBean = CUSTOMER_MAP.get(custId);
			
			if (customerBean == null) {			// 沒有對應的customer信息可以連接
				return;
			}
			
			StringBuffer sb = new StringBuffer();
			sb.append(cols[2])
				.append("\t")
				.append(customerBean.getName())
				.append("\t")
				.append(customerBean.getAddress())
				.append("\t")
				.append(customerBean.getPhone());
			
			outputValue.set(sb.toString());
			outputKey.set(custId, Integer.parseInt(cols[0]));
			
			context.write(outputKey, outputValue);
		}
		
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
			FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
			
			BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
			String line = null;
			String[] cols = null;
			
			// 格式:客戶編號	姓名	地址	電話
			while ((line = reader.readLine()) != null) {
				cols = line.split("\t");
				if (cols.length < 4) {				// 數據格式不匹配,忽略
					continue;
				}
				
				CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
				CUSTOMER_MAP.put(bean.getCustId(), bean);
			}
		}
	}

	/**
	 * reduce
	 * @author Ivan
	 *
	 */
	private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
		@Override
		protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// 什么事都不用做,直接輸出
			for (Text value : values) {
				context.write(key, value);
			}
		}
	}
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			new IllegalArgumentException("Usage: <inpath> <outpath>");
			return;
		}
		
		ToolRunner.run(new Configuration(), new Join(), args);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, Join.class.getSimpleName());
		job.setJarByClass(SecondarySortMapReduce.class);
		
		// 添加customer cache文件
		job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// map settings
		job.setMapperClass(JoinMapper.class);
		job.setMapOutputKeyClass(CustOrderMapOutKey.class);
		job.setMapOutputValueClass(Text.class);
		
		// reduce settings
		job.setReducerClass(JoinReducer.class);
		job.setOutputKeyClass(CustOrderMapOutKey.class);
		job.setOutputKeyClass(Text.class);
		
		boolean res = job.waitForCompletion(true);
		
		return res ? 0 : 1;
	}
}
運行環境
  • 操作系統: Centos 6.4
  • Hadoop: Apache Hadoop-2.5.0

客戶數據文件在hdfs上的位置硬編碼為
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 運行程序之前先把客戶數據上傳到這個位置。

  • 程序運行結果

@Hadoop中兩表JOIN的處理方法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM