一、任务调参
1.1 spark.executor.memory
executor执行分配的内存大小
1.2 spark.executor.cores
executor执行分配的核数
1.3 spark.executor.instances
需要的executor个数,等同num-executors,可以使用 spark.dynamicAllocation.enabled
1.4 spark.executor.memoryOverhead="1024m'
这个参数表示每个executor配备的可使用的堆外内存大小。在调spark应用程序时可能经常会碰到OOM,然后错误日志中提示让提高这个参数指定的值的情况。这种情况其实多发生在有数据倾斜的情况,这个调整经常是治标不治本,解决倾斜是根本。默认情况是配置的executor内存容量的10%
1.5 spark.executor.extraJavaOptions
Jvm参数值,我们有时候发现我们的job突然跑的很慢,一方面可以去看看Yarn上的资源分配情况,另一方面也可以没看看是不是有大量的时间用来做GC导致的。
例:最大的堆外内存:spark.executor.extraJavaOptions="-XX:MaxDirectMemorySize=1G"
1.6 spark.dynamicAllocation.enabled
可以避免使用spark.executor.instances或num-executors,使用动态资源分配,动态分配可以使的 Spark 的应用在有后续积压的在等待的 task 时请求 executor,并且在空闲时释放这些 executor
1.7 推测执行
推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例。spark推测式执行默认是关闭的,可通过spark.speculation属性来开启
--conf spark.speculation=true
--conf spark.speculation.interval=100
--conf spark.speculation.quantile=0.9
--conf spark.speculation.multiplier=1.5
(1)当spark.speculation设置为true时,就会对task开启推测执行,也就是在一个stage下跑的慢的tasks有机会重新启动;
(2)spark.speculation.interval,100ms,Spark检测tasks推测机制的间隔时间;
(3)spark.speculation.quantile,0.9,当一个stage下多少百分比的tasks运行完毕后才开启推测执行机制,0.9即90%的任务都运行完毕后开启推测执行;
(4)spark.speculation.multiplier,1.5,一个task的运行时间是所有task的运行时间中位数的几倍时,才会被认为该task需要重新启动。
1.8 配置metastore
配置[hive/in-memory]作为metastore,默认in-memory
在写非SQL代码时,SparkSession初始化时默认是用in-memory,不会加载hive-site.xml,如果需要访问元数据,需要在初始化时添加enableHiveSupport,设置metastore为hive
spark.sql.catalogImplementation="hive"
1.9 提升Shuffle计算性能
spark.shuffle.service.enabled=true
NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。默认为false,表示不启用该功能。
spark.shuffle.service.port 7337
Shuffle服务监听数据获取请求的端口。可选配置,默认值为“7337”
1.10 动态分区
hive.exec.dynamic.partition="true"
hive.exec.dynamic.partition.mode="nonstrict"
1.11 设置类型隐式转换
Hive 默认支持隐式转换,Spark需要设置以下参数来有限度支持隐式转换
spark.sql.storeAssignmentPolicy=LEGACY
1.12 小文件合并问题
Spark SQL在写入数据的时候是并行写入,并没有一个合并的过程。小文件过多,会增大Namenode的压力,同时对查询性能也有很大影响。通常在Hive中可以引入 hive.spark.mergefiles=true 来为hive的执行计划增加一个合并Job,但Spark SQL不支持这个做法。
spark.sql.adaptive.enabled=true; --动态调整Shuffle Partition
spark.sql.adaptive.advisoryPartitionSizeInBytes=262144000; --合并连续的随机播放分区,以避免执行过多的小任务。
spark.sql.adaptive.maxNumPostShufflePartitions=200; --reduce个数区间最大值,同时也是shuffle分区数的初始值
spark.sql.adaptive.forceApply=true; --强制开启AQE
spark.sql.adaptive.coalescePartitions.parallelismFirst=false; --不适用默认并行度设置
spark.sql.adaptive.coalescePartitions.minPartitionSize =52428800; --动态合并