spark sql 數據去重
在對spark sql 中的dataframe數據表去除重復數據的時候可以使用dropDuplicates()
方法
dropDuplicates()有4個重載方法
- 第一個
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
這個方法,不需要傳入任何的參數,默認根據所有列進行去重,然后按數據行的順序保留每行數據出現的第一條。
/**
* Returns a new Dataset that contains only the unique rows from this Dataset.
* This is an alias for `distinct`.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
- 第二個
def dropDuplicates(colNames: Seq[String])
傳入的參數是一個序列。你可以在序列中指定你要根據哪些列的重復元素對數據表進行去重,然后也是返回每一行數據出現的第一條
/**
* (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
cols
}
Deduplicate(groupCols, planWithBarrier)
}
- 第三個
def dropDuplicates(colNames: Array[String])
傳入的參數是一個數組,然后方法會把數組轉換為序列然后再調用第二個方法。
/**
* Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
- 第四個
def dropDuplicates(col1: String, cols: String*)
傳入的參數為字符串,在方法體內會把你傳入的字符串組合成一個序列再調用第二個方法。
/**
* Returns a new [[Dataset]] with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
val colNames: Seq[String] = col1 +: cols
dropDuplicates(colNames)
}
第三和第四個本質上還是調用了第二個方法,所以我們在使用的時候如果需要根據指定的列進行數據去重,可以直接傳入一個Seq。
第一個方法默認根據所有列去重,實際上也是調用了第二個方法,然后傳入參數this.columns
,即所有的列組成的Seq。
所以各位想深究dropDuplicate()
去重的核心代碼,只需要研究第二個去重方法即可。等我有時間我也會把去重的核心源碼講解繼續補充。
dropDuplicates()的坑!
在使用dropDuplicates() 在去重的時候,我發現有時候還是會出現重復數據的情況。
我分析了一下還出現重復數據的原因:
- 數據存在多個excuter中
因為spark是分布式計算的,數據在計算的時候會分布在不同的excutor上,使用dropDuplicate去重的時候,可能只是一個excutor內的數據進行了去重,別的excutor上可能還會有重復的數據。
- 數據是存放在不同分區的,
因為spark是分布式計算的,數據在計算的時候會分散在不同的分區中,使用dropDuplicate去重的時候,不同的區分可能還會存在相同的數據。
我試了只啟動一個excutor多分區的情況下進行計算,沒有出現重復的數據,然后多個excutor將數據先合並到一個分區在去重還是有重復的數據。所以覺得可能是第一種猜測的情況比較大,但是如果只使用一個excutor就失去了分布式計算的意義和優勢,所以還是得想想其它辦法。
各位有什么好的解決辦法也可以在評論區交流!