針對hive on mapreduce
1:我們可以通過一些配置項來使Hive在執行結束后對結果文件進行合並:
參數詳細內容可參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
hive.merge.mapfiles 在 map-only job后合並文件,默認true hive.merge.mapredfiles 在map-reduce job后合並文件,默認false hive.merge.size.per.task 合並后每個文件的大小,默認256000000 hive.merge.smallfiles.avgsize 平均文件大小,是決定是否執行合並操作的閾值,默認16000000
2:如果結果表使用了壓縮格式,則必須配合Sequence File來存儲,否則無法進行合並
3:Hadoop的歸檔文件格式也是解決小文件問題的方式之一。而且Hive提供了原生支持,如果使用的不是分區表,則可創建成外部表,並使用har://協議來指定路徑
4:對於通常的應用,使用Hive結果合並就能達到很好的效果。如果不想因此增加運行時間,可以自行編寫一些腳本,在系統空閑時對分區內的文件進行合並,也能達到目的。
5:Reducer數量的減少也即意味着結果文件的減少,從而解決產生小文件的問題。
但是,對於通過sparksql來處理數據的話,在conf里添加上面參數調整是沒有作用的,不過可以通過下面的方式來規避小文件:
1.通過使用repartition重分區動態調整文件輸出個數
比如 spark.sql("sql").repartition(1).write().mode(SaveMode.Overwrite).saveAsTable("test");
2.使用Adaptive Execution動態設置shuffle partition
SparkConf conf = new SparkConf(); conf.set("spark.sql.adaptive.enabled", "true"); conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864b"); conf.set("spark.sql.adaptive.join.enabled", "true"); conf.set("spark.sql.autoBroadcastJoinThreshold", "20971520"); SparkSession spark = SparkSession .builder() .appName("JointSitePlan") .master("local") .config(conf) .enableHiveSupport() .getOrCreate();
shuffle partition是通過參數spark.sql.shuffle.partitions來指定的,默認是200,但是對於數據不大,或者數據傾斜的情況,會生成很多的小文件,幾兆甚至幾KB大小,自適應執行則會根據參數 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 動態調整reducer數量,詳細可見 上一篇文章