There are several input files, each containing one integer per line. The task is to read the integers from all of the files, sort them in ascending order, and write the result to a new file. Each output line holds two integers: the first is the rank of the second in the sorted order, and the second is one of the original integers.
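The job leans on the MapReduce shuffle: the mapper emits every integer as an IntWritable key, the framework sorts all keys before they reach the reduce phase, and a single reducer walks the keys in ascending order while keeping a running rank counter. As a purely illustrative example (hypothetical sample data), if one input file contains 33, 12 and 40 and another contains 4 and 16, the output file would read:

1	4
2	12
3	16
4	33
5	40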


package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class B_sortInt {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		// Parse generic Hadoop options; the remaining arguments are the input/output paths.
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: B_sortInt <in> [<in>...] <out>");
			System.exit(2);
		}

		Job job = Job.getInstance(conf, "sort");
		job.setJarByClass(B_sortInt.class);
		job.setMapperClass(B_sortInt.TokenizerMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setReducerClass(B_sortInt.IntSumReducer.class);
		// Ranks come from a sequential counter, so exactly one reducer must see
		// every key in sorted order.
		job.setNumReduceTasks(1);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		for(int i = 0; i < otherArgs.length - 1; ++i) {
			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		}

		FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}

	public static class IntSumReducer extends Reducer<IntWritable, Text, Text, Text> {
		private Text word = new Text();
		// Running rank across all keys; valid because the job uses exactly one reducer.
		private int rank = 0;

		@Override
		public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			this.word.set(key.toString());
			// Emit one line per occurrence, so duplicate integers each get their own rank.
			for (Text ignored : values) {
				rank += 1;
				context.write(new Text(Integer.toString(rank)), word);
			}
		}
	}


	public static class TokenizerMapper extends Mapper<Object, Text, IntWritable, Text> {
		// Holds the integer parsed from the current token.
		private IntWritable data = new IntWritable();

		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				data.set(Integer.parseInt(itr.nextToken()));
				// The integer itself is the key, so the shuffle sorts it;
				// the value is a throwaway placeholder.
				context.write(data, new Text("a"));
			}
		}
	}
}
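To try the job (a sketch, assuming a pseudo-distributed Hadoop with HDFS at hdfs://localhost:9000, two hypothetical local files file1.txt and file2.txt, and the class packaged into a jar named B_sortInt.jar), upload the integer files, submit the job, and read the single result file:

hdfs dfs -put file1.txt file2.txt input
hadoop jar B_sortInt.jar org.apache.hadoop.examples.B_sortInt input output
hdfs dfs -cat output/part-r-00000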

