Spark SQL dropDuplicates


spark sql 數據去重

在對spark sql 中的dataframe數據表去除重復數據的時候可以使用dropDuplicates()方法

dropDuplicates()有4個重載方法

  • 第一個def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

這個方法,不需要傳入任何的參數,默認根據所有列進行去重,然后按數據行的順序保留每行數據出現的第一條。

/**
   * Returns a new Dataset that contains only the unique rows from this Dataset.
   * This is an alias for `distinct`.
   *
   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
   * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
   * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
   * the state. In addition, too late data older than watermark will be dropped to avoid any
   * possibility of duplicates.
   *
   * @group typedrel
   * @since 2.0.0
   */
  def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

  • 第二個def dropDuplicates(colNames: Seq[String])

傳入的參數是一個序列。你可以在序列中指定你要根據哪些列的重復元素對數據表進行去重,然后也是返回每一行數據出現的第一條

/**
   * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
   * the subset of columns.
   *
   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
   * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
   * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
   * the state. In addition, too late data older than watermark will be dropped to avoid any
   * possibility of duplicates.
   *
   * @group typedrel
   * @since 2.0.0
   */
  def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
    val resolver = sparkSession.sessionState.analyzer.resolver
    val allColumns = queryExecution.analyzed.output
    val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
      // It is possibly there are more than one columns with the same name,
      // so we call filter instead of find.
      val cols = allColumns.filter(col => resolver(col.name, colName))
      if (cols.isEmpty) {
        throw new AnalysisException(
          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
      }
      cols
    }
    Deduplicate(groupCols, planWithBarrier)
  }
  • 第三個def dropDuplicates(colNames: Array[String])

傳入的參數是一個數組,然后方法會把數組轉換為序列然后再調用第二個方法。

/**
   * Returns a new Dataset with duplicate rows removed, considering only
   * the subset of columns.
   *
   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
   * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
   * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
   * the state. In addition, too late data older than watermark will be dropped to avoid any
   * possibility of duplicates.
   *
   * @group typedrel
   * @since 2.0.0
   */
  def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
  • 第四個def dropDuplicates(col1: String, cols: String*)

傳入的參數為字符串,在方法體內會把你傳入的字符串組合成一個序列再調用第二個方法。

/**
   * Returns a new [[Dataset]] with duplicate rows removed, considering only
   * the subset of columns.
   *
   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
   * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
   * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
   * the state. In addition, too late data older than watermark will be dropped to avoid any
   * possibility of duplicates.
   *
   * @group typedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
    val colNames: Seq[String] = col1 +: cols
    dropDuplicates(colNames)
  }

第三和第四個本質上還是調用了第二個方法,所以我們在使用的時候如果需要根據指定的列進行數據去重,可以直接傳入一個Seq。

第一個方法默認根據所有列去重,實際上也是調用了第二個方法,然后傳入參數this.columns,即所有的列組成的Seq。

所以各位想深究dropDuplicate()去重的核心代碼,只需要研究第二個去重方法即可。等我有時間我也會把去重的核心源碼講解繼續補充。

dropDuplicates()的坑!

在使用dropDuplicates() 在去重的時候,我發現有時候還是會出現重復數據的情況。

我分析了一下還出現重復數據的原因:

  1. 數據存在多個excuter中

因為spark是分布式計算的,數據在計算的時候會分布在不同的excutor上,使用dropDuplicate去重的時候,可能只是一個excutor內的數據進行了去重,別的excutor上可能還會有重復的數據。

  1. 數據是存放在不同分區的,

因為spark是分布式計算的,數據在計算的時候會分散在不同的分區中,使用dropDuplicate去重的時候,不同的區分可能還會存在相同的數據。

我試了只啟動一個excutor多分區的情況下進行計算,沒有出現重復的數據,然后多個excutor將數據先合並到一個分區在去重還是有重復的數據。所以覺得可能是第一種猜測的情況比較大,但是如果只使用一個excutor就失去了分布式計算的意義和優勢,所以還是得想想其它辦法。

各位有什么好的解決辦法也可以在評論區交流!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM