SparkSQL調優


1、執行計划(過往記憶https://www.iteblog.com/archives/2562.html)

df.explain(true)//顯示邏輯計划和物理計划,不加true只顯示物理計划

2、邏輯計划優化方法:

謂詞下推,列裁剪,常量替換,常量累加

 

3、優化方法

數據源方面:

1、hive 使用parquet格式,不要用textfile。列式存儲便於查詢引擎做謂詞下推、更優的壓縮算法(不同列可以采取不同的壓縮算法)減少IO,塊遍歷等優化方法。

2、Kafka根據key的hash值分區,OGG到Kafka 表名作為key,因此不同大小的表可以更改表名,均衡分到不同partition。

sparkSQL程序方面(spark優化):

1、多次用到的表,做cache。默認進行壓縮。

spark.sql.inMemoryColumnarStorage.compressed //默認為true,為每個列選擇壓縮方式
spark.sql.inMemoryColumnarStorage.batchSize //默認為10000 byte 控制列緩存的批量大小。批次大有助於改善內存使用和壓縮,但是緩存數據會有OOM的風險

2、小於10M的表會自動broadcast,走broadcast join,調高廣播表的大小,使其走broadcast join ,但是太大有可能driver端OOM,-1為禁止自動廣播。

當使用的外部變量較大時,也可把外部變量作為廣播變量進行廣播。

spark.sql.autoBroadcastJoinThreshold //默認10485760 (10 MB)
val listBrodcast = spark.sparkContext.broadcast(list)

3、sparkSQL shuffle read partition默認為200,提高可解決部分數據傾斜問題。

spark.sql.shuffle.partitions //默認200

4、讀不可分割的文件,一個文件一個partition,若小文件過多,影響效率,設置多個文件寫入一個分區

spark.sql.files.openCostInBytes //默認4194304 (4 MB),打開一個文件的時間可讀取4MB數據,因此小於4M的文件讀入一個分區(待驗證)

spark.sql.files.maxPartitionBytes //默認134217728 (128 MB),文件傳入一個分區里的最大字節數

 5、使用高效的算子

1、reduceByKey/aggregateByKey代替groupByKey//前者partition內部會進行預聚合,后者不進行預聚合直接全局shuffle
2、mapPartitions代替map,foreachpartitions 代替foreach//前者會一次性讀取整個partition的數據進行處理,比如建立數據庫連接在foreachpartitions中,不要在foreach
3、filter之后coallease

6、修改序列化器為kryo,並注冊序列化類

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])

 7、join方式(https://www.cnblogs.com/suanec/p/7560399.html)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM