MapReduce實驗


承接上一篇,NoSQL實驗

MapReduce實驗

如何在Eclipse中運行MapReduce程序,參考廈大數據庫實驗室博客
總體代碼:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Prac1 {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(Prac1.class);

		int cmd = 1;
		String[] file = {"input"+cmd,"output"+cmd};
		if(cmd==1) 
		{			
			job.setMapperClass(Prac1.Mapper1.class);
			job.setCombinerClass(Prac1.Reducer1.class);
			job.setReducerClass(Prac1.Reducer1.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			FileInputFormat.setInputPaths(job, new Path(file[0]));
			FileOutputFormat.setOutputPath(job, new Path(file[1]));
			
		}else if(cmd==2)
		{			
			job.setMapperClass(Prac1.Mapper2.class);
			job.setReducerClass(Prac1.Reducer2.class);
			job.setOutputKeyClass(IntWritable.class);
			job.setOutputValueClass(IntWritable.class);
			FileInputFormat.setInputPaths(job, new Path(file[0]));
			FileOutputFormat.setOutputPath(job, new Path(file[1]));
		}else
		{			
			job.setMapperClass(Prac1.Mapper3.class);
			job.setReducerClass(Prac1.Reducer3.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			FileInputFormat.setInputPaths(job, new Path(file[0]));
			FileOutputFormat.setOutputPath(job, new Path(file[1]));
		}
		System.exit(job.waitForCompletion(true)?0:1);
	}

1.編程實現文件合並和去重操作

對於兩個輸入文件,即文件A和文件B,請編寫MapReduce程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新的輸出文件C。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件A的樣例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
輸入文件B的樣例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根據輸入文件A和B合並得到的輸出文件C的樣例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

設計思路:

操作 <key, value>
map()讀取 每一行的行號 每一項內容,例如"20150104 z"
map()輸出 讀取的value,每一項內容 空白Text,只是占位表示一下
reduce()讀取 獨立的每一項內容 空白Text的迭代器
reduce()輸出 獨立的每一項內容 空白Text,只是占位表示一下

Java代碼

public static class Mapper1 extends Mapper<Object,Text,Text,Text>
{
   public void map(Object key,Text value,Mapper<Object,Text,Text,Text>.Context context) throws IOException, InterruptedException
   {
   	context.write(value, new Text(""));
   }
}

public static class Reducer1 extends Reducer<Text,Text,Text,Text>
{
   
   public void reduce(Text key,Iterable<Text> values,Reducer<Text,Text,Text,Text>.Context context) throws IOException,InterruptedException
   {
   	context.write(key, new Text(""));
   }
}

2. 編寫程序實現對輸入文件的排序

現在有多個輸入文件,每個文件中的每行內容均為一個整數。要求讀取所有文件中的整數,進行升序排序后,輸出到一個新的文件中,輸出的數據格式為每行兩個整數,第一個數字為第二個整數的排序位次,第二個整數為原待排列的整數。下面是輸入文件和輸出文件的一個樣例供參考。
輸入文件1的樣例如下:
33
37
12
40
輸入文件2的樣例如下:
4
16
39
5
輸入文件3的樣例如下:
1
45
25
根據輸入文件1、2和3得到的輸出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

設計思路:

操作 <key, value>
map()讀取 文件每一行的行號 每一行的數字
map()輸出 每一行的數字 1,無意義,標識字符
reduce()讀取 排序好的數字 1,無意義,標識字符
reduce()輸出 序號 排序好的數字
  • 序號:編寫的Reducer類中繼承了原生的Reducer類中,重載了reduce函數,並且引入了一個記數的全局變量(static標記),每接收一個<key,value>便加一。
  • 疑惑之處:在map以后的shuffle是看不到的,MapReduce框架中就包含了shulffle並且在shuffle中就包含了排序,但是按照我的理解在shuffle中的排序應該只是針對一個map的機器或者是任務才對,(模糊記得要先到緩存再到磁盤,所以我在網上看到許多同學都是用了partion歸類,我不知道有啥用,我覺得我這個程序能完成任務的原因可能是我才用的是偽分布式安裝的hadoop,只有一台機器。(????)

Java代碼

public static class Mapper2 extends Mapper<Object,Text,IntWritable,IntWritable>
{
	public void map(Object key,Text value,Context context) throws IOException, InterruptedException
	{
		IntWritable data = new IntWritable();
		data.set(Integer.parseInt(value.toString()));
		context.write(data, new IntWritable(1));
	}
}

public static class Reducer2 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>
{
	private static int sum = 1;
	public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
	{
		for(IntWritable num:values) 
		{
			context.write(new IntWritable(sum),key);
			sum++;//可能存在重復的數字
		}
	}
}

3. 對給定的表格進行信息挖掘

下面給出一個child-parent的表格,要求挖掘其中的父子輩關系,給出祖孫輩關系的表格。
輸入文件內容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
輸出文件內容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse

設計思路:

操作 <key, value>
map()讀取 每一行的行號 子輩和父輩的內容
map()輸出 針對每一行都輸出兩次,第一次是子輩,第二次是父輩 第一次是父輩,第二次是子輩
reduce()讀取 1.接收到子輩 父輩
reduce()讀取 2.接收到父輩 一定有子輩,可能存在祖輩
reduce()讀取 3.接收到祖輩 父輩
reduce()輸出 子輩 祖輩

上述的子輩、父輩和祖輩都是相較而言的。比如如果四世同堂的話,第三世相較於第一世是子輩,相較於第四世是父輩。
每接收到一份數據,交換順序輸出兩次,在shuffle中按照鍵值歸並,這樣就能將存在祖輩和子輩的數據找出來,並且為了不將祖輩和子輩混淆,在map輸出的value中標記是old還是young。

Java代碼

public static class Mapper3 extends Mapper<Object,Text,Text,Text>
{
	public void map(Object key,Text value,Context context) throws IOException, InterruptedException
	{
		String[] splStr = value.toString().split(" ");
		String child = splStr[0];
		String parent = splStr[1];
		
		if(child.equals("child")&&parent.equals("parent"))
			return;
		context.write(new Text(child), new Text("old#"+parent));
		context.write(new Text(parent), new Text("young#"+child));
	}
}

public static class Reducer3 extends Reducer<Text,Text,Text,Text>
{
	private static boolean head = true ;
	public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
	{
		if(head)
		{
			context.write(new Text("grandchild"), new Text("grandparent"));
			head = false;
		}
		ArrayList<String> grandchild = new ArrayList<String>();
		ArrayList<String> grandparent = new ArrayList<String>();
		String[] temp;
		for(Text val:values)
		{
			temp = val.toString().split("#");
			if(temp[0].equals("young"))
				grandchild.add(temp[1]);
			else
				grandparent.add(temp[1]);
		}
		if(gc.size()==0||gp.size()==0)
                        return;
		for(String gc:grandchild)
			for(String gp:grandparent)
					context.write(new Text(gc), new Text(gp));
	}
}

人生此處,絕對樂觀


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM