0. Introduction
TopN is a classic algorithm. Since each mapper only computes a local top N, with M mappers the reduce stage receives only M × N records in total, which is perfectly manageable and does not create a performance bottleneck.
In the map phase, this TopN implementation uses a TreeMap to keep the candidates sorted, which keeps the job scalable.
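To make the mechanism concrete before the full job, here is a minimal standalone sketch of that TreeMap trick, in plain Java with no Hadoop involved (the class name and sample numbers are invented for illustration). The map never holds more than N entries, because the smallest key is evicted after every insertion:

import java.util.TreeMap;

public class TreeMapTopNSketch {
    public static void main(String[] args) {
        int N = 3;
        int[] data = { 20, 78, 56, 45, 23, 15 };
        // A TreeMap keeps its keys sorted ascending, so firstKey()
        // is always the smallest of the current candidates.
        TreeMap<Integer, String> topN = new TreeMap<Integer, String>();
        for (int x : data) {
            topN.put(x, " "); // only the key matters; the value is a placeholder
            if (topN.size() > N) {
                topN.remove(topN.firstKey()); // evict the smallest candidate
            }
        }
        System.out.println(topN.descendingKeySet()); // prints [78, 56, 45]
    }
}

The full MapReduce job below runs exactly this loop, once per mapper and once more in the reducer.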
There are two variants of the algorithm. One is the unique-key case, where the keys being compared are all distinct; this post covers the unique-key implementation.
The other is the non-unique-key case, where the keys might take values such as A, B, and C and we compute a top N for each of them, with all the data mixed together. The non-unique-key variant will be covered in a separate post.
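Why key uniqueness matters here: a TreeMap holds exactly one entry per key, so duplicate numbers silently collapse into a single candidate. A hypothetical two-put demo makes the effect visible:

import java.util.TreeMap;

public class DuplicateKeyDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> m = new TreeMap<Integer, String>();
        m.put(42, "first");
        m.put(42, "second");          // overwrites the entry for key 42
        System.out.println(m.size()); // prints 1, not 2
    }
}

So this implementation assumes the numbers being ranked are distinct.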
On to the main topic.
1. Input, expected output, and approach.
Since the keys are unique, only the value part is actually involved in the sorting, so we can keep things simple and use a single column of numbers as input.
The content of TopN.txt is as follows:
20 78 56 45 23 15 12 35 79 68 98 63 111 222 333 444 555
When we set N=10, the expected output is:
555
444
333
222
111
98
79
78
68
63
The approach was already laid out in the introduction, so there is no need to repeat it here; straight to the code:
2. Implementing TopN in Java with MapReduce:
To make this a TopN in the true sense rather than a hard-coded Top 10, the value of N is stored in the Configuration (and read back through the context), so the command-line arguments must be extended accordingly!
package TopN;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopN {

    public static class TopTenMapper extends
            Mapper<Object, Text, NullWritable, IntWritable> {
        // Holds this mapper's local top N, sorted ascending by key.
        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

        @Override
        public void map(Object key, Text value, Context context) {
            int N = 10; // default: Top 10
            N = Integer.parseInt(context.getConfiguration().get("N"));
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                repToRecordMap.put(Integer.parseInt(itr.nextToken()), " ");
                if (repToRecordMap.size() > N) {
                    // Evict the smallest entry so the map never exceeds N.
                    repToRecordMap.remove(repToRecordMap.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Emit this mapper's local top N once all input has been seen.
            for (Integer i : repToRecordMap.keySet()) {
                context.write(NullWritable.get(), new IntWritable(i));
            }
        }
    }

    public static class TopTenReducer extends
            Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
        // Merges the mappers' local top-N lists into the global top N.
        private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int N = 10; // default: Top 10
            N = Integer.parseInt(context.getConfiguration().get("N"));
            for (IntWritable value : values) {
                repToRecordMap.put(value.get(), " ");
                if (repToRecordMap.size() > N) {
                    repToRecordMap.remove(repToRecordMap.firstKey());
                }
            }
            // Output the global top N in descending order.
            for (Integer i : repToRecordMap.descendingMap().keySet()) {
                context.write(NullWritable.get(), new IntWritable(i));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                    "Usage: hadoop jar <jar-name> TopN.TopN "
                            + "<the value of N> <input-path> <output-path>");
        }
        Configuration conf = new Configuration();
        conf.set("N", args[0]); // make N visible to mappers and reducers
        Job job = Job.getInstance(conf, "TopN");
        job.setJobName("TopN");
        Path inputPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setJarByClass(TopN.class);
        job.setMapperClass(TopTenMapper.class);
        job.setReducerClass(TopTenReducer.class);
        job.setNumReduceTasks(1); // a single reducer merges all local top-N lists
        job.setMapOutputKeyClass(NullWritable.class);  // key type of the map output
        job.setMapOutputValueClass(IntWritable.class); // value type of the map output
        job.setOutputKeyClass(NullWritable.class);     // key type of the reduce output
        job.setOutputValueClass(IntWritable.class);    // value type of the reduce output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
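Assuming the compiled jar is named TopN.jar (the jar name and paths below are hypothetical), the job would be submitted along the lines of the usage string above:

hadoop jar TopN.jar TopN.TopN 10 /input/TopN.txt /output

The first argument becomes N; the other two are the input and output paths. Two design points are worth calling out: setNumReduceTasks(1) forces every mapper's local top-N list into a single reducer, which is what makes the final merge correct, and since each mapper contributes at most N numbers, that lone reducer only ever sees M × N values, as noted in the introduction.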
3. Implementing TopN in Scala with Spark:
The code is as concise as ever:
package spark

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object TopN {
  def main(args: Array[String]): Unit = {
    val N = 10 // the value of N is fixed here
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val file = sc.textFile("e:\\TopN.txt")
    // Pair every number with a dummy value so sortByKey can be used,
    // sort descending by key, then keep only the first N numbers.
    file.flatMap(_.split(" "))
      .map(x => (x.toInt, null))
      .sortByKey(false)
      .map(_._1)
      .take(N)
      .foreach(println)
  }
}
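One design note: sortByKey(false) sorts the entire dataset even though only the first N elements are kept, which mirrors the MapReduce version and is harmless at this data size. For larger inputs, the RDD API's built-in top(N) (or takeOrdered with a reversed ordering) returns the N largest elements using a bounded heap per partition, with no full shuffle-sort.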