Spark: grouped top-N


java

import java.util.List;

import org.apache.commons.collections4.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Grouped top-N. Removing elements from a sorted sequence leaves it sorted,
 * so sort first, then group.
 *
 * @author Tele
 */
public class TopDemo2 {
    private static SparkConf conf = new SparkConf().setMaster("local").setAppName("topdemo2");
    private static JavaSparkContext jsc = new JavaSparkContext(conf);

    public static void main(String[] args) {
        JavaRDD<String> rdd = jsc.textFile("./src/main/java/base_demo/top/score.txt");

        // (score, name) pairs, keyed by score so they can be sorted
        JavaPairRDD<Integer, String> mapToPair = rdd.mapToPair(new PairFunction<String, Integer, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> call(String t) throws Exception {
                String[] fields = t.split(" ");
                return new Tuple2<Integer, String>(Integer.parseInt(fields[1]), fields[0]);
            }
        });

        // Sort first (descending by score)
        JavaPairRDD<Integer, String> sortByKey = mapToPair.sortByKey(false);

        // Swap key and value so we can group by name
        JavaPairRDD<String, Integer> mapToPair2 = sortByKey
                .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                        return new Tuple2<String, Integer>(t._2, t._1);
                    }
                });

        // Group
        JavaPairRDD<String, Iterable<Integer>> groupByKey2 = mapToPair2.groupByKey();

        // Take the first three of each group
        JavaPairRDD<String, Iterable<Integer>> result = groupByKey2
                .mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> t)
                            throws Exception {
                        List<Integer> scores = IteratorUtils.toList(t._2.iterator());
                        // guard against groups with fewer than three scores
                        return new Tuple2<String, Iterable<Integer>>(t._1,
                                scores.subList(0, Math.min(3, scores.size())));
                    }
                });

        result.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println(t._1 + t._2);
            }
        });

        jsc.close();
    }
}
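The Java version above and the Scala version below both read score.txt, whose contents are not shown in the post; from the parsing code, each line is assumed to be a space-separated "name score" pair, for example (hypothetical sample):

class1 90
class1 87
class2 78

The class comment's point in one sentence: pairs are keyed by score and sorted descending once, globally; after re-keying by name and grouping, each group's values already start with that group's highest scores, so taking the first three is enough.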

scala

import org.apache.spark.{SparkConf, SparkContext}

object TopDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topdemo2")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("./src/main/scala/spark_core/top/score.txt", 1)

    rdd.map(line => {
      val fields = line.split(" ")
      (fields(1).toInt, fields(0))            // (score, name), keyed by score
    }).sortByKey(false, 1)                     // sort first, descending
      .map(t => (t._2, t._1))                  // swap to (name, score) for grouping
      .groupByKey()
      .map(t => (t._1, t._2.take(3)))          // first three of each group
      .foreach(t => println(t._1 + "---" + t._2))

    sc.stop()
  }
}
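Both versions rely on the descending sort surviving groupByKey, which holds in this single-partition local run, but Spark does not guarantee the ordering of values within a group after a shuffle. A more defensive variant (a minimal sketch; TopDemo3 is a made-up name, and the file path and "name score" format are carried over from above) skips the global sort and instead sorts each group locally before taking the top three:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of a variant that does not depend on value ordering after the shuffle:
// group by name first, then sort each group's scores descending and keep three.
object TopDemo3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topdemo3")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("./src/main/scala/spark_core/top/score.txt", 1)

    rdd.map(line => {
      val fields = line.split(" ")
      (fields(0), fields(1).toInt)                            // (name, score)
    }).groupByKey()
      .map(t => (t._1, t._2.toList.sortWith(_ > _).take(3)))  // sort inside the group
      .foreach(t => println(t._1 + "---" + t._2))

    sc.stop()
  }
}

Either way, groupByKey pulls a whole group's scores into memory; for very large groups, an aggregateByKey that keeps only a bounded top-N list per key would be the next step.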

 

