spark中的聚合操作和分組操作


聚合操作

注意:任何的聚合操作都有默認的分組,聚合是在分組的基礎上進行的。比如,對整體進行求和,那么分組就是整體。所以,在做聚合操作之前,一定要明確是在哪個分組上進行聚合操作
注意:聚合操作,本質上是一個多對一(一對一是多對一的特殊情況)的操作。特別注意的是這個’一‘,可以是一個值(mean, sum等),同樣也可以是一個對象(list, set等對象)

聚合函數

除了DataFrame的某些操作或者通過.stat訪問方法,所有的聚合操作都是以函數的方式出現的。大多數聚合函數可以在org.apache.spark.sql.functions中找到

  • count函數
    使用的方向:
    • 對指定列進行計數
    • 使用count(*)或者count(1)對所有列進行計數
  • countDistinct(統計不同的值得數量)
  • approx_count_distinct
    對統計的精度要求不高使用它,注意:approx_count_distinct帶了另一個參數,該參數指定可容忍的最大誤差。本例中我們指定了一個相當大的誤差率,因此得到的答案與正確值差距很大,但執行速度更快,比countDistinct函數執行耗時更少。檔處理更大的數據集的時候,這種提升會更加明顯。

聚合輸出復雜類型

spark的聚合還可以將某列上的數值聚合到一個list中,或者將唯一值聚合到set集合中。
案例:將國家列直接生成list列和set列

    val path="/Volumes/Data/BigData_code/data/retail-data/all/*.csv"
    //讀取數據
    val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
      .load(path).coalesce(5)
    df.cache()
    df.createOrReplaceTempView("dfTable")
    df.show()
    //將Country聚合成set列和list列
    df.agg(collect_set("Country").as("CountrySet"), collect_list("Country").as("CountryList")).show()

分組操作

  • 使用表達式分組
  • 使用Map進行分組
      //使用表達式分組
      df.groupBy("InvoiceNo").agg(
        count("Quantity").as("quan"),     //使用函數方式
        expr("count(Quantity)")     //使用字符串表達式
      ).show()
      //使用Map進行分組
      df.groupBy("InvoiceNo").agg("Quantity"->"count", "Quantity"->"stddev_pop").show()
    
  • window函數
    window函數的使用,請看這篇博客:https://blog.csdn.net/weixin_38653290/article/details/83962789

分組集---(挖個坑)P133

用戶自定義的聚合函數

使用UDAF來計算輸入數據組(與單行相對)的自定義計算。
若要創建UDAF,必須繼承UserDefinedAggregateFunction基類並實現以下方法:

  • inputSchema用於指定輸入參數,輸入參數類型為StructType
  • bufferSchema用於指定UDAF中間結果,中間結果類型為StructType。
  • dataType用於指定返回結果,返回結果的類型為DataType。
  • deterministic是一個布爾值,它指定此UDAF對於某個輸入是否會返回相同的結果。
  • initialize初始化聚合緩沖區的初始值
  • update描述應如何根據給定行更新內部緩沖區。
  • merge描述應如何合並兩個緩沖區
  • evaluate將生成聚合最終結果
    例子:實現自定義聚合函數BoolAnd,它將返回所有行是否為true
class BoolAnd extends UserDefinedAggregateFunction{
  //指定輸入參數
  override def inputSchema: StructType = StructType(
    StructField("Value", BooleanType)::Nil
  )
  //用於指定UDAF中間結果,中間結果使用StructType
  override def bufferSchema: StructType = StructType(
    StructField("value", BooleanType)::Nil
  )
  //用於指定返回結果,返回結果為DataType
  override def dataType: DataType = BooleanType
  //此UDAF對某個輸入是否會返回相同的結果
  override def deterministic: Boolean = true
  //初始化聚合緩沖區的初始值
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=true
  }
  //描述如何根據給定行更新內部緩沖區
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[Boolean](0)&&input.getAs[Boolean](0)
  }
  //描述如何聚合兩個內部緩沖區
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  //生成聚合的最終結果
  override def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

實例化BoolAnd類,並將其注冊為一個函數:

    //准備數據
    val df = spark.range(1).selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
      .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
    df.show()
    //實例化類,注冊為udaf
    val ba = new BoolAnd
    spark.udf.register("booland", ba)
    df.select(ba(col("t")), expr("booland(f)")).show()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM