sparksql系列(八) sparksql優化


        公司數倉遷移完成了,現在所有的數據一天6T的用戶行為數據全部由一個spark腳本,關聯用戶屬性數據生成最終想要的數據。里面讓我感觸最深的是資源的使用spark優化,再此記錄一篇關於sparksql優化的文章,專門總結以下現在使用的資源優化及以前使用的資源優化。

一:資源優化

對於數據處理的分組

        數據有的上報的多一天1T,有的上報的少一天不到1G,但是需要統一去處理,這時候就可以使用數據分組的方法。將大小類似的數據放到一組內進行統一的處理,例子:將1G以下的分成一個組,將1G到10G的分成一個組,10G到100G的分為一個組。具體的需要根據數據具體的分布來確定。

  優點:數據處理均勻,對於文件個數的生成的控制強,統一管理。

  缺點:數據分組是根據數據大小來確定的,數據出現增長的時候如果不及時發現,會出現傾斜問題。

廣播

        一個運算中如果可以使用廣播,那就盡量不使用別的shuffle,因為廣播除了對主節點有壓力之外,別的方面都是最好的shuffle。

        --conf spark.sql.broadcastTimeout=-1                                             //廣播變量永不超時
        --conf spark.sql.autoBroadcastJoinThreshold=104857600             //廣播大小設置為1G

   sparksql運行數據的時候默認第一次都不是廣播的,因為它在開始的時候不知道數據的大小,所以沒辦法廣播,第一次運行完成之后知道了數據大小,第二次才會廣播

       如果確定一個文件可以廣播的話,建議使用顯式broadcast

  import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val data = sparkSession.read.textFile("/software/java/idea/data")
broadcast(data)

並發度

        sparksql中難免會涉及到文件IO,網絡IO,這時候並發度就顯得很重要,並發度可以簡單理解為線程數。

        --conf spark.default.parallelism=60          如果代碼中沒有shuffle操作或者repartation生成的文件就是並發的個數
        --conf spark.sql.shuffle.partitions=800     

輸出文件壓縮

        HDFS中文本存儲是最浪費空間做法,所以強烈建議開啟壓縮

        --conf spark.hadoop.mapred.output.compress=true
        --conf spark.hadoop.mapred.output.compression.codec=true
        --conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
        --conf spark.hadoop.mapred.output.compression.type=BLOCK

 慢任務檢測

        sparksql任務里面會同時起來多台機器進行作業,這時候如果一台機器有問題,運行的滿會拉低整個人物執行的時間。把慢任務檢測開啟的話,如果有慢任務,就會再次起來一個相同的任務,誰現執行完成就會把另一個殺掉。從而節約執行的時間。

        --conf spark.speculation=true
        --conf spark.speculation.interval=30000
        --conf spark.speculation.quantile=0.8
        --conf spark.speculation.multiplier=1.5

 repartation

        sparksql讀取文件的時候是根據文件個數來決定task個數的,但是如果出現很多小文件的話,就會嚴重影響任務的執行時間,再讀取之后顯示使用repartation,或者在使用newAPIHadoopFile來合並文件大小

        val newData = sparkContext.newAPIHadoopFile("/software/java/idea/data",classOf[CombineTextInputFormat],classOf[LongWritable],classOf[Text])
             .saveAsTextFile("/software/java/idea/end")

hive中也有對應的參數就是:

        set mapreduce.input.fileinputformat.split.minsize = 1024000000;
        set mapreduce.input.fileinputformat.split.maxsize = 1024000000;(默認256M)
        set mapreduce.input.fileinputformat.split.minsize.per.node= 1024000000;
        set mapreduce.input.fileinputformat.split.maxsize.per.node= 1024000000;(默認1b)
        set mapreduce.input.fileinputformat.split.minsize.per.rack= 1024000000; 
        set mapreduce.input.fileinputformat.split.maxsize.per.rack= 1024000000;(默認1b)

 寫mysql

        sparksql里面可以直接寫數據庫,但是真正寫數據庫的時候是根據partation的個數來確定數據庫連接數的,所以在寫數據庫之前最好根據數據條數去reparttation一下,達到最快的寫入速度

 hive入庫

        set hive.msck.path.validation=ignore;MSCK REPAIR TABLE tablename     //適合分區數較少的load,分區數較多的時候反而時間會加長

        ALTER TABLE externaltable_test  ADD PARTITION(ddate=20190920) LOCATION '/hive/table/table_test/dt=20190920';     //當分區數到達一定的數量之后就可以使用這個來load數據,也是只修改元數據的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM