1、執行計划(過往記憶https://www.iteblog.com/archives/2562.html)
df.explain(true)//顯示邏輯計划和物理計划,不加true只顯示物理計划
2、邏輯計划優化方法:
謂詞下推,列裁剪,常量替換,常量累加
3、優化方法
數據源方面:
1、hive 使用parquet格式,不要用textfile。列式存儲便於查詢引擎做謂詞下推、更優的壓縮算法(不同列可以采取不同的壓縮算法)減少IO,塊遍歷等優化方法。
2、Kafka根據key的hash值分區,OGG到Kafka 表名作為key,因此不同大小的表可以更改表名,均衡分到不同partition。
sparkSQL程序方面(spark優化):
1、多次用到的表,做cache。默認進行壓縮。
spark.sql.inMemoryColumnarStorage.compressed //默認為true,為每個列選擇壓縮方式 spark.sql.inMemoryColumnarStorage.batchSize //默認為10000 byte 控制列緩存的批量大小。批次大有助於改善內存使用和壓縮,但是緩存數據會有OOM的風險
2、小於10M的表會自動broadcast,走broadcast join,調高廣播表的大小,使其走broadcast join ,但是太大有可能driver端OOM,-1為禁止自動廣播。
當使用的外部變量較大時,也可把外部變量作為廣播變量進行廣播。
spark.sql.autoBroadcastJoinThreshold //默認10485760 (10 MB)
val listBrodcast = spark.sparkContext.broadcast(list)
3、sparkSQL shuffle read partition默認為200,提高可解決部分數據傾斜問題。
spark.sql.shuffle.partitions //默認200
4、讀不可分割的文件,一個文件一個partition,若小文件過多,影響效率,設置多個文件寫入一個分區
spark.sql.files.openCostInBytes //默認4194304 (4 MB),打開一個文件的時間可讀取4MB數據,因此小於4M的文件讀入一個分區(待驗證) spark.sql.files.maxPartitionBytes //默認134217728 (128 MB),文件傳入一個分區里的最大字節數
5、使用高效的算子
1、reduceByKey/aggregateByKey代替groupByKey//前者partition內部會進行預聚合,后者不進行預聚合直接全局shuffle 2、mapPartitions代替map,foreachpartitions 代替foreach//前者會一次性讀取整個partition的數據進行處理,比如建立數據庫連接在foreachpartitions中,不要在foreach 3、filter之后coallease
6、修改序列化器為kryo,並注冊序列化類
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])
7、join方式(https://www.cnblogs.com/suanec/p/7560399.html)