數據源
A 2 B 9 C 4 D 9 Z 42
要實現的輸出
Z 42 D 9 B 9 C 4 A 2
看字符順序,其實什么也沒有,只是按照后面的數字進行一次倒序排序,實現思路,1利用hadoop自帶的排序功能,2.KV互換
實現代碼
public class SVJob { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "192.168.9.181:9001"); String[] ars = new String[] { "hdfs://192.168.9.181:9000/user/hadoop/input/examples/SortByValue/", "hdfs://192.168.9.181:9000/user/hadoop/output/examples/SortByValue" }; String[] otherArgs = new GenericOptionsParser(conf, ars) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("SortByValue: <in> <out>"); System.exit(2); } Job job = new Job(conf, "SortByValue"); job.setJarByClass(SVJob.class); job.setMapperClass(SVMapper.class); job.setReducerClass(SVReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setSortComparatorClass(IntWritableDecreasingComparator.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
public class SVMapper extends Mapper<Object, Text, IntWritable, Text> { protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] keyValueStrings = line.split("\t"); if(keyValueStrings.length != 2) { //新手,不知道怎么記錄日志,也不清楚怎么退出 各位大神如果知道請通知我,謝謝 System.err.println("string format error!!!!!"); return; } int outkey = Integer.parseInt(keyValueStrings[1]); String outvalue = keyValueStrings[0]; context.write(new IntWritable(outkey), new Text(outvalue)); } }
public class SVReducer extends Reducer<IntWritable, Text, Text, IntWritable> { protected void reduce(IntWritable key, Iterable<Text> values,Context context)throws IOException, InterruptedException { for(Text value : values){ context.write(value, key); } } }
因為我們要實現倒序排序要有自定義的排序方法
public class IntWritableDecreasingComparator extends Comparator { @SuppressWarnings("rawtypes") public int compare( WritableComparable a,WritableComparable b){ return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } }
這樣就完成了,可以自定義排序了