針對hive on mapreduce
1:我們可以通過一些配置項來使Hive在執行結束后對結果文件進行合並:
參數詳細內容可參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
1
2
3
4
|
hive.merge.mapfiles 在 map-only job后合並文件,默認
true
hive.merge.mapredfiles 在map-reduce job后合並文件,默認
false
hive.merge.size.per.task 合並后每個文件的大小,默認256000000
hive.merge.smallfiles.avgsize 平均文件大小,是決定是否執行合並操作的閾值,默認16000000
|
例如:
set hive.merge.mapfiles = true:在只有map的作業結束時合並小文件, set hive.merge.mapredfiles = true:在Map-Reduce的任務結束時合並小文件,默認為False; set hive.merge.size.per.task = 256000000; 合並后每個文件的大小,默認256000000 set hive.merge.smallfiles.avgsize=256000000; 當輸出文件的平均大小小於該值時並且(mapfiles和mapredfiles為true)
2:如果結果表使用了壓縮格式,則必須配合Sequence File來存儲,否則無法進行合並
3:Hadoop的歸檔文件格式也是解決小文件問題的方式之一。而且Hive提供了原生支持,如果使用的不是分區表,則可創建成外部表,並使用har://協議來指定路徑
4:對於通常的應用,使用Hive結果合並就能達到很好的效果。如果不想因此增加運行時間,可以自行編寫一些腳本,在系統空閑時對分區內的文件進行合並,也能達到目的。
5:Reducer數量的減少也即意味着結果文件的減少,從而解決產生小文件的問題。
但是,對於通過sparksql來處理數據的話,在conf里添加上面參數調整是沒有作用的,不過可以通過下面的方式來規避小文件:
1.通過使用repartition重分區動態調整文件輸出個數
比如 spark.sql("sql").repartition(1).write().mode(SaveMode.Overwrite).saveAsTable("test");
2.使用Adaptive Execution動態設置shuffle partition
1
2
3
4
5
6
7
8
9
10
11
12
13
|
SparkConf conf =
new
SparkConf();
conf.set(
"spark.sql.adaptive.enabled"
,
"true"
);
conf.set(
"spark.sql.adaptive.shuffle.targetPostShuffleInputSize"
,
"67108864b"
);
conf.set(
"spark.sql.adaptive.join.enabled"
,
"true"
);
conf.set(
"spark.sql.autoBroadcastJoinThreshold"
,
"20971520"
);
SparkSession spark = SparkSession
.builder()
.appName(
"JointSitePlan"
)
.master(
"local"
)
.config(conf)
.enableHiveSupport()
.getOrCreate();
|
shuffle partition是通過參數spark.sql.shuffle.partitions來指定的,默認是200,但是對於數據不大,或者數據傾斜的情況,會生成很多的小文件,幾兆甚至幾KB大小,自適應執行則會根據參數 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 動態調整reducer數量.
附:
我在spark sql執行insert overwrite操作時,僅加了set spark.sql.hive.mergeFiles=true; 也可以有效阻止小文件的產生,可能是因為我的數據量本身就比較大
參考: