一:資源優化
對於數據處理的分組
數據有的上報的多一天1T,有的上報的少一天不到1G,但是需要統一去處理,這時候就可以使用數據分組的方法。將大小類似的數據放到一組內進行統一的處理,例子:將1G以下的分成一個組,將1G到10G的分成一個組,10G到100G的分為一個組。具體的需要根據數據具體的分布來確定。
優點:數據處理均勻,對於文件個數的生成的控制強,統一管理。
缺點:數據分組是根據數據大小來確定的,數據出現增長的時候如果不及時發現,會出現傾斜問題。
廣播
一個運算中如果可以使用廣播,那就盡量不使用別的shuffle,因為廣播除了對主節點有壓力之外,別的方面都是最好的shuffle。
--conf spark.sql.broadcastTimeout=-1 //廣播變量永不超時
--conf spark.sql.autoBroadcastJoinThreshold=104857600 //廣播大小設置為1G
sparksql運行數據的時候默認第一次都不是廣播的,因為它在開始的時候不知道數據的大小,所以沒辦法廣播,第一次運行完成之后知道了數據大小,第二次才會廣播
如果確定一個文件可以廣播的話,建議使用顯式broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val data = sparkSession.read.textFile("/software/java/idea/data")
broadcast(data)
並發度
sparksql中難免會涉及到文件IO,網絡IO,這時候並發度就顯得很重要,並發度可以簡單理解為線程數。
--conf spark.default.parallelism=60 如果代碼中沒有shuffle操作或者repartation生成的文件就是並發的個數
--conf spark.sql.shuffle.partitions=800
輸出文件壓縮
HDFS中文本存儲是最浪費空間做法,所以強烈建議開啟壓縮
--conf spark.hadoop.mapred.output.compress=true
--conf spark.hadoop.mapred.output.compression.codec=true
--conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
--conf spark.hadoop.mapred.output.compression.type=BLOCK
慢任務檢測
sparksql任務里面會同時起來多台機器進行作業,這時候如果一台機器有問題,運行的滿會拉低整個人物執行的時間。把慢任務檢測開啟的話,如果有慢任務,就會再次起來一個相同的任務,誰現執行完成就會把另一個殺掉。從而節約執行的時間。
--conf spark.speculation=true
--conf spark.speculation.interval=30000
--conf spark.speculation.quantile=0.8
--conf spark.speculation.multiplier=1.5
repartation
sparksql讀取文件的時候是根據文件個數來決定task個數的,但是如果出現很多小文件的話,就會嚴重影響任務的執行時間,再讀取之后顯示使用repartation,或者在使用newAPIHadoopFile來合並文件大小
val newData = sparkContext.newAPIHadoopFile("/software/java/idea/data",classOf[CombineTextInputFormat],classOf[LongWritable],classOf[Text])
.saveAsTextFile("/software/java/idea/end")
hive中也有對應的參數就是:
set mapreduce.input.fileinputformat.split.minsize = 1024000000;
set mapreduce.input.fileinputformat.split.maxsize = 1024000000;(默認256M)
set mapreduce.input.fileinputformat.split.minsize.per.node= 1024000000;
set mapreduce.input.fileinputformat.split.maxsize.per.node= 1024000000;(默認1b)
set mapreduce.input.fileinputformat.split.minsize.per.rack= 1024000000;
set mapreduce.input.fileinputformat.split.maxsize.per.rack= 1024000000;(默認1b)
寫mysql
sparksql里面可以直接寫數據庫,但是真正寫數據庫的時候是根據partation的個數來確定數據庫連接數的,所以在寫數據庫之前最好根據數據條數去reparttation一下,達到最快的寫入速度
hive入庫
set hive.msck.path.validation=ignore;MSCK REPAIR TABLE tablename //適合分區數較少的load,分區數較多的時候反而時間會加長
ALTER TABLE externaltable_test ADD PARTITION(ddate=20190920) LOCATION '/hive/table/table_test/dt=20190920'; //當分區數到達一定的數量之后就可以使用這個來load數據,也是只修改元數據的操作。
