Mapreduce中的reduce數量和分區控制


 

mapreduce中的reduce數量是由什么來進行控制的呢?

1、numReduceTasks

如下是用來進行測試的一段wordcount的代碼

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartTest {
	
	
	public static void main(String[] args){
		Path inFile = new Path(args[0]);
		Path outFile = new Path(args[1]);
		
		Job job;
		try {
			job = Job.getInstance();
			job.setJarByClass(PartTest.class);
		       
			FileInputFormat.addInputPath(job , inFile);  
			FileOutputFormat.setOutputPath(job, outFile);
			
			job.setReducerClass(PartTestreducer.class);
			job.setMapperClass(PartTestmapper.class);
			
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			
			
			try {
				job.waitForCompletion(true);
			} catch (ClassNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
        
        /** 
         * InputFormat描述map-reduce中對job的輸入定義 
         * setInputPaths():為map-reduce job設置路徑數組作為輸入列表 
         * setInputPath():為map-reduce job設置路徑數組作為輸出列表 
         */  
        
	}

}


class PartTestmapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	private final IntWritable one = new IntWritable(1);  
    //private Text word = new Text(); 
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		/*
		String line = value.toString();
		for(String s : line.split("\\s+")){
			//if(s.length() > 0){
				context.write(new Text(s), one);
			//}
		}
		*/
		
		
		 String[] line = value.toString().split("\\W+");
		 for(int i = 0 ; i<= line.length-1 ;i++){
			 String s = line[i];
		    context.write(new Text(s), new IntWritable(1));
		}
		
		/*
		
		String line = value.toString();  
		Text word = new Text();
        StringTokenizer token = new StringTokenizer(line);  
        while (token.hasMoreTokens()) {  
            word.set(token.nextToken());  
            context.write(word, one);  
        }
        */
	}
	
}

class PartTestreducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	@Override
	protected void reduce(Text arg0, Iterable<IntWritable> arg1,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		int sum = 0;
		for(IntWritable i : arg1){
			sum += i.get();
		}
		context.write(arg0, new IntWritable(sum));
	}
	
	
}

 

將上述代碼打包成 parttest.jar,並上傳到服務器的opt目錄

創建文件/opt/test.txt,並上傳到hdfs的/tmp目錄下

文本內容如下:

hello world
hello test
test hadoop
hadoop hdfs
hive
sql
sqoop

在服務器上執行:

hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out1"

我們可以看到日志輸出文件:

在這里可以看到只啟動了一個reduce任務

然后使用

hadoop fs -ls /tmp/part/out1

可以看到只生成了一個分區文件part-r-00000:

 

 

如果我們把上述代碼進行修改:

                        job = Job.getInstance();
			job.setJarByClass(PartTest.class);
		       
			FileInputFormat.addInputPath(job , inFile);  
			FileOutputFormat.setOutputPath(job, outFile);
			
			job.setNumReduceTasks(3);
			
			job.setReducerClass(PartTestreducer.class);
			job.setMapperClass(PartTestmapper.class);
			
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			

  

我們在代碼里新加了一行 :job.setNumReduceTasks(3);

將代碼重新打包上傳,執行:

hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out2"

將結果輸出到/tmp/part/out2目錄

可以看到啟動了3個reduce任務。

然后使用

hadoop fs -ls /tmp/part/out2

可以看到/tmp/part/out2文件夾中生成了3個part文件:

 

 所以可以使用  setNumReduceTasks  來設置reduce的數量

 

 

2、mapreduce的分區

我們在原來的代碼的最后一段加上如下代碼:

class PartTestPartitioner extends Partitioner<Text,IntWritable>{


	@Override
	//參數含義:第一個參數為map任務的outputkey。class,第二個參數為map任務的outputvalue。class,第三個參數為分區的數量,默認為1
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		// TODO Auto-generated method stub
		
		if(key.toString().startsWith("h")){
		     return 0%numPartitions;
		}
		else if(key.toString().startsWith("s")){
			return 1%numPartitions;
		}
		else{
			return 2%numPartitions;
		}
	}
	
}
 

這段代碼的含義是:

將以h開頭的統計結果輸出到part-r-00000

將以s開頭的統計結果輸出到part-r-00001

將以其他字母開頭的統計結果輸出到part-r-00002

對原有代碼進行如下修改:

                        job = Job.getInstance();
			job.setJarByClass(PartTest.class);
		       
			FileInputFormat.addInputPath(job , inFile);  
			FileOutputFormat.setOutputPath(job, outFile);
			
 job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class);
			
			job.setReducerClass(PartTestreducer.class);
			job.setMapperClass(PartTestmapper.class);
			
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);

  

新加了一行代碼:job.setPartitionerClass(PartTestPartitioner.class);

將代碼重新打包上傳,執行:

hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out3"

將結果輸出到/tmp/part/out3目錄

可以看到啟動了3個reduce任務。

然后使用

hadoop fs -ls /tmp/part/out3

可以看到/tmp/part/out3文件夾中生成了3個part文件:

 

 分別查看三個文件:

可以看到輸出結果已經分別輸出到對應的分區文件。

 

 

注意:

job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class);

NumReduceTasks的數量不能小於partitioner的數量,否則結果會寫到part-r-00000中


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM