Spark之開窗函數


一.簡介

  開窗函數row_number()會按照某個字段分組,再按另外一個字段排序,為組內的每一行生成從1開始的連續序號;對該序號進行過濾即可取出每組排序後的前幾個值,相當於分組topN。在Spark 1.x中,使用了開窗函數的SQL語句必須通過HiveContext執行;自Spark 2.0起,SparkSession原生支持開窗函數,不啟用Hive支持也可以執行(見第四、五節)。

二.代碼實踐【使用HiveContext】

package big.data.analyse.sparksql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by zhen on 2019/7/6.
  *
  * Demonstrates grouped top-N selection with the `row_number()` window
  * function, run through a Hive-enabled [[org.apache.spark.sql.SparkSession]].
  */
object RowNumber {
  // Set the "org" logger to WARN when the object is initialised, before
  // main runs.
  Logger.getLogger("org").setLevel(Level.WARN)

  /** Name of the temporary view the sample DataFrame is registered under. */
  private val ViewName = "technology"

  /**
    * Builds the ranking query shared by all examples.
    *
    * Rows are numbered within each `id` partition by descending `age`; the
    * number is exposed as column `top` and filtered by the given predicate.
    *
    * @param topFilter comparison applied to `t.top`, e.g. `"<= 1"` or `"> 3"`
    * @return the complete SQL statement
    */
  private def rankedQuery(topFilter: String): String =
    "select id,name,age from (select id,name,age," +
      s"row_number() over (partition by id order by age desc) top from $ViewName) t " +
      s"where t.top $topFilter"

  /**
    * Entry point: builds the sample data, registers it as a temporary view and
    * prints the result of the ranking query for several `top` filters.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Spark entry point with Hive support enabled.
    val spark = SparkSession.builder().appName("RowNumber")
      .master("local[2]").enableHiveSupport().getOrCreate()

    try {
      // Sample records in "id,name,age" form.
      val array = Array("1,Hadoop,12", "5,Spark,6", "3,Solr,15", "3,HBase,8", "6,Hive,16", "6,TensorFlow,26")

      val rdd = spark.sparkContext.parallelize(array).map { row =>
        val Array(id, name, age) = row.split(",")
        Row(id, name, age.toInt)
      }

      val structType = new StructType(Array(
        StructField("id", StringType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true)
      ))

      // Convert the RDD of rows into a DataFrame and show the raw data.
      val df = spark.createDataFrame(rdd, structType)
      df.show()

      df.createOrReplaceTempView(ViewName)

      // Apply row_number() with each filter in turn. No id occurs more than
      // twice in the sample data, so "<= 2" and "<= 3" select the same rows
      // and "> 3" selects none.
      // NOTE: the HiveContext requirement for window functions applied to
      // Spark 1.x; Hive support is enabled here only to show the
      // Hive-backed variant.
      Seq("<= 1", "<= 2", "<= 3", "> 3").foreach { topFilter =>
        spark.sql(rankedQuery(topFilter)).show()
      }
    } finally {
      // Stop the session even if one of the steps above throws.
      spark.stop()
    }
  }
}

三.結果【使用HiveContext】

  1.初始數據

    

  2.top<=1時

    

  3.top<=2時

    

  4.top<=3時

    

  5.top>3時【各分組最多只有2行,結果為空】

    

四.代碼實現【不使用HiveContext】 

package big.data.analyse.sparksql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by zhen on 2019/7/6.
  *
  * Demonstrates grouped top-N selection with the `row_number()` window
  * function on a plain [[org.apache.spark.sql.SparkSession]], i.e. without
  * enabling Hive support.
  */
object RowNumber {
  // Set the "org" logger to WARN when the object is initialised, before
  // main runs.
  Logger.getLogger("org").setLevel(Level.WARN)

  /** Name of the temporary view the sample DataFrame is registered under. */
  private val ViewName = "technology"

  /**
    * Builds the ranking query shared by all examples.
    *
    * Rows are numbered within each `id` partition by descending `age`; the
    * number is exposed as column `top` and filtered by the given predicate.
    *
    * @param topFilter comparison applied to `t.top`, e.g. `"<= 1"` or `"> 3"`
    * @return the complete SQL statement
    */
  private def rankedQuery(topFilter: String): String =
    "select id,name,age from (select id,name,age," +
      s"row_number() over (partition by id order by age desc) top from $ViewName) t " +
      s"where t.top $topFilter"

  /**
    * Entry point: builds the sample data, registers it as a temporary view and
    * prints the result of the ranking query for several `top` filters.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Spark entry point; Hive support is deliberately NOT enabled.
    val spark = SparkSession.builder().appName("RowNumber")
      .master("local[2]").getOrCreate()

    try {
      // Sample records in "id,name,age" form.
      val array = Array("1,Hadoop,12", "5,Spark,6", "3,Solr,15", "3,HBase,8", "6,Hive,16", "6,TensorFlow,26")

      val rdd = spark.sparkContext.parallelize(array).map { row =>
        val Array(id, name, age) = row.split(",")
        Row(id, name, age.toInt)
      }

      val structType = new StructType(Array(
        StructField("id", StringType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true)
      ))

      // Convert the RDD of rows into a DataFrame and show the raw data.
      val df = spark.createDataFrame(rdd, structType)
      df.show()

      df.createOrReplaceTempView(ViewName)

      // Apply row_number() with each filter in turn. No id occurs more than
      // twice in the sample data, so "<= 2" and "<= 3" select the same rows
      // and "> 3" selects none.
      // NOTE: this session has no Hive support, yet the window function
      // still runs; the HiveContext requirement applied to Spark 1.x only.
      Seq("<= 1", "<= 2", "<= 3", "> 3").foreach { topFilter =>
        spark.sql(rankedQuery(topFilter)).show()
      }
    } finally {
      // Stop the session even if one of the steps above throws.
      spark.stop()
    }
  }
}

五.結果【不使用HiveContext】

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM