What controls the number of reduce tasks in MapReduce?
1. numReduceTasks
Below is a WordCount program used for testing:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartTest {

    public static void main(String[] args) {
        Path inFile = new Path(args[0]);
        Path outFile = new Path(args[1]);
        Job job;
        try {
            job = Job.getInstance();
            job.setJarByClass(PartTest.class);
            FileInputFormat.addInputPath(job, inFile);
            FileOutputFormat.setOutputPath(job, outFile);
            job.setReducerClass(PartTestreducer.class);
            job.setMapperClass(PartTestmapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            try {
                job.waitForCompletion(true);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        /*
         * FileInputFormat/FileOutputFormat describe a MapReduce job's input and output:
         * addInputPath()  adds one path to the job's list of inputs;
         * setInputPaths() sets the whole array of input paths at once;
         * setOutputPath() sets the job's output directory.
         * (See the short example after this listing.)
         */
    }
}
class PartTestmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    //private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /*
        String line = value.toString();
        for (String s : line.split("\\s+")) {
            //if (s.length() > 0) {
            context.write(new Text(s), one);
            //}
        }
        */
        // Split the line on non-word characters and emit (word, 1) for every token.
        String[] line = value.toString().split("\\W+");
        for (int i = 0; i <= line.length - 1; i++) {
            String s = line[i];
            context.write(new Text(s), new IntWritable(1));
        }
        /*
        String line = value.toString();
        Text word = new Text();
        StringTokenizer token = new StringTokenizer(line);
        while (token.hasMoreTokens()) {
            word.set(token.nextToken());
            context.write(word, one);
        }
        */
    }
}
class PartTestreducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts for this word and emit (word, total).
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
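As an aside, the comment in main() above mentions setInputPaths(). A minimal sketch of the difference between the two FileInputFormat methods (the /tmp/a and /tmp/b paths are made-up examples):

// addInputPath() appends one more path to the job's input list,
// while setInputPaths() replaces the whole list in one call.
FileInputFormat.addInputPath(job, new Path("/tmp/test.txt"));
FileInputFormat.setInputPaths(job, new Path("/tmp/a"), new Path("/tmp/b")); // example paths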
Package the code above into parttest.jar and upload it to the /opt directory on the server.
Create the file /opt/test.txt and upload it to the /tmp directory in HDFS.
The file contents are:
hello world hello test test hadoop hadoop hdfs hive sql sqoop
Run the following on the server:
hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out1"
In the job's log output we can see that only one reduce task was started.
Then run
hadoop fs -ls /tmp/part/out1
and we can see that only one partition file, part-r-00000, was generated.
Now modify the code above as follows:
job = Job.getInstance();
job.setJarByClass(PartTest.class);
FileInputFormat.addInputPath(job , inFile);
FileOutputFormat.setOutputPath(job, outFile);
job.setNumReduceTasks(3);
job.setReducerClass(PartTestreducer.class);
job.setMapperClass(PartTestmapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
We added one new line to the code: job.setNumReduceTasks(3);
Repackage the code, upload it, and run:
hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out2"
The results are written to the /tmp/part/out2 directory.
We can see that 3 reduce tasks were started.
Then run
hadoop fs -ls /tmp/part/out2
and we can see that 3 part files were generated in the /tmp/part/out2 directory.
So setNumReduceTasks can be used to set the number of reduce tasks.
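The same value can also be set through the job configuration instead of the setter. A minimal sketch, assuming the driver above (mapreduce.job.reduces is the standard Hadoop 2.x property behind setNumReduceTasks):

// Equivalent to job.setNumReduceTasks(3): write the reduce count into the job
// configuration before the job is submitted.
job.getConfiguration().setInt("mapreduce.job.reduces", 3);

If the driver implemented org.apache.hadoop.util.Tool and were launched through ToolRunner, the same property could also be passed on the command line as -D mapreduce.job.reduces=3, without recompiling the jar.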
2. MapReduce partitioning
Append the following class to the end of the original code:
class PartTestPartitioner extends Partitioner<Text, IntWritable> {
    // Parameters: the first is the map output key, the second is the map output value,
    // and the third is the number of partitions (the number of reduce tasks, 1 by default).
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.toString().startsWith("h")) {
            return 0 % numPartitions;
        } else if (key.toString().startsWith("s")) {
            return 1 % numPartitions;
        } else {
            return 2 % numPartitions;
        }
    }
}
This code means:
counts for words starting with h are written to part-r-00000,
counts for words starting with s are written to part-r-00001,
and counts for words starting with any other character are written to part-r-00002.
Make the following changes to the original driver:
job = Job.getInstance();
job.setJarByClass(PartTest.class);
FileInputFormat.addInputPath(job , inFile);
FileOutputFormat.setOutputPath(job, outFile);
job.setNumReduceTasks(3);
job.setPartitionerClass(PartTestPartitioner.class);
job.setReducerClass(PartTestreducer.class);
job.setMapperClass(PartTestmapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
We added one new line: job.setPartitionerClass(PartTestPartitioner.class);
Repackage the code, upload it, and run:
hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out3"
The results are written to the /tmp/part/out3 directory.
We can see that 3 reduce tasks were started.
Then run
hadoop fs -ls /tmp/part/out3
and we can see that 3 part files were generated in the /tmp/part/out3 directory.
Viewing the three files one by one,
we can see that the counts have been written to their corresponding partition files.
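For reference, working through the test input by hand with the partitioning rules above (derived from the code, not copied from an actual run), the three files would be expected to contain roughly:

part-r-00000 (words starting with h):
hadoop  2
hdfs    1
hello   2
hive    1

part-r-00001 (words starting with s):
sql     1
sqoop   1

part-r-00002 (all other words):
test    2
world   1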
Note:
job.setNumReduceTasks(3);
job.setPartitionerClass(PartTestPartitioner.class);
The number of reduce tasks should not be set smaller than the number of partitions the partitioner defines. Because this partitioner takes its result modulo numPartitions, the job still runs with fewer reducers, but several key groups then get folded into the same output file; with a single reduce task, everything ends up in part-r-00000.
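For comparison, when no custom partitioner is set, Hadoop uses its default HashPartitioner, which spreads keys over the reduce tasks by hash code. A simplified sketch of that behavior, written against the Text/IntWritable types of this job (the class name DefaultLikePartitioner is just for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of the logic of Hadoop's default HashPartitioner:
// the key's hash code, forced non-negative, modulo the number of reduce tasks
// decides which part file a key is written to.
class DefaultLikePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

This is why, in the earlier run that wrote to /tmp/part/out2, the three part files already held different words even though no partitioner had been set explicitly.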