轉自:http://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html
列式存儲布局(比如 Parquet)可以加速查詢,因為它只檢查所有需要的列並對它們的值執行計算,因此只讀取一個數據文件或表的小部分數據。Parquet 還支持靈活的壓縮選項,因此可以顯著減少磁盤上的存儲。
如果您在 HDFS 上擁有基於文本的數據文件或表,而且正在使用 Spark SQL 對它們執行查詢,那么強烈推薦將文本數據文件轉換為 Parquet 數據文件,以實現性能和存儲收益。當然,轉換需要時間,但查詢性能的提升在某些情況下可能達到 30 倍或更高,存儲的節省可高達 75%!
已有文章介紹使用 Parquet 存儲為 BigSQL、Hive 和 Impala 帶來類似的性能收益,本文將介紹如何編寫一個簡單的 Scala 應用程序,將現有的基於文本的數據文件或表轉換為 Parquet 數據文件,還將展示給 Spark SQL 帶來的實際存儲節省和查詢性能提升。
讓我們轉換為 Parquet 吧!
Spark SQL 提供了對讀取和寫入 Parquet 文件的支持,能夠自動保留原始數據的模式。Parquet 模式通過 Data Frame API,使數據文件對 Spark SQL 應用程序 “不言自明”。當然,Spark SQL 還支持讀取已存儲為 Parquet 的現有 Hive 表,但您需要配置 Spark,以便使用 Hive 的元存儲來加載所有信息。在我們的示例中,不涉及 Hive 元存儲。
以下 Scala 代碼示例將讀取一個基於文本的 CSV 表,並將它寫入 Parquet 表:
def convert(sqlContext: SQLContext, filename: String, schema: StructType, tablename: String) { // import text-based table first into a data frame val df = sqlContext.read.format("com.databricks.spark.csv"). schema(schema).option("delimiter", "|").load(filename) // now simply write to a parquet file df.write.parquet("/user/spark/data/parquet/"+tablename) } // usage exampe -- a tpc-ds table called catalog_page schema= StructType(Array( StructField("cp_catalog_page_sk", IntegerType,false), StructField("cp_catalog_page_id", StringType,false), StructField("cp_start_date_sk", IntegerType,true), StructField("cp_end_date_sk", IntegerType,true), StructField("cp_department", StringType,true), StructField("cp_catalog_number", LongType,true), StructField("cp_catalog_page_number", LongType,true), StructField("cp_description", StringType,true), StructField("cp_type", StringType,true))) convert(sqlContext, hadoopdsPath+"/catalog_page/*", schema, "catalog_page")
上面的代碼將會讀取 hadoopdsPath+"/catalog_page/* 中基於文本的 CSV 文件,並將轉換的 Parquet 文件保存在 /user/spark/data/parquet/ 下。此外,轉換的 Parquet 文件會在 gzip 中自動壓縮,因為 Spark 變量 spark.sql.parquet.compression.codec 已在默認情況下設置為 gzip。您還可以將壓縮編解碼器設置為 uncompressed、snappy 或 lzo。
轉換 1 TB 數據將花費多長時間?
50 分鍾,在一個 6 數據節點的 Spark v1.5.1 集群上可達到約 20 GB/分的吞吐量。使用的總內存約為 500GB。HDFS 上最終的 Parquet 文件的格式為:
... /user/spark/data/parquet/catalog_page/part-r-00000-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet /user/spark/data/parquet/catalog_page/part-r-00001-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet ...
存儲節省
以下 Linux 輸出顯示了 TEXT 和 PARQUET 在 HDFS 上的大小比較:
% hadoop fs -du -h -s /user/spark/hadoopds1000g 897.9 G /user/spark/hadoopds1000g % hadoop fs -du -h -s /user/spark/data/parquet 231.4 G /user/spark/data/parquet
1 TB 數據的存儲節省了將近 75%!
查詢性能提升
Parquet 文件是自描述性的,所以保留了模式。要將 Parquet 文件加載到 DataFrame 中並將它注冊為一個 temp 表,可執行以下操作:
val df = sqlContext.read.parquet(filename) df.show df.registerTempTable(tablename)
要對比性能,然后可以分別對 TEXT 和 PARQUET 表運行以下查詢(假設所有其他 tpc-ds 表也都已轉換為 Parquet)。您可以利用 spark-sql-perf 測試工具包來執行查詢測試。舉例而言,現在來看看 TPC-DS 基准測試中的查詢 #76,
("q76", """ | SELECT | channel, col_name, d_year, d_qoy, i_category, COUNT(*) sales_cnt, | SUM(ext_sales_price) sales_amt | FROM( | SELECT | 'store' as channel, ss_store_sk col_name, d_year, d_qoy, i_category, | ss_ext_sales_price ext_sales_price | FROM store_sales, item, date_dim | WHERE ss_store_sk IS NULL | AND ss_sold_date_sk=d_date_sk | AND ss_item_sk=i_item_sk | UNION ALL | SELECT | 'web' as channel, ws_ship_customer_sk col_name, d_year, d_qoy, i_category, | ws_ext_sales_price ext_sales_price | FROM web_sales, item, date_dim | WHERE ws_ship_customer_sk IS NULL | AND ws_sold_date_sk=d_date_sk | AND ws_item_sk=i_item_sk | UNION ALL | SELECT | 'catalog' as channel, cs_ship_addr_sk col_name, d_year, d_qoy, i_category, | cs_ext_sales_price ext_sales_price | FROM catalog_sales, item, date_dim | WHERE cs_ship_addr_sk IS NULL | AND cs_sold_date_sk=d_date_sk | AND cs_item_sk=i_item_sk) foo | GROUP BY channel, col_name, d_year, d_qoy, i_category | ORDER BY channel, col_name, d_year, d_qoy, i_category | limit 100
查詢時間如下:
TIME TEXT PARQUET Query time (sec) 698 21
查詢 76 的查詢時間從將近 12 分鍾加速到不到半分鍾,提高了 30 倍!