I. Overview
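Many datasets contain categorical data, and some features need the first (or last) few records of each category, for example for data visualization or anomaly alerts. Computing a per-group TopN is therefore important. This post implements a distributed group TopN in Spark by aggregating each group's values with Spark SQL aggregate functions (collect_set, concat_ws) and then sorting and trimming them with a UDF.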
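To pin down the intended semantics before bringing in Spark, here is a minimal local sketch in plain Scala (no Spark) over the same "scene,cycle" test data: group by scene, de-duplicate, sort cycles in descending order and keep the first N. The names `LocalGroupTopN` and `groupTopN` are hypothetical, and the sketch assumes, as the post does, that cycles are yyyyMM strings, so lexicographic order matches chronological order.

```scala
object LocalGroupTopN {
  // Group (scene, cycle) pairs by scene and keep the n largest distinct cycles per scene,
  // in descending order. Assumes yyyyMM strings, so string order equals time order.
  def groupTopN(records: Seq[(String, String)], n: Int): Map[String, Seq[String]] =
    records
      .groupBy(_._1)
      .map { case (scene, rows) =>
        scene -> rows.map(_._2).distinct.sorted(Ordering[String].reverse).take(n)
      }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      "CJ20191120,201911", "CJ20191120,201910", "CJ20191105,201910",
      "CJ20191105,201909", "CJ20191111,201910"
    ).map { line =>
      val Array(scene, cycle) = line.split(",")
      (scene, cycle)
    }
    // Print one line per scene, sorted by scene for a stable order
    groupTopN(data, 1).toSeq.sortBy(_._1).foreach { case (scene, cycles) =>
      println(s"$scene -> ${cycles.mkString(",")}")
    }
  }
}
```

The Spark version below distributes the same two steps: the grouping happens in the SQL aggregation and the sort-and-trim happens inside the UDF.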
II. Code Implementation
package scala

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Compute the top N values per group.
 * Created by Administrator on 2019/11/20.
 */
object GroupTopN {
  Logger.getLogger("org").setLevel(Level.WARN) // Set the log level

  def main(args: Array[String]): Unit = {
    // Create test data: "scene,cycle" pairs
    val test_data = Array("CJ20191120,201911", "CJ20191120,201910", "CJ20191105,201910", "CJ20191105,201909", "CJ20191111,201910")
    val spark = SparkSession.builder().appName("GroupTopN").master("local[2]").getOrCreate()
    val sc = spark.sparkContext
    val test_data_rdd = sc.parallelize(test_data).map(row => {
      val Array(scene, cycle) = row.split(",")
      Row(scene, cycle)
    })
    // Define the schema
    val structType = StructType(Array(
      StructField("scene", StringType, true),
      StructField("cycle", StringType, true)
    ))
    // Convert to a DataFrame
    val test_data_df = spark.createDataFrame(test_data_rdd, structType)
    test_data_df.createOrReplaceTempView("test_data_df")
    // Concatenate each scene's distinct cycles into one comma-separated string
    val scene_ws = spark.sql("select scene, concat_ws(',', collect_set(cycle)) as cycles from test_data_df group by scene")
    scene_ws.show()
    scene_ws.createOrReplaceTempView("scene_ws")
    /**
     * N, the number of values to keep per group; set to 1 for now
     */
    val topN = 1
    // Broadcast N to the executors
    val broadcast = sc.broadcast(topN)
    /**
     * UDF that returns the first N values within a group
     */
    spark.udf.register("getTopN", (cycles: String) => {
      val n = broadcast.value
      if (cycles == null) null
      // Sort descending (yyyyMM strings sort chronologically) and keep the first N
      else cycles.split(",").sorted.reverse.take(n).mkString(",")
    })

    val result = spark.sql("select scene, getTopN(cycles) as cycles from scene_ws")
    result.show()
    spark.stop()
  }
}
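The code above builds one string per group and trims it in a UDF. A common alternative in Spark, swapped in here and not the author's method, is a window function: number the rows within each scene by cycle in descending order with `row_number()` and keep those numbered N or lower. This is a minimal sketch assuming Spark 2.x or later; the names `GroupTopNWindow` and `topN` are hypothetical. `distinct()` is applied first to mirror the de-duplication that `collect_set` performs, and the output has one row per kept value rather than a comma-joined string.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object GroupTopNWindow {
  // Keep the n largest cycles per scene (hypothetical helper, window-function approach)
  def topN(df: DataFrame, n: Int): DataFrame = {
    // Rank rows inside each scene, newest cycle first
    val w = Window.partitionBy("scene").orderBy(col("cycle").desc)
    df.distinct()                                // drop duplicate (scene, cycle) pairs, like collect_set
      .withColumn("rn", row_number().over(w))    // 1, 2, 3, ... within each scene
      .filter(col("rn") <= n)                    // keep the first n per scene
      .drop("rn")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupTopNWindow").master("local[2]").getOrCreate()
    import spark.implicits._
    // Same test data as the post
    val df = Seq(
      ("CJ20191120", "201911"), ("CJ20191120", "201910"),
      ("CJ20191105", "201910"), ("CJ20191105", "201909"),
      ("CJ20191111", "201910")
    ).toDF("scene", "cycle")
    topN(df, 1).show()
    spark.stop()
  }
}
```

Because N is a plain local value captured in the filter expression, no broadcast variable is needed in this variant.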
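III. Results
With N = 1, each scene keeps only its most recent cycle: CJ20191120 keeps 201911, CJ20191105 keeps 201910, and CJ20191111 keeps 201910 (the row order printed by show() may vary).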
IV. Notes
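When N is greater than 1, a group's values are concatenated into a single comma-separated string. To get one value per row, apply a column-to-rows transformation; see my blog post: https://www.cnblogs.com/yszd/p/11266552.html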
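As a minimal sketch of that column-to-rows step, Spark's built-in `split` and `explode` functions turn each comma-joined `cycles` value into one row per cycle. This assumes a DataFrame shaped like `result` above (columns `scene` and `cycles`); the object name `CyclesToRows` and the sample rows are hypothetical, and this is not necessarily the approach used in the linked post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, split}

object CyclesToRows {
  // Turn (scene, "c1,c2,...") into one (scene, cycle) row per value
  def toRows(df: DataFrame): DataFrame =
    df.select(col("scene"), explode(split(col("cycles"), ",")).as("cycle"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CyclesToRows").master("local[2]").getOrCreate()
    import spark.implicits._
    // Hypothetical input in the shape of the post's result with N = 2
    val df = Seq(("CJ20191120", "201911,201910"), ("CJ20191111", "201910")).toDF("scene", "cycles")
    toRows(df).show()
    spark.stop()
  }
}
```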