Spark實現分組TopN


一.概述

  在許多數據中,都存在類別的數據,在一些功能中需要根據類別分別獲取前幾或后幾的數據,用於數據可視化或異常數據預警。在這種情況下,實現分組TopN就顯得非常重要了,因此,使用了Spark聚合函數和排序算法實現了分布式TopN計算功能。

  

二.代碼實現

 1 package scala
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 5 import org.apache.spark.sql.{Row, SparkSession}
 6 
 7 /**
 8   * 計算分組topN
 9   * Created by Administrator on 2019/11/20.
10   */
11 object GroupTopN {
12   Logger.getLogger("org").setLevel(Level.WARN) // 設置日志級別
13   def main(args: Array[String]) {
14     //創建測試數據
15     val test_data = Array("CJ20191120,201911", "CJ20191120,201910", "CJ20191105,201910", "CJ20191105,201909", "CJ20191111,201910")
16     val spark = SparkSession.builder().appName("GroupTopN").master("local[2]").getOrCreate()
17     val sc = spark.sparkContext
18     val test_data_rdd = sc.parallelize(test_data).map(row => {
19       val Array(scene, cycle) = row.split(",")
20       Row(scene, cycle)
21     })
22     // 設置數據模式
23     val structType = StructType(Array(
24       StructField("scene", StringType, true),
25       StructField("cycle", StringType, true)
26     ))
27     // 轉換為df
28     val test_data_df = spark.createDataFrame(test_data_rdd, structType)
29     test_data_df.createOrReplaceTempView("test_data_df")
30     // 拼接周期
31     val scene_ws = spark.sql("select scene,concat_ws(',',collect_set(cycle)) as cycles from test_data_df group by scene")
32     scene_ws.count()
33     scene_ws.show()
34     scene_ws.createOrReplaceTempView("scene_ws")
35     /**
36       * 定義參數確定N的大小,暫定為1
37       */
38     val sum = 1
39     // 創建廣播變量,把N的大小廣播出去
40     val broadcast = sc.broadcast(sum)
41     /**
42       * 定義Udf實現獲取組內的前N個數據
43       */
44     spark.udf.register("getTopN", (cycles : String) => {
45       val sum = broadcast.value
46       var mid = ""
47       if(cycles.contains(",")){ // 多值
48         val cycle = cycles.split(",").sorted.reverse // 降序排序
49         val min = Math.min(cycle.length, sum)
50         for(i <- 0 until min){
51           if(mid.equals("")){
52             mid = cycle(i)
53           }else{
54             mid += "," + cycle(i)
55           }
56         }
57       }else{ // 單值
58         mid = cycles
59       }
60       mid
61     })
62 
63     val result = spark.sql("select scene,getTopN(cycles) cycles from scene_ws")
64     result.show()
65     spark.stop()
66   }
67 }

三.結果

  

  

四.備注

  當N大於1時,多個數據會拼接在一起,若想每個一行,可是使用使用列轉行功能,參考我的博客:https://www.cnblogs.com/yszd/p/11266552.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM