MapReduce關系代數運算


常見關系代數運算包括:選擇、投影、並、交、差以及自然連接操作等,都可以十分容易利用MapReduce框架進行並行化計算

 

關系R
NAME SEX AGE
小明 男 25
小紅 女 18
小張 男 22
小米 女 23
小麗 女 21
小王 男 19
小美 女 25
小朱 女 26

選擇操作

將關系R的數據存儲在relationR文件,然后移入HDFS下的data文件夾,如代碼1-1

代碼1-1

root@lejian:/data# cat relationR
小明 男 25
小紅 女 18
小張 男 22
小米 女 23
小麗 女 21
小王 男 19
小美 女 25
小朱 女 26
root@lejian:/data# hadoop fs -put relationR /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        112 2017-01-07 15:03 /data/relationR

 

對於關系R的應用條件C,選擇性別為女的數據,只需在Map階段對每個輸入的記錄進行判斷,將滿足條件的數據輸出即可,輸出鍵值為(key,null)。Reduce階段無需做額外的工作

代碼1-2

<?xml version="1.0"?>
<!-- Extra job configuration; presumably the conf.xml resource that the Selection driver adds to its Configuration. -->
<configuration>
	<property>
		<!-- Selection condition: SelectionMap reads the "sex" property and keeps only records whose sex column equals this value. -->
		<name>sex</name>
		<value>女</value>
	</property>
</configuration>

 

代碼1-3

package com.hadoop.mapreduce;

/**
 * One tuple of relation R: a person's name, sex and age.
 *
 * <p>Instances are parsed from a single text record such as {@code "Tom male 21"}
 * and are immutable afterwards.
 */
public class Person {

	private final String name;
	private final String sex;
	private final int age;

	/**
	 * Parses one record of the form {@code "<name> <sex> <age>"}.
	 *
	 * <p>Fields may be separated by any run of whitespace (spaces or tabs), and
	 * leading/trailing whitespace (including a trailing {@code '\r'}) is ignored.
	 * Fields after the third are ignored.
	 *
	 * @param line the raw text record
	 * @throws IllegalArgumentException if {@code line} is {@code null} or has fewer than three fields
	 * @throws NumberFormatException if the third field is not a valid integer
	 */
	public Person(String line) {
		if (line == null) {
			throw new IllegalArgumentException("record must not be null");
		}
		String[] fields = line.trim().split("\\s+");
		if (fields.length < 3) {
			throw new IllegalArgumentException("expected \"<name> <sex> <age>\" but got: \"" + line + "\"");
		}
		this.name = fields[0];
		this.sex = fields[1];
		this.age = Integer.parseInt(fields[2]);
	}

	/** @return the name column */
	public String getName() {
		return name;
	}

	/** @return the sex column */
	public String getSex() {
		return sex;
	}

	/** @return the age column */
	public int getAge() {
		return age;
	}

	/**
	 * Returns the value of a column as text.
	 *
	 * @param col the column name; {@code "name"} and {@code "sex"} select those columns
	 * @return the selected column, or the age rendered as a decimal string for any other
	 *         {@code col} value (including {@code "age"} and {@code null})
	 */
	public String getVal(String col) {
		if ("name".equals(col)) {
			return name;
		}
		if ("sex".equals(col)) {
			return sex;
		}
		// Every other column name falls back to the age column.
		return String.valueOf(age);
	}

	/** Renders the tuple as {@code "<name> <sex> <age>"} with single-space separators. */
	@Override
	public String toString() {
		return name + " " + sex + " " + age;
	}

}

 

代碼1-4

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map side of the selection operator: emits every input record whose sex column
 * equals the value of the {@code "sex"} job configuration property.
 *
 * <p>Output key is the record rendered by {@code Person.toString()}; the output
 * value is always {@code NullWritable}.
 */
public class SelectionMap extends Mapper<LongWritable, Text, Text, NullWritable> {

	/** Name of the configuration property holding the value the sex column must equal. */
	private static final String SEX_PROPERTY = "sex";

	private String sex = "";
	private final Text val = new Text();

	/**
	 * Reads the selection condition from the job configuration.
	 *
	 * @throws IllegalStateException if the {@code "sex"} property is not set; without this
	 *         check the first call to {@code map} would fail with an unexplained
	 *         {@code NullPointerException}
	 */
	protected void setup(Context context) throws java.io.IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		sex = conf.get(SEX_PROPERTY);
		if (sex == null) {
			throw new IllegalStateException(
					"Missing job configuration property \"" + SEX_PROPERTY + "\" (is conf.xml on the classpath?)");
		}
	}

	/**
	 * Parses one input line as a {@code Person} and writes it out when it satisfies the condition.
	 *
	 * @param key   input key supplied by the framework (unused)
	 * @param value one text record of the relation
	 */
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		Person person = new Person(value.toString());
		if (sex.equals(person.getSex())) {
			val.set(person.toString());
			context.write(val, NullWritable.get());
		}
	}

}

 

代碼1-5

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the selection job: configures {@code SelectionMap} and submits it.
 *
 * <p>Usage: {@code Selection <input paths> <output path>}. The selection condition is
 * read from the {@code conf.xml} resource.
 */
public class Selection {

	/**
	 * Builds and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} = input path(s), {@code args[1]} = output path
	 * @throws RuntimeException if the argument count is not exactly two
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args == null || args.length != 2) {
			throw new RuntimeException("請輸入輸入路徑、輸出路徑");
		}
		Configuration conf = new Configuration();
		// Pulls in the "sex" property that SelectionMap reads in setup().
		conf.addResource("conf.xml");
		Job job = Job.getInstance(conf);
		job.setJobName("Selection");
		// Tell Hadoop which jar holds the job classes so that tasks running on the
		// cluster can load SelectionMap; the other drivers in this article do the same.
		job.setJarByClass(Selection.class);
		job.setMapperClass(SelectionMap.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPaths(job, args[0]);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

 

運行代碼1-5,運行結果如代碼1-6

代碼1-6

root@lejian:/data# hadoop jar selection.jar com.hadoop.mapreduce.Selection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 15:05 /output/_SUCCESS
-rw-r--r--   1 root supergroup         70 2017-01-07 15:05 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
小麗 女 21
小朱 女 26
小米 女 23
小紅 女 18
小美 女 25

 

投影操作

例如在關系R上應用投影操作獲得屬性AGE的所有值,我們只需要在Map階段將每條記錄的AGE屬性和NullWritable輸出,而Reduce端僅獲取key即可,注意,此時投影操作具有去重功能

代碼1-7

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map side of the projection onto the AGE attribute: for every input record it
 * emits the record's age as the key with a {@code NullWritable} value.
 */
public class ProjectionMap extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

	/** Reused output key holding the age of the record currently being mapped. */
	private IntWritable projectedAge = new IntWritable();

	/**
	 * Parses one text record and writes its age column.
	 *
	 * @param key   input key supplied by the framework (unused)
	 * @param value one text record of the relation
	 */
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		String record = value.toString();
		projectedAge.set(new Person(record).getAge());
		NullWritable nothing = NullWritable.get();
		context.write(projectedAge, nothing);
	}

}

 

代碼1-8

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the projection: writes each distinct age key exactly once and
 * ignores the grouped values, which is what removes duplicates.
 */
public class ProjectionReduce extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {

	/**
	 * Emits the key once per reduce call.
	 *
	 * @param key    a distinct age value
	 * @param values the grouped placeholders (not read)
	 */
	protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws java.io.IOException, InterruptedException {
		NullWritable nothing = NullWritable.get();
		context.write(key, nothing);
	}

}

 

代碼1-9

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the projection job: wires {@code ProjectionMap} and
 * {@code ProjectionReduce} together and submits the job.
 *
 * <p>Usage: {@code Projection <input paths> <output path>}.
 */
public class Projection {

	/**
	 * Builds and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} = input path(s), {@code args[1]} = output path
	 * @throws RuntimeException if the argument count is not exactly two
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args == null || args.length != 2) {
			throw new RuntimeException("請輸入輸入路徑、輸出路徑");
		}
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("Projection");
		// Tell Hadoop which jar holds the job classes so that tasks running on the
		// cluster can load the mapper and reducer; the other drivers in this article do the same.
		job.setJarByClass(Projection.class);
		job.setMapperClass(ProjectionMap.class);
		job.setReducerClass(ProjectionReduce.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPaths(job, args[0]);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

 

運行代碼1-9,運行結果如代碼1-10

代碼1-10

root@lejian:/data# hadoop jar projection.jar com.hadoop.mapreduce.Projection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 15:52 /output/_SUCCESS
-rw-r--r--   1 root supergroup         21 2017-01-07 15:52 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
18
19
21
22
23
25
26

 

交運算

如果有一個關系A和關系B為同一個模式,希望得到關系A和關系B的交集,那么在Map階段對於A和B中的每一條記錄r輸出(r,1),在Reduce階段匯總計數,如果計數為2(假設同一個關系內沒有重復記錄),則將該條記錄輸出。依舊以Person類為例,這里把Person作為主鍵,為了使得關系A和關系B相同的Person發送到同一個Reduce節點進行計算,需要對原先代碼1-3的Person類進行修改,如代碼1-11。MapReduce默認使用HashPartitioner,根據鍵的hashCode決定記錄發送到哪個Reduce節點;在排序和分組階段則調用鍵的compareTo方法進行比較,compareTo返回0的鍵會被視為同一個鍵,歸入同一次reduce調用。因此相同的Person必須具有相同的hashCode,並且compareTo必須返回0

修改代碼1-3的Person類為代碼1-11

代碼1-11

package com.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * One tuple of a relation (name, sex, age) that can be used as a MapReduce key.
 *
 * <p>Two tuples are the same key exactly when all three fields are equal:
 * {@link #equals(Object)}, {@link #hashCode()} and {@link #compareTo(Person)}
 * are mutually consistent.
 */
public class Person implements WritableComparable<Person> {

	private String name;
	private String sex;
	private int age;

	/**
	 * Creates an empty instance. Writable types need a no-argument constructor so
	 * that an instance can be created first and then populated by
	 * {@link #readFields(DataInput)}.
	 */
	public Person() {
	}

	/**
	 * Parses one record of the form {@code "<name> <sex> <age>"}.
	 *
	 * <p>Fields may be separated by any run of whitespace (spaces or tabs), and
	 * leading/trailing whitespace (including a trailing {@code '\r'}) is ignored.
	 * Fields after the third are ignored.
	 *
	 * @param line the raw text record
	 * @throws IllegalArgumentException if {@code line} is {@code null} or has fewer than three fields
	 * @throws NumberFormatException if the third field is not a valid integer
	 */
	public Person(String line) {
		if (line == null) {
			throw new IllegalArgumentException("record must not be null");
		}
		String[] fields = line.trim().split("\\s+");
		if (fields.length < 3) {
			throw new IllegalArgumentException("expected \"<name> <sex> <age>\" but got: \"" + line + "\"");
		}
		this.name = fields[0];
		this.sex = fields[1];
		this.age = Integer.parseInt(fields[2]);
	}

	/** @return the name column */
	public String getName() {
		return name;
	}

	/** @return the sex column */
	public String getSex() {
		return sex;
	}

	/** @return the age column */
	public int getAge() {
		return age;
	}

	/**
	 * Returns the value of a column as text.
	 *
	 * @param col the column name; {@code "name"} and {@code "sex"} select those columns
	 * @return the selected column, or the age rendered as a decimal string for any other
	 *         {@code col} value (including {@code "age"} and {@code null})
	 */
	public String getVal(String col) {
		if ("name".equals(col)) {
			return name;
		}
		if ("sex".equals(col)) {
			return sex;
		}
		// Every other column name falls back to the age column.
		return String.valueOf(age);
	}

	/** Renders the tuple as {@code "<name> <sex> <age>"} with single-space separators. */
	@Override
	public String toString() {
		return name + " " + sex + " " + age;
	}

	/**
	 * Hash over all three fields. The formula is kept exactly as before so that
	 * partitioning and the hash-based primary sort order do not change.
	 */
	@Override
	public int hashCode() {
		int res = 20;
		res = name.hashCode() + 10 * res;
		res = sex.hashCode() + 10 * res;
		res = age + 10 * res;
		return res;
	}

	/** Two tuples are equal when name, sex and age are all equal. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof Person)) {
			return false;
		}
		Person other = (Person) obj;
		return age == other.age
				&& java.util.Objects.equals(name, other.name)
				&& java.util.Objects.equals(sex, other.sex);
	}

	/** Serializes the tuple as two modified-UTF-8 strings followed by an int. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(name);
		out.writeUTF(sex);
		out.writeInt(age);
	}

	/** Restores the fields in the same order in which {@link #write(DataOutput)} emitted them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		name = in.readUTF();
		sex = in.readUTF();
		age = in.readInt();
	}

	/**
	 * Orders tuples by hash code first (preserving the previous ordering) and then
	 * by name, sex and age.
	 *
	 * <p>The field-by-field tie-break matters: comparing hash codes alone reports 0
	 * for distinct tuples whose hashes collide (for example {@code "X a 10"} and
	 * {@code "X b 0"}), which would make them look like the same key.
	 *
	 * @return a negative value, zero or a positive value; zero only when all fields are equal
	 */
	@Override
	public int compareTo(Person o) {
		int byHash = Integer.compare(hashCode(), o.hashCode());
		if (byHash != 0) {
			return byHash;
		}
		int byName = name.compareTo(o.name);
		if (byName != 0) {
			return byName;
		}
		int bySex = sex.compareTo(o.sex);
		if (bySex != 0) {
			return bySex;
		}
		return Integer.compare(age, o.age);
	}

	/** Small manual check: prints the hash code of a sample tuple. */
	public static void main(String[] args) {
		System.out.println(new Person("Lily female 22").hashCode());
	}

}

 

 將關系A和關系B移入HDFS下的data文件夾,如代碼1-12

root@lejian:/data# cat relationA 
Tom male 21
Amy female 19
Daivd male 16
Lily female 22
Lucy female 20
John male 19
Rose female 19
Jojo female 26
root@lejian:/data# cat relationB
Daivd male 16
Jack male 15
Lily female 22
Lucy female 20
Tom male 25
root@lejian:/data# hadoop fs -put relation* /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
-rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB

 

代碼1-13

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map side of the intersection: turns every input record into a {@code Person}
 * key paired with the count 1.
 */
public class IntersectionMap extends Mapper<LongWritable, Text, Person, IntWritable> {

	/** Shared constant value written for every record. */
	private static final IntWritable ONE = new IntWritable(1);

	/**
	 * Emits {@code (record, 1)} for one input line.
	 *
	 * @param key   input key supplied by the framework (unused)
	 * @param value one text record of either relation
	 */
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		String record = value.toString();
		context.write(new Person(record), ONE);
	}

}

 

代碼1-14

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the intersection: sums the counts collected for one record and
 * writes the record only when the total is exactly 2.
 *
 * <p>NOTE(review): a total of 2 identifies "present in both relations" only if
 * neither input relation contains the same record twice - confirm for your data.
 */
public class IntersectionReduce extends Reducer<Person, IntWritable, Person, NullWritable> {
	/**
	 * @param key    the record shared by all grouped values
	 * @param values the counts emitted by the mappers for this record
	 */
	protected void reduce(Person key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
		int occurrences = 0;
		for (IntWritable partial : values) {
			occurrences = occurrences + partial.get();
		}
		if (occurrences != 2) {
			return;
		}
		context.write(key, NullWritable.get());
	}
}

 

代碼1-15

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the intersection job: wires {@code IntersectionMap} and
 * {@code IntersectionReduce} together and submits the job.
 *
 * <p>Usage: {@code Intersection <input paths> <output path>}.
 */
public class Intersection {

	/**
	 * Builds and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} = input path(s), {@code args[1]} = output path
	 * @throws RuntimeException if the argument count is not exactly two
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		boolean validArgs = args != null && args.length == 2;
		if (!validArgs) {
			throw new RuntimeException("請輸入輸入路徑、輸出路徑");
		}

		Job job = Job.getInstance(new Configuration());
		job.setJobName("Intersection");
		job.setJarByClass(Intersection.class);

		// Map phase: (Person, 1) pairs.
		job.setMapperClass(IntersectionMap.class);
		job.setMapOutputKeyClass(Person.class);
		job.setMapOutputValueClass(IntWritable.class);

		// Reduce phase: (Person, null) for records that pass the count test.
		job.setReducerClass(IntersectionReduce.class);
		job.setOutputKeyClass(Person.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPaths(job, args[0]);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}

}

 

運行代碼1-15,運行結果如代碼1-16

代碼1-16

root@lejian:/data# hadoop jar intersection.jar com.hadoop.mapreduce.Intersection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 20:30 /output/_SUCCESS
-rw-r--r--   1 root supergroup         44 2017-01-07 20:30 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
Daivd male 16
Lily female 22
Lucy female 20

 

差運算

計算關系A-關系B的差集,即找出在關系A中存在而在關系B中不存在的記錄,在Map階段,對於關系A和關系B中每一條記錄r輸出鍵值對(r,A),(r,B),在Reduce階段檢查每一條記錄r和其對應的關系名稱,只有關系名稱只存在A,才輸出記錄

先顯示HDFS中data文件夾下的relationA和relationB的文件內容,如代碼1-17

代碼1-17

root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
-rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB
root@lejian:/data# hadoop fs -cat /data/relationA
Tom male 21
Amy female 19
Daivd male 16
Lily female 22
Lucy female 20
John male 19
Rose female 19
Jojo female 26
root@lejian:/data# hadoop fs -cat /data/relationB
Daivd male 16
Jack male 15
Lily female 22
Lucy female 20
Tom male 25

 

代碼1-18

package com.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Map side of the set difference: tags every record with the name of the file
 * it was read from.
 */
public class DifferenceMap extends Mapper<LongWritable, Text, Person, Text> {

	/** File name of the split handled by this mapper; written as the value of every record. */
	private Text sourceRelation = new Text();

	/** Captures the file name of the current input split once per task. */
	protected void setup(Context context) throws java.io.IOException, InterruptedException {
		FileSplit split = (FileSplit) context.getInputSplit();
		String fileName = split.getPath().getName();
		sourceRelation.set(fileName);
	}

	/**
	 * Emits {@code (record, source file name)} for one input line.
	 *
	 * @param key   input key supplied by the framework (unused)
	 * @param value one text record of either relation
	 */
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		String record = value.toString();
		context.write(new Person(record), sourceRelation);
	}

}

 

代碼1-19

package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the set difference: writes a record unless one of its source
 * tags equals the relation named by the {@code "remove"} configuration property.
 */
public class DifferenceReduce extends Reducer<Person, Text, Person, NullWritable> {

	/** Name of the relation whose records must be excluded from the output. */
	private String remove = "";

	/** Reads the name of the relation to subtract from the job configuration. */
	protected void setup(Context context) throws java.io.IOException, InterruptedException {
		remove = context.getConfiguration().get("remove");
	}

	/**
	 * @param key    the record shared by all grouped values
	 * @param values the names of the files in which this record was found
	 */
	protected void reduce(Person key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
		boolean foundInRemoved = false;
		for (Text source : values) {
			if (remove.equals(source.toString())) {
				// One hit is enough; the remaining tags need not be inspected.
				foundInRemoved = true;
				break;
			}
		}
		if (!foundInRemoved) {
			context.write(key, NullWritable.get());
		}
	}

}

 

代碼1-20

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the set-difference job: wires {@code DifferenceMap} and
 * {@code DifferenceReduce} together and submits the job.
 *
 * <p>Usage: {@code Difference <input paths> <output path> <relation to subtract>}.
 * The third argument is the file name of the relation whose records are removed
 * from the result (for A - B it is the file name of B).
 */
public class Difference {

	/**
	 * Builds and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} = input path(s), {@code args[1]} = output path,
	 *             {@code args[2]} = file name of the relation to subtract
	 * @throws RuntimeException if the argument count is not exactly three
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args == null || args.length != 3) {
			// The third argument is the subtrahend (the relation to remove), not the minuend.
			throw new RuntimeException("請輸入輸入路徑、輸出路徑和減集合(要排除的關系文件名)");
		}
		Configuration conf = new Configuration();
		// Handed to DifferenceReduce, which drops every record found in this relation.
		conf.set("remove", args[2]);
		Job job = Job.getInstance(conf);
		job.setJobName("Difference");
		job.setJarByClass(Difference.class);

		job.setMapperClass(DifferenceMap.class);
		job.setMapOutputKeyClass(Person.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(DifferenceReduce.class);
		job.setOutputKeyClass(Person.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPaths(job, args[0]);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

 

運行代碼1-20,運行結果如代碼1-21

代碼1-21

root@lejian:/data# hadoop jar difference.jar com.hadoop.mapreduce.Difference /data /output relationB
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-08 08:59 /output/_SUCCESS
-rw-r--r--   1 root supergroup         69 2017-01-08 08:59 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
Tom male 21
Amy female 19
John male 19
Jojo female 26
Rose female 19

 

自然連接

如代碼1-22,student集合的第一列是id,第二列是姓名,第三列是性別,第四列是年齡,grade集合第一列是id,第二列是科目,第三列是科目成績,需要對student集合和grade集合做自然連接。在Map階段將student和grade中每一條記錄r作為value,而記錄中的id作為key輸出。在Reduce階段則將同一鍵收集而來的數據根據它們的來源(student或grade)做笛卡爾積然后將結果輸出

代碼1-22中,將student集合和grade集合存儲在HDFS下的data文件夾中

代碼1-22

root@lejian:/data# cat student 
1 Amy female 18
2 Tom male 19
3 Sam male 21
4 John male 19
5 Lily female 21
6 Rose female 20
root@lejian:/data# cat grade 
1 Math 89
2 Math 75
4 English 85
3 English 95
5 Math 91
5 English 88
6 Math 78
6 English 99
2 English 80
root@lejian:/data# hadoop fs -put student /data
root@lejian:/data# hadoop fs -put grade /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        105 2017-01-08 09:59 /data/grade
-rw-r--r--   1 root supergroup         93 2017-01-08 09:59 /data/student

 

代碼1-23

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Map side of the natural join: keys every record by its leading id column and
 * prefixes the record with the name of the file it came from.
 */
public class NaturalJoinMap extends Mapper<LongWritable, Text, IntWritable, Text> {

	/** File name of the split handled by this mapper; used as the source tag. */
	private String sourceFile = "";
	private Text taggedRecord = new Text();
	private IntWritable joinKey = new IntWritable();

	/** Captures the file name of the current input split once per task. */
	protected void setup(Context context) throws java.io.IOException, InterruptedException {
		FileSplit split = (FileSplit) context.getInputSplit();
		sourceFile = split.getPath().getName();
	}

	/**
	 * Emits {@code (id, "<file name> <record>")} for one input line.
	 *
	 * @param key   input key supplied by the framework (unused)
	 * @param value one text record whose first space-separated column is the integer id
	 */
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		String record = value.toString();
		String idColumn = record.split(" ")[0];
		joinKey.set(Integer.parseInt(idColumn));
		taggedRecord.set(sourceFile + " " + record);
		context.write(joinKey, taggedRecord);
	}

}

 

代碼1-24

package com.hadoop.mapreduce;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the natural join: for one id, combines the student record with
 * every grade record that carries the same id.
 *
 * <p>Each incoming value has the form {@code "<source file name> <original record>"}.
 * Values whose source file name contains {@code "student"} are student records;
 * all others are treated as grade records.
 */
public class NaturalJoinReduce extends Reducer<IntWritable, Text, Text, NullWritable> {

	/** Marker looked for in the source file name to recognise student records. */
	private static final String STUDENT_TAG = "student";

	private final Text value = new Text();

	/**
	 * Writes one joined line per grade record of this id.
	 *
	 * <p>The student record is held in a local variable so that it cannot leak from
	 * one key to the next: an id that has grades but no student record produces no
	 * output instead of being joined with the previous id's student.
	 *
	 * @param key    the join id
	 * @param values the tagged student and grade records sharing that id
	 */
	protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
		String student = null;
		List<String> grades = new ArrayList<String>();
		for (Text val : values) {
			String tagged = val.toString();
			if (isStudentRecord(tagged)) {
				student = studentStr(tagged);
			} else {
				grades.add(gradeStr(tagged));
			}
		}
		if (student == null) {
			// Inner join: grades without a matching student are dropped.
			return;
		}
		for (String grade : grades) {
			value.set(student + grade);
			context.write(value, NullWritable.get());
		}
	}

	/**
	 * Decides the origin of a tagged record from its source tag only (the text
	 * before the first space), so a record body that happens to contain the word
	 * "student" is not misclassified.
	 */
	private boolean isStudentRecord(String tagged) {
		int sep = tagged.indexOf(' ');
		String source = sep < 0 ? tagged : tagged.substring(0, sep);
		return source.contains(STUDENT_TAG);
	}

	/** Strips the source tag and returns the remaining columns, each followed by one space. */
	private String studentStr(String line) {
		return joinFrom(line.split(" "), 1);
	}

	/** Strips the source tag and the id column and returns the remaining columns, each followed by one space. */
	private String gradeStr(String line) {
		return joinFrom(line.split(" "), 2);
	}

	/** Concatenates {@code arr[start..]}, appending a single space after every element. */
	private String joinFrom(String[] arr, int start) {
		StringBuilder str = new StringBuilder();
		for (int i = start; i < arr.length; i++) {
			str.append(arr[i]).append(' ');
		}
		return str.toString();
	}

}

 

代碼1-25

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the natural-join job: wires {@code NaturalJoinMap} and
 * {@code NaturalJoinReduce} together and submits the job.
 *
 * <p>Usage: {@code NaturalJoin <input paths> <output path>}.
 */
public class NaturalJoin {

	/**
	 * Builds and runs the job, then exits with status 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} = input path(s), {@code args[1]} = output path
	 * @throws RuntimeException if the argument count is not exactly two
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args == null || args.length != 2) {
			throw new RuntimeException("請輸入輸入路徑、輸出路徑");
		}
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJobName("NaturalJoin");
		job.setJarByClass(NaturalJoin.class);

		job.setMapperClass(NaturalJoinMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(NaturalJoinReduce.class);
		// NaturalJoinReduce is declared as Reducer<IntWritable, Text, Text, NullWritable>,
		// so the final output key is Text (the joined line), not IntWritable.
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPaths(job, args[0]);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

 

運行代碼1-25,運行結果如代碼1-26

代碼1-26

root@lejian:/data# hadoop jar naturalJoin.jar com.hadoop.mapreduce.NaturalJoin /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-08 11:19 /output/_SUCCESS
-rw-r--r--   1 root supergroup        237 2017-01-08 11:19 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1 Amy female 18 Math 89 
2 Tom male 19 English 80 
2 Tom male 19 Math 75 
3 Sam male 21 English 95 
4 John male 19 English 85 
5 Lily female 21 English 88 
5 Lily female 21 Math 91 
6 Rose female 20 English 99 
6 Rose female 20 Math 78 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM