```java
import java.util.List;

import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Grouped top-N: an ordered sequence stays ordered after items are removed,
 * so sort first, then group.
 *
 * @author Tele
 */
public class TopDemo2 {
    private static SparkConf conf = new SparkConf().setMaster("local").setAppName("topdemo2");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);

    public static void main(String[] args) {
        JavaRDD<String> rdd = jsc.textFile("./src/main/java/base_demo/top/score.txt");

        // Parse "name score" lines into (score, name) pairs so we can sort by score
        JavaPairRDD<Integer, String> mapToPair = rdd.mapToPair(new PairFunction<String, Integer, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> call(String t) throws Exception {
                String[] fields = t.split(" ");
                return new Tuple2<Integer, String>(Integer.parseInt(fields[1]), fields[0]);
            }
        });

        // Sort first (descending by score)
        JavaPairRDD<Integer, String> sortByKey = mapToPair.sortByKey(false);

        // Swap key and value so the records can be grouped by name
        JavaPairRDD<String, Integer> mapToPair2 = sortByKey
                .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                        return new Tuple2<String, Integer>(t._2, t._1);
                    }
                });

        // Group
        JavaPairRDD<String, Iterable<Integer>> groupByKey2 = mapToPair2.groupByKey();

        // Take the top three
        JavaPairRDD<String, Iterable<Integer>> result = groupByKey2
                .mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        List<Integer> scores = IteratorUtils.toList(t._2.iterator());
                        // Guard against groups with fewer than three scores
                        return new Tuple2<String, Iterable<Integer>>(t._1,
                                scores.subList(0, Math.min(3, scores.size())));
                    }
                });

        result.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println(t._1 + t._2);
            }
        });

        jsc.close();
    }
}
```
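For reference, here is a hypothetical `score.txt` in the space-separated "name score" layout the parser above assumes, together with the grouped top-three the job would print for it (the exact `Iterable` formatting and the order of the groups may vary):

```
tom 88
tom 95
tom 98
tom 90
jack 70
jack 92
jack 85
jack 80
```

```
tom[98, 95, 90]
jack[92, 85, 80]
```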
```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topdemo2")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("./src/main/scala/spark_core/top/score.txt", 1)

    rdd.map(lines => {
      val fields = lines.split(" ")
      // (score, name) so sortByKey can order by score
      (fields(1).toInt, fields(0))
    }).sortByKey(false, 1)
      .map(t => (t._2, t._1)) // swap back to (name, score) for grouping
      .groupByKey()
      .map(t => {
        // Scores within each group are already descending; keep the first three
        val arr = t._2
        val score = arr.take(3)
        (t._1, score)
      })
      .foreach(t => println(t._1 + "---" + t._2))

    sc.stop()
  }
}
```
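Both demos lean on the globally sorted order surviving `groupByKey`, which Spark does not formally guarantee once the data is shuffled across partitions. A minimal sketch of a safer variant, assuming the same input format (the object name `TopDemo2Alt` is made up for illustration): sort inside each group instead of globally.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical variant: per-group sort instead of a global sortByKey.
object TopDemo2Alt {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("topdemo2-alt"))

    sc.textFile("./src/main/scala/spark_core/top/score.txt")
      .map(line => { val f = line.split(" "); (f(0), f(1).toInt) })
      .groupByKey()
      // Sort each name's scores in descending order, then keep the first three
      .mapValues(scores => scores.toList.sortBy(-_).take(3))
      .foreach(t => println(t._1 + "---" + t._2))

    sc.stop()
  }
}
```

This drops the ordering assumption at the cost of materializing each group in memory; for very large groups, the next step would be a bounded top-N structure maintained per key via `aggregateByKey`.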