使用Spark加載數據到SQL Server列存儲表


原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/

介紹

SQL Server的Bulk load默認為串行,這味着例如,一個BULK INSERT語句將生成一個線程將數據插入表中。但是,對於並發負載,您可以使用多個批量插入語句插入同一張表,前提是需要閱讀多個文件。

考慮要求所在的情景:

  • 從大文件加載數據(比如,超過 20 GB)
  • 拆分文件不是一個選項,因為它將是整個大容量負載操作中的一個額外步驟。
  • 每個傳入的數據文件大小不同,因此很難識別大塊數(將文件拆分為)並動態定義為每個大塊執行的批量插入語句。
  • 要加載的多個文件跨越多個 GB(例如超過 20 GB 及以上),每個GB 包含數百萬條記錄。

在這種情況下,使用 Apache Spark是並行批量數據加載到 SQL 表的流行方法之一。

在本文中,我們使用 Azure Databricks spark engine使用單個輸入文件將數據以並行流(多個線程將數據加載到表中)插入 SQL Server。目標表可能是HeapClustered IndexClustered Columnstore Index。本文旨在展示如何利用Spark提供的高度分布式框架,在加載到 SQL Server或 Azure SQL的聚集列存儲索引表之前仔細對數據分區。

本文中分享的最有趣的觀察是展示使用Spark默認配置時列存儲表的行組質量降低,以及如何通過高效使用Spark分區來提高質量。從本質上講,提高行組質量是決定查詢性能的重要因素。

 

環境設置

數據集:

  • 單張表的一個自定義數據集。一個 27 GB 的 CSV 文件,110 M 記錄,共 36 列。其中列的類型有int, nvarchar, datetime等。

數據庫:

  • Azure SQL Database – Business Critical, Gen5 80vCores

ELT 平台:

  • Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)
  • Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)

存儲:

  • Azure Data Lake Storage Gen2

先決條件:

在進一步瀏覽本文之前,請花一些時間了解此處將數據加載到聚集列存儲表中的概述:Data Loading performance considerations with Clustered Columnstore indexes

在此測試中,數據從位於 Azure Data Lake Storage Gen 2的 CSV 文件中加載。CSV 文件大小為 27 GB,有 110 M 記錄,有 36 列。這是一個帶有隨機數據的自定義數據集。

批量加載或預處理(ELT\ETL)的典型架構看起來與下圖相似:

使用BULK INSERTS    

在第一次測試中,單個BULK INSERT用於將數據加載到帶有聚集列存儲索引的 Azure SQL 表中,這里沒有意外,根據所使用的 BATCHSIZE,它花了 30 多分鍾才完成。請記住,BULK INSERT是一個單一的線程操作,因此單個流會讀取並將其寫入表中,從而降低負載吞吐量

 

使用Azure Databricks

為了實現寫入到 SQL Server和讀取ADLS (Azure Data Lake Storage) Gen 2的最大並發性和高吞吐量,Azure Databricks 被選為平台的選擇,盡管我們還有其他選擇,即 Azure Data Factory或其他基於Spark引擎的平台。

使用Azure Databricks加載數據的優點是 Spark 引擎通過專用的 Spark API並行讀取輸入文件。這些 API將使用一定數量的分區,這些分區映射到單個或多個輸入文件,映射是在文件的一部分或整個文件上完成的。數據讀入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) 。在這種情況下,數據被加載到DataFrame中,然后進行轉換(設置與目標表匹配的DataFrame schema),然后數據准備寫入 SQL 表。

要將DataFrame中的數據寫入 SQL Server中,必須使用Microsoft's Apache Spark SQL Connector。這是一個高性能的連接器,使您能夠在大數據分析中使用事務數據,和持久化結果用於即席查詢或報告。連接器允許您使用任何 SQL Server(本地數據庫或雲中)作為 Spark 作業的輸入數據源或輸出目標。

  GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks

請注意,目標表具有聚集列存儲索引,以實現高負載吞吐量,但是,您也可以將數據加載到Heap,這也將提供良好的負載性能。對於本文的相關性,我們只討論加載到列存儲表。我們使用不同的 BATCHSIZE 值將數據加載到Clustered Columnstore Index中 -請參閱此文檔,了解 BATCHSIZE 在批量加載到聚集列存儲索引表期間的影響。

以下是Clustered Columnstore Index上的數據加載測試運行,BATCHSIZE為 102400 和 1048576:

 

 

 

 

請注意,我們正在使用 Azure Databricks使用的默認並行和分區,並將數據直接推至 SQL Server聚集列存儲索引表。我們沒有調整 Azure Databricks使用的任何默認配置。無論所定義的批次大小,我們所有的測試都大致在同一時間完成。

將數據加載到 SQL 中的 32 個並發線程是由於上述已提供的數據磚群集的大小。該集群最多有 8 個節點,每個節點有 4 個內核,即 8*4 = 32 個內核,最多可運行 32 個並發線程。

查看行組(Row Groups)

有關我們使用 BATCHSIZE 1048576 插入數據的表格,以下是在 SQL 中創建的行組數:

行組總數:

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')
216 

行組的質量:

SELECT *
FROM sys.dm_db_column_store_row_group_physical_stats
WHERE object_id = OBJECT_ID('largetable110M_1048576')

在這種情況下,我們只有一個delta store在OPEN狀態 (total_rows = 3810) 和 215 行組處於壓縮狀態, 這是有道理的, 因為如果插入的批次大小是>102400 行, 數據不再delta store存儲, 而是直接插入一個壓縮行組的列存儲。在這種情況下,壓縮狀態中的所有行組都有 >102400 條記錄。現在,有關行組的問題是:

為什么我們有216行組?

為什么當我們的BatchSize設置為 1048576 時,每個行組的行數不同?

請注意,每個行組的數據大約等於上述結果集中的 500000 條記錄。

這兩個問題的答案是 Azure Databricks Spark引擎對數據分區控制了寫入聚集列存儲索引表行組的數據行數。讓我們來看看 Azure Databricks為有關數據集創建的分區數:

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216

因此,我們為數據集創建了 216 個分區。請記住,這些是分區的默認數。每個分區都有大約 500000 條記錄。

# Number of records in each partition
from pyspark.sql.functions
import spark_partition_id
df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

將Spark分區中的記錄數與行組中的記錄數進行比較,您就會發現它們是相等的。甚至分區數也等於行組數。因此,從某種意義上說,1048576 的 BATCHSIZE 正被每個分區中的行數過度拉大。

sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn")
sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser")
sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd")

servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";"
table_name = "<Your Table Name>"

# Write data to SQL table with BatchSize 1048576
df_gl.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", sqldbuser) \
.option("password", sqldbpwd) \
.option("schemaCheckEnabled", False) \
.option("BatchSize", 1048576) \
.option("truncate", True) \
.save()

行組質量

行組質量由行組數和每個行組記錄決定。由於聚集列存儲索引通過掃描單行組的列段掃描表,則最大化每個行組中的行數可增強查詢性能。當行組具有大量行數時,數據壓縮會改善,這意味着從磁盤中讀取的數據更少。為了獲得最佳的查詢性能,目標是最大限度地提高聚集列索引中每個行組的行數。行組最多可有 1048576 行。但是,需要注意的是,由於聚集列索引,行組必須至少有 102400 行才能實現性能提升。此外,請記住,行組的最大大小(100萬)可能在每一個情況下都達到,文件行組大小不只是最大限制的一個因素,但受到以下因素的影響。

  • 字典大小限制,即 16 MB
  • 插入指定的批次大小
  • 表的分區方案,因為行組不跨分區
  • 內存壓力導致行組被修剪
  • 索引重組,重建

話雖如此,現在一個重要的考慮是讓行組大小盡可能接近 100 萬條記錄。在此測試中,由於每個行組的大小接近 500000 條記錄,我們有兩個選項可以達到約 100 萬條記錄的大小:

  • 在Spark中,更改分區數,使每個分區盡可能接近 1048576 條記錄,
  • 保持Spark分區(默認值),一旦數據加載到表中,就運行 ALTER INDEX REORG,將多個壓縮行組組合成一組。

選項#1很容易在Python或Scala代碼中實現,該代碼將在Azure Databricks上運行,負載相當低。

選項#2是數據加載后需要采取的額外步驟,當然,這將消耗 SQL 上的額外 CPU ,並增加整個加載過程所需的時間。

為了保持本文的相關性,讓我們來討論更多關於Spark分區,以及如何從其默認值及其在下一節的影響中更改它。

 

Spark Partitioning

Spark 引擎最典型的輸入源是一組文件,這些文件通過將每個節點上的適當分區划分為一個或多個 Spark API來讀取這些文件。這是 Spark 的自動分區,將用戶從確定分區數量的憂慮中抽象出來,如果用戶想挑戰,就需控制分區的配置。根據環境和環境設置計算的分區的默認數通常適用於大多數情況下。但是,在某些情況下,更好地了解分區是如何自動計算的,如果需要,用戶可以更改分區計數,從而在性能上產生明顯差異。

注意:大型Spark群集可以生成大量並行線程,這可能導致 Azure SQL DB 上的內存授予爭議。由於內存超時,您必須留意這種可能性,以避免提前修剪。請參閱本文以了解更多詳細信息,了解表的模式和行數等也可能對內存授予產生影響。

spark.sql.files.maxPartitionBytes是控制分區大小的重要參數,默認設置為128 MB。它可以調整以控制分區大小,因此也會更改由此產生的分區數。

spark.default.parallelism這相當於worker nodes核心的總數。

最后,我們有coalesce()repartition(),可用於增加/減少分區數,甚至在數據已被讀入Spark。

只有當您想要減少分區數時,才能使用coalesce() 因為它不涉及數據的重排。請考慮此data frame的分區數為 16,並且您希望將其增加到 32,因此您決定運行以下命令。

df = df.coalesce(32)
print(df.rdd.getNumPartitions())

但是,分區數量不會增加到 32 個,並且將保持在 16 個,因為coalesce()不涉及數據重排。這是一個性能優化的實現,因為無需昂貴的數據重排即可減少分區。

如果您想將上述示例的分區數減少到 8,則會獲得預期的結果。

df = df.coalesce(8)
print(df.rdd.getNumPartitions())

這將合並數據並產生 8 個分區。

repartition() 另一個幫助調整分區的函數。對於同一示例,您可以使用以下命令將數據放入 32 個分區。

df = df.repartition(32)
print(df.rdd.getNumPartitions())

最后,還有其他功能可以改變分區數,其中是groupBy(), groupByKey(), reduceByKey() join()。當在 DataFrame 上調用這些功能時,會導致跨機器或通常跨執行器對數據進行重排,最終在默認情況下將數據重新划分為 200 個分區。此默認 數字可以使用spark.sql.shuffle.partitions配置進行控制。

 

數據加載

現在,了解分區在 Spark 中的工作原理以及如何更改分區,是時候實施這些學習了。在上述實驗中,分區數為 216默認情況下),這是因為文件的大小為 27 GB,因此將 27 GB 除以 128 MB(默認情況下由 Spark 定義的最大分區字節)提供了216 個分區

Spark重新分區的影響

對 PySpark 代碼的更改是重新分區數據並確保每個分區現在有 1048576 行或接近它。為此,首先在DataFrame中獲取記錄數量,然后將其除以 1048576。此划分的結果將是用於加載數據的分區數,假設分區數為n但是,可能有一些分區現在有 >=1048576 行,因此,為了確保每個分區都<=1048576行,我們將分區數作為n+1使用n+1在分區結果為 0 的情況下也很重要。在這種情況下,您將有一個分區。

由於數據已加載到DataFrame中,而 Spark 默認已創建分區,我們現在必須再次重新分區數據,分區數等於n+1。

# Get the number of partitions before re-partitioning
print(df_gl.rdd.getNumPartitions())
216
 
# Get the number of rows of DataFrame and get the number of partitions to be used.
rows = df_gl.count()
n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)

因此,在重新划分分區后,分區數量從216 個減少到 105 (n+1),因此每個分區現在都有接近1048576行。

此時,讓我們將數據再次寫入 SQL 表中,並驗證行組質量。這一次,每個行組的行數將接近每個分區中的行數(略低於 1048576)。讓我們看看下面:

重新分區后的行組

SELECT COUNT(1)
FROM sys.dm_db_column_store_row_group_physical_stats 
WHERE object_id = OBJECT_ID('largetable110M_1048576')
105

重新分區后的行組質量

從本質上講,這次整體數據加載比之前慢了 2 秒,但行組的質量要好得多。行組數量減少到一半,行組幾乎已填滿到最大容量。請注意,由於DataFrame的重新划分,將消耗額外的時間,這取決於數據幀的大小和分區數。

請注意,您不會總是獲得每row_group 100 萬條記錄。它將取決於數據類型、列數等,以及之前討論的因素-請參閱sys.dm_db_column_store_row_group_physical_stats

 

關鍵點

  1. 建議在將數據批量加載到 SQL Server時使用BatchSize(無論是 CCI 還是Heap)。但是,如果 Azure Databricks 或任何其他 Spark 引擎用於加載數據,則數據分區在確定聚集列存儲索引中的行組質量方面起着重要作用。
  2. 使用BULK INSERT命令加載數據將遵守命令中提到的BATCHSIZE,除非其他因素影響插入行組的行數。
  3. Spark 中的數據分區不應基於某些隨機數,最好動態識別分區數,並將n+1 用作分區數
  4. 由於聚集列存儲索引通過掃描單行組的列段掃描表,則最大化每個行組中的記錄數可增強查詢性能。為了獲得最佳的查詢性能,目標是最大限度地提高聚集列存儲索引中每個行組的行數。
  5. Azure Databricks的數據加載速度在很大程度上取決於選擇的集群類型及其配置。此外,請注意,到目前為止,Azure Databricks連接器僅支持Apache Spark 2.4.5。微軟已經發布了對Spark 3.0的支持,它目前在預覽版中,我們建議您在開發測試環境中徹底測試此連接器。
  6. 根據data frame的大小、列數、數據類型等,進行重新划分的時間會有所不同,因此您必須從端端角度考慮這次對整體數據加載的考慮。

 

Azure Data Factory

這是一篇非常好的數據ETL文章,Spark和SQL Server列存儲表功能的組合。

Azure Data Factory是當前最成熟,功能最強大的ETL/ELT數據集成服務。其架構就是使用Spark作為計算引擎。

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM