Spark2.2 saveAsTable 函數使用 overWrite 設置 Partition 會造成全覆蓋的問題


 

在使用 CDH 6.0.X 的版本還是自帶的是 Spark2.2 的版本,2.2 版本的 Spark 使用 saveAsTable 如果使用overWrite PartitionBy 的功能會有和 hive 行為不一致的地方。

比如我們目前有兩個分區 2019-03-22 和 2019-03-23 兩個分區,現在我們使用 saveAsTable 想覆蓋其中一天的分區,結果卻是將整個所有分區遮蓋了。重建了整個目錄,這明顯不是我們想要的到的結果。

好在 spark 在 2.3 版本中已經修復了這個問題,如果遇到的同學直接升級 cdh 的版本到 6.1.x 那么將會獲得 spark2.4 ,就可以解決這個問題。但是由於升級集群需要牽扯到的精力的確還是太多,成本太高。所以我還是選擇另外一個辦法來解決這個問題,使用 hive 的語法來 overwrite 分區。

Hive 的分區有兩種情況:

靜態分區 - 我們提供一個分區列表,由 Hive 根據這個列表值進行分區

動態分區 - 我們提供一個列,讓其值變成分區的值,比如上面提到的日期。

來看個例子

DROP TABLE IF EXISTS stats;
CREATE EXTERNAL TABLE stats (
    ad              STRING,
    impressions     INT,
    clicks          INT
) PARTITIONED BY (country STRING, year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
LINES TERMINATED BY '\n';
MSCK REPAIR TABLE stats;
-- Specify static partitions INSERT OVERWRITE TABLE stats PARTITION(country = 'US', year = 2017, month = 3, day = 1) SELECT ad, SUM(impressions), SUM(clicks) FROM impression_logs WHERE log_day = 1 GROUP BY ad;
-- Load data into partitions dynamically SET hive.exec.dynamic.partition = true; SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE stats PARTITION(country
= 'US', year = 2017, month = 3, day) SELECT ad, SUM(impressions), SUM(clicks), log_day as day FROM impression_logs GROUP BY ad;

第二個插入操作指定使用 log_day 來作為動態 partition 的一部分。可以實現無數個分區,而第一種插入只能被歸類為一種分區。

最后我們可以讓 spark 來直接使用 sql 將數據寫入到表中以達到我們的目的。

static partitions

self.ss.sql("""
                        INSERT OVERWRITE TABLE analytics_db.alpha_md_day_dump_users
                        PARTITION(the_day='{}')
                        SELECT *
                        FROM _md_day_dump_users
                    """.format(st))

---------------------------------------------------------------

dynamic partitions

self.ss.sql("""
                        INSERT OVERWRITE TABLE analytics_db.alpha_md_day_dump_users
                        PARTITION(the_day=the_day)
                        SELECT the_day, xx, xx, xx
                        FROM _md_day_dump_users
                    """)

如果生成小文件過多我們可以在寫入之前操縱 df進行一次 repartitions。

 

 

Reference:

https://medium.com/a-muggles-pensieve/writing-into-dynamic-partitions-using-spark-2e2b818a007a   Writing Into Dynamic Partitions Using Spark
https://issues.apache.org/jira/browse/SPARK-20236   Overwrite a partitioned data source table should only overwrite related partitions

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM