Spark調優


 

1. Explain查看執行計划

Spark 3.0 大版本發布,Spark SQL 的優化占比將近 50%。Spark SQL 取代 Spark Core,成為新一代的引擎內核,所有其他子框架如 Mllib、Streaming 和 Graph,都可以共享 Spark SQL 的性能優化,都能從

Spark 社區對於 Spark SQL 的投入中受益。

要優化SparkSQL應用時,一定是要了解SparkSQL執行計划的。發現SQL執行慢的根本原因,才能知道應該在哪兒進行優化,是調整SQL的編寫方式、還是用Hint、還是調參,而不是把優化方案拿來試一遍。

用法

.explain(mode="xxx")

從3.0開始,explain方法有一個新的參數mode,該參數可以指定執行計划展示格式:

  • explain(mode="simple"):只展示物理執行計划。
  • explain(mode="extended"):展示物理執行計划和邏輯執行計划。
  • explain(mode="codegen") :展示要Codegen生成的可執行Java代碼。
  • explain(mode="cost"):展示優化后的邏輯執行計划以及相關的統計。
  • explain(mode="formatted"):以分隔的方式輸出,它會輸出更易讀的物理執行計划,並展示每個節點的詳細信息。

 

執行計划處理流程

SQL Query/ DataFrame ——> Unresolved Logical Plan 未決斷的 邏輯執行計划 (SQL語法的校驗) ——> 通過Catlog分析校驗表名列名等信息  ——>  邏輯執行計划 Logical Plan

——> 邏輯優化(比如謂詞下推等)生產一個優化后的邏輯執行計划 Optimized Logical Plan ——>  物理執行計划  ——> Cost Model(代價選擇)CBO 選擇一個代價小的來執行

——> 最終生產一個可執行的Java代碼  ——> RDDS

 

核心的執行過程一共有5個步驟:

 

這些操作和計划都是Spark SQL自動處理的,會生成以下計划:

  • ① Unresolved邏輯執行計划:== Parsed Logical Plan ==

Parser組件檢查SQL語法上是否有問題,然后生成Unresolved(未決斷)的邏輯計划,不檢查表名、不檢查列名。

  • ② Resolved邏輯執行計划:== Analyzed Logical Plan ==

通過訪問Spark中的Catalog存儲庫來解析驗證語義、列名、類型、表名等。

  • ③ 優化后的邏輯執行計划:== Optimized Logical Plan ==

Catalyst優化器根據各種規則進行優化。

  • ④ 物理執行計划:== Physical Plan ==

    1)HashAggregate運算符表數據聚合,一般HashAggregate是成對出現,第一個HashAggregate將執行節點本地的數據局部聚合,另一個HashAggregate是將各個分區的數據進一步進行聚合計算。

    2)Exchange運算符其實就是shuffle,表示需要在集群上移動數據。很多時候HashAggregate會以Exchange分隔開來。

    3)Project運算符是SQL中的投影操作,就是選擇列(例如:select name, age…)。

    4)BroadcastHashJoin運算符表示通過基於廣播方式進行HashJoin。

    5)LocalTableScan運算符就是全表掃描本地的表。

 

import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object explain { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root") val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL") val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate() spark.sql("use gmall") val sqlstr = """ |select
        | province_id, | sum(order_price * sku_num) as amount |from
        |gmall.order_info_tmp oi |join gmall.order_detail_tmp od on oi.id = od.order_id |where province_id > 10
        |group by province_id """.stripMargin println("=====================================explain()-只展示物理執行計划============================================") spark.sql(sqlstr).explain() println("============================explain(mode = \"formatted\")-展示格式化的物理執行計划=============================") spark.sql(sqlstr).explain(mode = "formatted") println("============================explain(mode = \"extended\")-展示邏輯和物理執行計划==============================") spark.sql(sqlstr).explain(mode = "extended") spark.close() } }

 

=====================================explain()-只展示物理執行計划============================================
== Physical Plan ==
*(3) HashAggregate(keys=[province_id#8], functions=[sum(CheckOverflow((promote_precision(cast(order_price#18 as decimal(22,2))) * promote_precision(cast(cast(sku_num#19L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))]) +- Exchange hashpartitioning(province_id#8, 200), true, [id=#47]
   +- *(2) HashAggregate(keys=[province_id#8], functions=[partial_sum(CheckOverflow((promote_precision(cast(order_price#18 as decimal(22,2))) * promote_precision(cast(cast(sku_num#19L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))]) +- *(2) Project [province_id#8, order_price#18, sku_num#19L]
         +- *(2) BroadcastHashJoin [id#1], [order_id#14], Inner, BuildRight :- *(2) Filter ((isnotnull(province_id#8) AND (cast(province_id#8 as int) > 10)) AND isnotnull(id#1)) : +- Scan hive gmall.order_info_tmp [id#1, province_id#8], HiveTableRelation `gmall`.`order_info_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#1, final_total_amount#2, order_status#3, user_id#4, out_trade_no#5, create_time#6, operate_time#7, province_id#8, benefit_reduce_amount#9, original_total_amount#10, feight_fee#11, dt#12]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#41]
               +- *(1) Filter isnotnull(order_id#14) +- Scan hive gmall.order_detail_tmp [order_id#14, order_price#18, sku_num#19L], HiveTableRelation `gmall`.`order_detail_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, order_id#14, user_id#15, sku_id#16, sku_name#17, order_price#18, sku_num#19L, create_time#20, source_type#21, source_id#22, dt#23]


============================explain(mode = "formatted")-展示格式化的物理執行計划=============================
== Physical Plan ==
* HashAggregate (10) +- Exchange (9) +- * HashAggregate (8) +- * Project (7) +- * BroadcastHashJoin Inner BuildRight (6) :- * Filter (2) : +- Scan hive gmall.order_info_tmp (1) +- BroadcastExchange (5) +- * Filter (4) +- Scan hive gmall.order_detail_tmp (3) (1) Scan hive gmall.order_info_tmp Output [2]: [id#30, province_id#37] Arguments: [id#30, province_id#37], HiveTableRelation `gmall`.`order_info_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#30, final_total_amount#31, order_status#32, user_id#33, out_trade_no#34, create_time#35, operate_time#36, province_id#37, benefit_reduce_amount#38, original_total_amount#39, feight_fee#40, dt#41] (2) Filter [codegen id : 2] Input [2]: [id#30, province_id#37] Condition : ((isnotnull(province_id#37) AND (cast(province_id#37 as int) > 10)) AND isnotnull(id#30)) (3) Scan hive gmall.order_detail_tmp Output [3]: [order_id#43, order_price#47, sku_num#48L] Arguments: [order_id#43, order_price#47, sku_num#48L], HiveTableRelation `gmall`.`order_detail_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#42, order_id#43, user_id#44, sku_id#45, sku_name#46, order_price#47, sku_num#48L, create_time#49, source_type#50, source_id#51, dt#52] (4) Filter [codegen id : 1] Input [3]: [order_id#43, order_price#47, sku_num#48L] Condition : isnotnull(order_id#43) (5) BroadcastExchange Input [3]: [order_id#43, order_price#47, sku_num#48L] Arguments: HashedRelationBroadcastMode(List(input[0, string, false])), [id=#89] (6) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [id#30]
Right keys [1]: [order_id#43]
Join condition: None (7) Project [codegen id : 2] Output [3]: [province_id#37, order_price#47, sku_num#48L] Input [5]: [id#30, province_id#37, order_id#43, order_price#47, sku_num#48L] (8) HashAggregate [codegen id : 2] Input [3]: [province_id#37, order_price#47, sku_num#48L] Keys [1]: [province_id#37] Functions [1]: [partial_sum(CheckOverflow((promote_precision(cast(order_price#47 as decimal(22,2))) * promote_precision(cast(cast(sku_num#48L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))] Aggregate Attributes [1]: [sum#56] Results [2]: [province_id#37, sum#57] (9) Exchange Input [2]: [province_id#37, sum#57] Arguments: hashpartitioning(province_id#37, 200), true, [id=#95] (10) HashAggregate [codegen id : 3] Input [2]: [province_id#37, sum#57] Keys [1]: [province_id#37] Functions [1]: [sum(CheckOverflow((promote_precision(cast(order_price#47 as decimal(22,2))) * promote_precision(cast(cast(sku_num#48L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))] Aggregate Attributes [1]: [sum(CheckOverflow((promote_precision(cast(order_price#47 as decimal(22,2))) * promote_precision(cast(cast(sku_num#48L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))#53] Results [2]: [province_id#37, sum(CheckOverflow((promote_precision(cast(order_price#47 as decimal(22,2))) * promote_precision(cast(cast(sku_num#48L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))#53 AS amount#29]


============================explain(mode = "extended")-展示邏輯和物理執行計划==============================
###單個Plan 從下往上看
== Parsed Logical Plan
== 未絕斷的邏輯執行計划 'Aggregate ['province_id], ['province_id, 'sum(('order_price * 'sku_num)) AS amount#58] +- 'Filter ('province_id > 10) +- 'Join Inner, ('oi.id = 'od.order_id) :- 'SubqueryAlias oi : +- 'UnresolvedRelation [gmall, order_info_tmp] +- 'SubqueryAlias od +- 'UnresolvedRelation [gmall, order_detail_tmp] == Analyzed Logical Plan == 經過catlog分析之后的邏輯執行計划 province_id: string, amount: decimal(38,2) Aggregate [province_id#66], [province_id#66, sum(CheckOverflow((promote_precision(cast(order_price#76 as decimal(22,2))) * promote_precision(cast(cast(sku_num#77L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true)) AS amount#58] +- Filter (cast(province_id#66 as int) > 10) +- Join Inner, (id#59 = order_id#72) :- SubqueryAlias oi : +- SubqueryAlias spark_catalog.gmall.order_info_tmp : +- HiveTableRelation `gmall`.`order_info_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#59, final_total_amount#60, order_status#61, user_id#62, out_trade_no#63, create_time#64, operate_time#65, province_id#66, benefit_reduce_amount#67, original_total_amount#68, feight_fee#69, dt#70] +- SubqueryAlias od +- SubqueryAlias spark_catalog.gmall.order_detail_tmp +- HiveTableRelation `gmall`.`order_detail_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#71, order_id#72, user_id#73, sku_id#74, sku_name#75, order_price#76, sku_num#77L, create_time#78, source_type#79, source_id#80, dt#81] == Optimized Logical Plan == 優化后的邏輯執行計划 Aggregate [province_id#66], [province_id#66, sum(CheckOverflow((promote_precision(cast(order_price#76 as decimal(22,2))) * promote_precision(cast(cast(sku_num#77L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true)) AS amount#58] +- Project [province_id#66, order_price#76, sku_num#77L] +- Join Inner, (id#59 = order_id#72) :- Project [id#59, province_id#66] : +- Filter ((isnotnull(province_id#66) AND (cast(province_id#66 as int) > 10)) AND isnotnull(id#59)) : +- HiveTableRelation `gmall`.`order_info_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#59, final_total_amount#60, order_status#61, user_id#62, out_trade_no#63, create_time#64, operate_time#65, province_id#66, benefit_reduce_amount#67, original_total_amount#68, feight_fee#69, dt#70] +- Project [order_id#72, order_price#76, sku_num#77L] +- Filter isnotnull(order_id#72) +- HiveTableRelation `gmall`.`order_detail_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#71, order_id#72, user_id#73, sku_id#74, sku_name#75, order_price#76, sku_num#77L, create_time#78, source_type#79, source_id#80, dt#81] == Physical Plan == 物理執行計划 兩個HashAggregate之間會有Exchange - 即shuffle,第一個先預聚合 第二個再聚合; *(3) HashAggregate(keys=[province_id#66], functions=[sum(CheckOverflow((promote_precision(cast(order_price#76 as decimal(22,2))) * promote_precision(cast(cast(sku_num#77L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))], output=[province_id#66, amount#58]) +- Exchange hashpartitioning(province_id#66, 200), true, [id=#143] +- *(2) HashAggregate(keys=[province_id#66], functions=[partial_sum(CheckOverflow((promote_precision(cast(order_price#76 as decimal(22,2))) * promote_precision(cast(cast(sku_num#77L as decimal(20,0)) as decimal(22,2)))), DecimalType(37,2), true))], output=[province_id#66, sum#86]) +- *(2) Project [province_id#66, order_price#76, sku_num#77L] +- *(2) BroadcastHashJoin [id#59], [order_id#72], Inner, BuildRight :- *(2) Filter ((isnotnull(province_id#66) AND (cast(province_id#66 as int) > 10)) AND isnotnull(id#59)) : +- Scan hive gmall.order_info_tmp [id#59, province_id#66], HiveTableRelation `gmall`.`order_info_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#59, final_total_amount#60, order_status#61, user_id#62, out_trade_no#63, create_time#64, operate_time#65, province_id#66, benefit_reduce_amount#67, original_total_amount#68, feight_fee#69, dt#70] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#137] +- *(1) Filter isnotnull(order_id#72) +- Scan hive gmall.order_detail_tmp [order_id#72, order_price#76, sku_num#77L], HiveTableRelation `gmall`.`order_detail_tmp`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#71, order_id#72, user_id#73, sku_id#74, sku_name#75, order_price#76, sku_num#77L, create_time#78, source_type#79, source_id#80, dt#81] Process finished with exit code 0

也可以查看spark web ui

sql 計划圖

2. 資源調優

① 資源規划

資源設定

1、總體原則

以單台服務器128G內存,32線程為例。

先設定單個Executor核數,根據Yarn配置得出每個節點最多的Executor數量,每個節點的yarn內存/每個節點數量=單個節點的數量

總的executor數=單節點數量*節點數。

2、具體提交參數

1)executor-cores

每個executor的最大核數。根據經驗實踐,設定在3~6之間比較合理。

2)num-executors

該參數值=每個節點的executor數 * work節點數

每個node的executor數 = 單節點yarn總核數 / 每個executor的最大cpu核數

考慮到系統基礎服務和HDFS等組件的余量,yarn.nodemanager.resource.cpu-vcores配置為:28,參數executor-cores的值為:4,那么每個node的executor數 = 28/4 = 7,假設集群節點為10,那么num-executors = 7 * 10 = 70

3)executor-memory

該參數值=yarn-nodemanager.resource.memory-mb / 每個節點的executor數量

如果yarn的參數配置為100G,那么每個Executor大概就是100G/7≈14G,同時要注意yarn配置中每個容器允許的最大內存是否匹配。

內存估算

 

  • 估算Other內存 = 自定義數據結構*每個Executor核數
  • 估算Storage內存 = 廣播變量 + cache/Executor數量
  • 估算Executor內存 = 每個Executor核數 * (數據集大小/並行度)

調整內存配置項

一般情況下,各個區域的內存比例保持默認值即可。如果需要更加精確的控制內存分配,可以按照如下思路:

spark.memory.fraction=(估算storage內存+估算Execution內存)/(估算storage內存+估算Execution內存+估算Other內存)得到

spark.memory.storageFraction =(估算storage內存)/(估算storage內存+估算Execution內存)

代入公式計算:

  • Storage堆內內存=(spark.executor.memory–300MB)*spark.memory.fraction*spark.memory.storageFraction
  • Execution堆內內存 = (spark.executor.memory–300MB)*spark.memory.fraction*(1-spark.memory.storageFraction)

② 持久化和序列化

RDD

1、cache

 

 

打成jar,提交yarn任務,並在yarn界面查看spark ui

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.cache.RddCacheDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

通過spark ui看到,rdd使用默認cache緩存級別,占用內存2.5GB,並且storage內存還不夠,只緩存了29%。

2、kryo+序列化緩存

使用kryo序列化並且使用rdd序列化緩存級別。使用kryo序列化需要修改spark的序列化模式,並且需要進程注冊類操作。

打成jar包在yarn上運行。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.cache.RddCacheKryoDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

查看storage所占內存,內存占用減少了1083.6mb並且緩存了100%。使用序列化緩存配合kryo序列化,可以優化存儲內存占用。

 

 

 

根據官網的描述,那么可以推斷出,如果yarn內存資源充足情況下,使用默認級別MEMORY_ONLY是對CPU的支持最好的。但是序列化緩存可以讓體積更小,那么當yarn內存資源不充足情況下可以考慮使用

MEMORY_ONLY_SER配合kryo使用序列化緩存。

DataFrame、DataSet

1、cache

提交任務,在yarn上查看spark ui,查看storage內存占用。內存使用612.3mb。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.cache.DatasetCacheDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

 

DataSet的cache默認緩存級別與RDD不一樣,是MEMORY_AND_DISK。

源碼:Dataset.cache() -> Dataset.persist() -> CacheManager.cacheQuery()

 

 

 

2、序列化緩存

DataSet類似RDD,但是並不使用JAVA序列化也不使用Kryo序列化,而是使用一種特有的編碼器進行序列化對象。

 

打成jar包,提交yarn。查看spark ui,storage占用內存646.2mb。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.cache.DatasetCacheSerDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

 

和默認cache緩存級別差別不大。所以DataSet可以直接使用cache。

從性能上來講,DataSet,DataFrame大於RDD,建議開發中使用DataSet、DataFrame。

③ CPU優化

CPU低效原因

概念

1)並行度

  • spark.default.parallelism

設置RDD 的默認並行度,沒有設置時,由join、reduceByKey和parallelize等轉換決定。

  • spark.sql.shuffle.partitions

適用SparkSQL時,Shuffle Reduce 階段默認的並行度,默認200。此參數只能控制Spark sql、DataFrame、DataSet分區個數。不能控制RDD分區個數

 

 

2)並發度

  同時執行的task數

 

CPU低效原因

  • 1)並行度較低、數據分片較大容易導致 CPU 線程掛起
  • 2)並行度過高、數據過於分散會讓調度開銷更多

Executor 接收到 TaskDescription 之后,首先需要對 TaskDescription 反序列化才能讀取任務信息,然后將任務代碼再反序列化得到可執行代碼,最后再結合其他任務信息創建 TaskRunner。當數據過於分散,分

布式任務數量會大幅增加,但每個任務需要處理的數據量卻少之又少,就 CPU 消耗來說,相比花在數據處理上的比例,任務調度上的開銷幾乎與之分庭抗禮。顯然,在這種情況下,CPU 的有效利用率也是極低

的。

合理利用CPU資源

每個並行度的數據量(總數據量/並行度) 在(Executor內存/core數/2, Executor內存/core數)區間

提交執行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 6g  --class com.atguigu.sparktuning.partition.PartitionDemo spark-tuning-1.0-

SNAPSHOT-jar-with-dependencies.jar

去向yarn申請的executor vcore資源個數為12個(num-executors*executor-cores),如果不修改spark sql分區個數,那么就會像上圖所展示存在cpu空轉的情況。這個時候需要合理控制shuffle分區個數。如果想要

讓任務運行的最快當然是一個task對應一個vcore,但是一般不會這樣設置,為了合理利用資源,一般會將並行度(task數)設置成並發度(vcore數)的2倍到3倍。

修改參數spark.sql.shuffle.partitions(默認200), 根據我們當前任務的提交參數有12個vcore,將此參數設置為24或36為最優效果:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 6g  --class com.atguigu.sparktuning.partition.PartitionTuning spark-tuning-1.0-

SNAPSHOT-jar-with-dependencies.jar

3. SparkSQL語法優化

SparkSQL在整個執行計划處理的過程中,使用了Catalyst 優化器。

① 基於RBO的優化

在Spark 3.0 版本中,Catalyst 總共有 81 條優化規則(Rules),分成 27 組(Batches),其中有些規則會被歸類到多個分組里。因此,如果不考慮規則的重復性,27 組算下來總共會有 129 個優化規則。

如果從優化效果的角度出發,這些規則可以歸納到以下 3 個范疇:

謂詞下推(Predicate Pushdown)

將過濾條件的謂詞邏輯都盡可能提前執行,減少下游處理的數據量。對應PushDownPredicte 優化規則,對於 Parquet、ORC 這類存儲格式,結合文件注腳(Footer)中的統計信息,下推的謂詞能夠大幅減少數

據掃描量,降低磁盤 I/O 開銷。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 6g  --class com.atguigu.sparktuning.PredicateTuning spark-tuning-1.0-

SNAPSHOT-jar-with-dependencies.jar

左外關聯下推規則:左表 left join 右表

 

左表

右表

Join中條件(on

只下推右表

只下推右表

Join后條件(where

兩表都下推

兩表都下推

注意:外關聯時,過濾條件寫在on與where,結果是不一樣的!

列剪裁(Column Pruning)

列剪裁就是掃描數據源的時候,只讀取那些與查詢相關的字段。

常量替換(Constant Folding)

假設我們在年齡上加的過濾條件是 “age < 12 + 18”,Catalyst 會使用 ConstantFolding 規則,自動幫我們把條件變成 “age < 30”。再比如,我們在 select 語句中,摻雜了一些常量表達式,Catalyst 也會自動地用表達式的結果進行替換。

② 基於CBO的優化

CBO優化主要在物理計划層面,原理是計算所有可能的物理計划的代價,並挑選出代價最小的物理執行計划。充分考慮了數據本身的特點(如大小、分布)以及操作算子的特點(中間結果集的分布及大小)及代

價,從而更好的選擇執行代價最小的物理執行計划。

而每個執行節點的代價,分為兩個部分:

  • 1)該執行節點對數據集的影響,即該節點輸出數據集的大小與分布
  • 2)該執行節點操作算子的代價

每個操作算子的代價相對固定,可用規則來描述。而執行節點輸出數據集的大小與分布,分為兩個部分:

  • 1)初始數據集,也即原始表,其數據集的大小與分布可直接通過統計得到;
  • 2)中間節點輸出數據集的大小與分布可由其輸入數據集的信息與操作本身的特點推算。

Statistics 收集

需要先執行特定的SQL語句來收集所需的表和列的統計信息。

  • 生成表級別統計信息(掃表):

ANALYZE TABLE 表名 COMPUTE STATISTICS

生成sizeInBytes和rowCount。

使用ANALYZE語句收集統計信息時,無法計算非HDFS數據源的表的文件大小。

 

  • 生成表級別統計信息(不掃表):

ANALYZE TABLE src COMPUTE STATISTICS NOSCAN

只生成sizeInBytes,如果原來已經生成過sizeInBytes和rowCount,而本次生成的sizeInBytes和原來的大小一樣,則保留rowCount(若存在),否則清除rowCount。

  • 生成列級別統計信息

ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列1,列2,列3

生成列統計信息,為保證一致性,會同步更新表統計信息。目前不支持復雜數據類型(如Seq, Map等)和HiveStringType的統計信息生成。

 

  • 顯示統計信息

DESC FORMATTED 表名

在Statistics中會顯示“xxx bytes, xxx rows”分別表示表級別的統計信息。

 

也可以通過如下命令顯示列統計信息:

DESC FORMATTED 表名 列名

執行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 6g  --class com.atguigu.sparktuning.cbo.StaticsCollect spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

使用CBO

通過 "spark.sql.cbo.enabled" 來開啟,默認是false。配置開啟CBO后,CBO優化器可以基於表和列的統計信息,進行一系列的估算,最終選擇出最優的查詢計划。比如:Build側選擇、優化 Join 類型、優化多表 Join 順序等。

參數

描述

默認值

spark.sql.cbo.enabled

CBO總開關。

true表示打開,false表示關閉。

要使用該功能,需確保相關表和列的統計信息已經生成。

false

spark.sql.cbo.joinReorder.enabled

使用CBO來自動調整連續的inner join的順序。

true:表示打開,false:表示關閉

要使用該功能,需確保相關表和列的統計信息已經生成,且CBO總開關打開。

false

spark.sql.cbo.joinReorder.dp.threshold

使用CBO來自動調整連續inner join的表的個數閾值。

如果超出該閾值,則不會調整join順序。

12

 

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 4g  --class com.atguigu.sparktuning.cbo.CBOTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

③ 廣播Join

Spark join策略中,如果當一張小表足夠小並且可以先緩存到內存中,那么可以使用Broadcast Hash Join,其原理就是先將小表聚合到driver端,再廣播到各個大表分區中,那么再次進行join的時候,就相當於大表

的各自分區的數據與小表進行本地join,從而規避了shuffle。

1)通過參數指定自動廣播

廣播join默認值為10MB,由spark.sql.autoBroadcastJoinThreshold參數控制。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 4g  --class com.atguigu.sparktuning.join.AutoBroadcastJoinTuning spark-tuning-

1.0-SNAPSHOT-jar-with-dependencies.jar

2)強行廣播

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 4g  --class com.atguigu.sparktuning.join.ForceBroadcastJoinTuning spark-tuning-

1.0-SNAPSHOT-jar-with-dependencies.jar

④ SMB Join

SMB JOIN是sort merge bucket操作,需要進行分桶,首先會進行排序,然后根據key值合並,把相同key的數據放到同一個bucket中(按照key進行hash)。分桶的目的其實就是把大表化成小表。相同key的數據都在同一個桶中之后,再進行join操作,那么在聯合的時候就會大幅度的減小無關項的掃描。

使用條件:

(1)兩表進行分桶,桶的個數必須相等

(2)兩邊進行join時,join列=排序列=分桶列

不使用SMB Join:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.join.BigJoinDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

使用SMB Join:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

4. 數據傾斜

① 數據傾斜現象

1、現象

絕大多數task任務運行速度很快,但是就是有那么幾個task任務運行極其緩慢,慢慢的可能就接着報內存溢出的問題。

 

2 原因

  數據傾斜一般是發生在shuffle類的算子,比如distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup等,涉及到數據重分區,如果其中某一個key數量特別大,就發生了數據傾斜。

② 數據傾斜大key定位

從所有key中,把其中每一個key隨機取出來一部分,然后進行一個百分比的推算,這是用局部取推算整體,雖然有點不准確,但是在整體概率上來說,我們只需要大概就可以定位那個最多的key了

執行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.join.SampleKeyDemo spark-tuning-1.0-

SNAPSHOT-jar-with-dependencies.jar

③ 單表數據傾斜優化

為了減少shuffle數據量以及reduce端的壓力,通常Spark SQL在map端會做一個partial aggregate(通常叫做預聚合或者偏聚合),即在shuffle前將同一分區內所屬同key的記錄先進行一個預結算,再將結果進行

shuffle,發送到reduce端做一個匯總,類似MR的提前Combiner,所以執行計划中 HashAggregate通常成對出現。

1、適用場景

聚合類的shuffle操作,部分key數據量較大,且大key的數據分布在很多不同的切片。

2、解決邏輯

兩階段聚合(加鹽局部聚合+去鹽全局聚合)

3、案例

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.skew.SkewAggregationTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

④ Join數據傾斜優化

廣播Join

1、適用場景

適用於小表join大表。小表足夠小,可被加載進Driver並通過Broadcast方法廣播到各個Executor中。

2、解決邏輯

在小表join大表時如果產生數據傾斜,那么廣播join可以直接規避掉此shuffle階段。直接優化掉stage。並且廣播join也是Spark Sql中最常用的優化方案。

3、案例演示

2.2.2中的PartitionTuning案例關閉了廣播join,可以看到數據傾斜

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.skew.SkewMapJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

拆分大key 打散大表 擴容小表

1、適用場景

適用於join時出現數據傾斜。

2、解決邏輯

1)將存在傾斜的表,根據抽樣結果,拆分為傾斜key(skew表)和沒有傾斜key(common)的兩個數據集。

2)將skew表的key全部加上隨機前綴,然后對另外一個不存在嚴重數據傾斜的數據集(old表)整體與隨機前綴集作笛卡爾乘積(即將數據量擴大N倍,得到new表)。

3)打散的skew表   join  擴容的new表

       union

       Common表   join  old表

以下為打散大key和擴容小表的實現思路

1)打散大表:實際就是數據一進一出進行處理,對大key前拼上隨機前綴實現打散

2)擴容小表:實際就是將DataFrame中每一條數據,轉成一個集合,並往這個集合里循環添加10條數據,最后使用flatmap壓平此集合,達到擴容的效果.

3、案例演示

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.skew.SkewJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

參設開啟AQE

詳見6.3

5. Job優化

SortShuffle流程 

 

 

① Map端優化

Map端聚合

map-side預聚合,就是在每個節點本地對相同的key進行一次聚合操作,類似於MapReduce中的本地combiner。map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。

RDD的話建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,全量的數據會在集群的各個節點之間分發和傳輸,性能相對來說比較差。

SparkSQL本身的HashAggregte就會實現本地預聚合+全局聚合。

讀取小文件優化

讀取的數據源有很多小文件,會造成查詢性能的損耗,大量的數據分片信息以及對應產生的Task元信息也會給Spark Driver的內存造成壓力,帶來單點問題。

設置參數:

spark.sql.files.maxPartitionBytes=128MB   默認128m

spark.files.openCostInBytes=4194304     默認4m

參數(單位都是bytes):

  • maxPartitionBytes:一個分區最大字節數。
  • openCostInBytes:打開一個文件的開銷。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源碼理解: DataSourceScanExec.createNonBucketedReadRDD()

 

FilePartition. getFilePartitions()

 

1)切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

計算totalBytes的時候,每個文件都要加上一個open開銷

defaultParallelism就是RDD的並行度

2)當(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+ 文件n <= maxPartitionBytes時,n個文件可以讀入同一個分區,即滿足: N個小文件總大小 + (N-1)*openCostInBytes <=  maxPartitionBytes的話。

增大map溢寫時輸出流buffer

1)map端Shuffle Write有一個緩沖區,初始閾值5m,超過會嘗試增加到2*當前使用內存。如果申請不到內存,則進行溢寫。這個參數是internal,指定無效(見下方源碼)。也就是說資源足夠會自動擴容,所以不需要我們去設置。

2)溢寫時使用輸出流緩沖區默認32k,這些緩沖區減少了磁盤搜索和系統調用次數,適當提高可以提升溢寫效率。

3)Shuffle文件涉及到序列化,是采取批的方式讀寫,默認按照每批次1萬條去讀寫。設置得太低會導致在序列化時過度復制,因為一些序列化器通過增長和復制的方式來翻倍內部數據結構。這個參數是internal,指定無效(見下方源碼)。

綜合以上分析,我們可以調整的就是輸出緩沖區的大小。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

源碼理解:

 

 

 

 

② Reduce端優化

合理設置Reduce數

過多的cpu資源出現空轉浪費,過少影響任務性能。關於並行度、並發度的相關參數介紹,在2.2.1中已經介紹過。

輸出產生小文件優化

1Join后的結果插入新表

join結果插入新表,生成的文件數等於shuffle並行度,默認就是200份文件插入到hdfs上。

解決方式:

1)可以在插入表數據前進行縮小分區操作來解決小文件過多問題,如coalesce、repartition算子。

2)調整shuffle並行度。根據2.2.2的原則來設置。

2、動態分區插入數據

1)沒有Shuffle的情況下。最差的情況下,每個Task中都有表各個分區的記錄,那文件數最終文件數將達到  Task數量 * 表分區數。這種情況下是極易產生小文件的。

INSERT overwrite table A partition ( aa ) 

SELECT * FROM B;

2)有Shuffle的情況下,上面的Task數量 就變成了spark.sql.shuffle.partitions(默認值200)。那么最差情況就會有 spark.sql.shuffle.partitions * 表分區數。

當spark.sql.shuffle.partitions設置過大時,小文件問題就產生了;當spark.sql.shuffle.partitions設置過小時,任務的並行度就下降了,性能隨之受到影響。

最理想的情況是根據分區字段進行shuffle,在上面的sql中加上distribute by aa。把同一分區的記錄都哈希到同一個分區中去,由一個Spark的Task進行寫入,這樣的話只會產生N個文件, 但是這種情況下也容易出現數據傾斜的問題。

        解決思路:

結合第4章解決傾斜的思路,在確定哪個分區鍵傾斜的情況下,將傾斜的分區鍵單獨拎出來:

將入庫的SQL拆成(where 分區 != 傾斜分區鍵 )和 (where 分區 = 傾斜分區鍵) 幾個部分,非傾斜分區鍵的部分正常distribute by 分區字段,傾斜分區鍵的部分 distribute by隨機數,sql如下:

//1.非傾斜鍵部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa != 大key

distribute by aa;

 

//2.傾斜鍵部分

INSERT overwrite table A partition ( aa ) 

SELECT *

FROM B where aa = 大key

distribute by cast(rand() * 5 as int);

案例實操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

增大reduce緩沖區,減少拉取次數

Spark Shuffle過程中,shuffle reduce task的buffer緩沖區大小決定了reduce task每次能夠緩沖的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩沖區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。

reduce端數據拉取緩沖區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB。

源碼:BlockStoreShuffleReader.read()

 

調節reduce端拉取數據重試次數

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

reduce端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3:

調節reduce端拉取數據等待間隔

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s。

綜合5.2.3、5.2.4、5.2.5,案例實操:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

合理利用bypass

當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(默認是200)且不需要map端進行合並操作,則shuffle write過程中不會進行排序操作,使用BypassMergeSortShuffleWriter去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合並成一個文件,並會創建單獨的索引文件。

當你使用SortShuffleManager時,如果確實不需要排序操作,那么建議將這個參數調大一些,大於shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

源碼分析:SortShuffleManager.registerShuffle()

 

 

 

SortShuffleManager.getWriter()

 

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

③整體優化

調節數據本地化等待時長

在 Spark 項目開發階段,可以使用 client 模式對程序進行測試,此時,可以在本地看到比較全的日志信息,日志信息中有明確的 Task 數據本地化的級別,如果大部分都是 PROCESS_LOCAL、NODE_LOCAL,那么就無需進行調節,但是如果發現很多的級別都是 RACK_LOCAL、ANY,那么需要對本地化的等待時長進行調節,應該是反復調節,每次調節完以后,再來運行觀察日志,看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的運行時間有沒有縮短。

注意過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得 Spark 作業的運行時間反而增加了。

下面幾個參數,默認都是3s,可以改成如下:

spark.locality.wait            //建議6s、10s

spark.locality.wait.process    //建議60s

spark.locality.wait.node       //建議30s

spark.locality.wait.rack       //建議20s

 

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g  --class com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

使用堆外內存

1、堆外內存參數

講到堆外內存,就必須去提一個東西,那就是去yarn申請資源的單位,容器。Spark  on yarn模式,一個容器到底申請多少內存資源。

一個容器最多可以申請多大資源,是由yarn參數yarn.scheduler.maximum-allocation-mb決定, 需要滿足:

spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size

≤ yarn.scheduler.maximum-allocation-mb

參數解釋:

  • spark.executor.memory:提交任務時指定的堆內內存。
  • spark.executor.memoryOverhead:堆外內存參數,內存額外開銷。

默認開啟,默認值為spark.executor.memory*0.1並且會與最小值384mb做對比,取最大值。所以spark on yarn任務堆內內存申請1個g,而實際去yarn申請的內存大於1個g的原因。

  • spark.memory.offHeap.size:堆外內存參數,spark中默認關閉,需要將spark.memory.enable.offheap.enable參數設置為true。

注意:很多網上資料說spark.executor.memoryOverhead包含spark.memory.offHeap.size,這是由版本區別的,僅限於spark3.0之前的版本。3.0之后就發生改變,實際去yarn申請的內存資源由三個參數相加。

 

 

測試申請容器上限:

yarn.scheduler.maximum-allocation-mb修改為7G,將三個參數設為如下,大於7G,會報錯:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

將spark.memory.offHeap.size修改為1g后再次提交:

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

2、使用堆外緩存

使用堆外內存可以減輕垃圾回收的工作,也加快了復制的速度。

當需要緩存非常大的數據量時,虛擬機將承受非常大的GC壓力,因為虛擬機必須檢查每個對象是否可以收集並必須訪問所有內存頁。本地緩存是最快的,但會給虛擬機帶來GC壓力,所以,當你需要處理非常多GB的數據量時可以考慮使用堆外內存來進行優化,因為這不會給Java垃圾收集器帶來任何壓力。讓JAVA GC為應用程序完成工作,緩存操作交給堆外。

spark-submit --master yarn --deploy-mode client --driver-memory 1g  --num-executors 3 --executor-cores 4 --conf  spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g  --executor-memory 5g  --class com.atguigu.sparktuning.job.OFFHeapCache spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

調節連接等待時長

在Spark作業運行過程中,Executor優先從自己本地關聯的BlockManager中獲取某份數據,如果本地BlockManager沒有的話,會通過TransferService遠程連接其他節點上Executor的BlockManager來獲取數據。

如果task在運行過程中創建大量對象或者創建的對象較大,會占用大量的內存,這回導致頻繁的垃圾回收,但是垃圾回收會導致工作現場全部停止,也就是說,垃圾回收一旦執行,Spark的Executor進程就會停止工作,無法提供相應,此時,由於沒有響應,無法建立網絡連接,會導致網絡連接超時。

在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種情況下,很有可能是Executor的BlockManager在拉取數據的時候,無法建立連接,然后超過默認的連接等待時長120s后,宣告數據拉取失敗,如果反復嘗試都拉取不到數據,可能會導致Spark作業的崩潰。這種情況也可能會導致DAGScheduler反復提交幾次stage,TaskScheduler反復提交幾次task,大大延長了我們的Spark作業的運行時間。

為了避免長時間暫停(如GC)導致的超時,可以考慮調節連接的超時時長,連接等待時長需要在spark-submit腳本中進行設置,設置方式可以在提交時指定:

--conf spark.core.connection.ack.wait.timeout=300s

調節連接等待時長后,通常可以避免部分的XX文件拉取失敗、XX文件lost等報錯。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 1g --conf spark.core.connection.ack.wait.timeout=300s --class com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

6. Spark3.0 AQE

Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution),即自適應查詢執行。AQE 是 Spark SQL 的一種動態優化機制,在運行時,每當 Shuffle Map 階段執行完畢,AQE 都會結合這個階段的統計信息,基於既定的規則動態地調整、修正尚未執行的邏輯計划和物理計划,來完成對原始查詢語句的運行時優化。

① 動態合並分區

在Spark中運行查詢處理非常大的數據時,shuffle通常會對查詢性能產生非常重要的影響。shuffle是非常昂貴的操作,因為它需要進行網絡傳輸移動數據,以便下游進行計算。

最好的分區取決於數據,但是每個查詢的階段之間的數據大小可能相差很大,這使得該數字難以調整:

(1)如果分區太少,則每個分區的數據量可能會很大,處理這些數據量非常大的分區,可能需要將數據溢寫到磁盤(例如,排序和聚合),降低了查詢。

(2)如果分區太多,則每個分區的數據量大小可能很小,讀取大量小的網絡數據塊,這也會導致I/O效率低而降低了查詢速度。擁有大量的task(一個分區一個task)也會給Spark任務計划程序帶來更多負擔。

 為了解決這個問題,我們可以在任務開始時先設置較多的shuffle分區個數,然后在運行時通過查看shuffle文件統計信息將相鄰的小分區合並成更大的分區。

例如,假設正在運行select max(i) from tbl group by j。輸入tbl很小,在分組前只有2個分區。那么任務剛初始化時,我們將分區數設置為5,如果沒有AQE,Spark將啟動五個任務來進行最終聚合,但是其中會有三個非常小的分區,為每個分區啟動單獨的任務這樣就很浪費。

 

取而代之的是,AQE將這三個小分區合並為一個,因此最終聚只需三個task而不是五個

 

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AQEPartitionTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

結合動態申請資源:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.DynamicAllocationTunning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

② 動態切換Join策略

Spark支持多種join策略,其中如果join的一張表可以很好的插入內存,那么broadcast shah join通常性能最高。因此,spark join中,如果小表小於廣播大小閥值(默認10mb),Spark將計划進行broadcast hash join。但是,很多事情都會使這種大小估計出錯(例如,存在選擇性很高的過濾器),或者join關系是一系列的運算符而不是簡單的掃描表操作。

為了解決此問題,AQE現在根據最准確的join大小運行時重新計划join策略。從下圖實例中可以看出,發現連接的右側表比左側表小的多,並且足夠小可以進行廣播,那么AQE會重新優化,將sort merge join轉換成為broadcast hash join。

 

 對於運行是的broadcast hash join,可以將shuffle優化成本地shuffle,優化掉stage 減少網絡傳輸。Broadcast hash join可以規避shuffle階段,相當於本地join。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeDynamicSwitchJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

③ 動態優化Join傾斜

當數據在群集中的分區之間分布不均勻時,就會發生數據傾斜。嚴重的傾斜會大大降低查詢性能,尤其對於join。AQE skew join優化會從隨機shuffle文件統計信息自動檢測到這種傾斜。然后它將傾斜分區拆分成較小的子分區。

 例如,下圖 A join B,A表中分區A0明細大於其他分區

 

因此,skew join 會將A0分區拆分成兩個子分區,並且對應連接B0分區

 

 沒有這種優化,會導致其中一個分區特別耗時拖慢整個stage,有了這個優化之后每個task耗時都會大致相同,從而總體上獲得更好的性能。

可以采取第4章提到的解決方式,3.0有了AQE機制就可以交給Spark自行解決。Spark3.0增加了以下參數。

1)spark.sql.adaptive.skewJoin.enabled  :是否開啟傾斜join檢測,如果開啟了,那么會將傾斜的分區數據拆成多個分區,默認是開啟的,但是得打開aqe。

2)spark.sql.adaptive.skewJoin.skewedPartitionFactor :默認值5,此參數用來判斷分區數據量是否數據傾斜,當任務中最大數據量分區對應的數據量大於的分區中位數乘以此參數,並且也大於spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes參數,那么此任務是數據傾斜。

3)spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes :默認值256mb,用於判斷是否數據傾斜

4)spark.sql.adaptive.advisoryPartitionSizeInBytes :此參數用來告訴spark進行拆分后推薦分區大小是多少。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

如果同時開啟了spark.sql.adaptive.coalescePartitions.enabled動態合並分區功能,那么會先合並分區,再去判斷傾斜,將動態合並分區打開后,重新執行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

修改中位數的倍數為2,重新執行:

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

7. Spark3.0 DPP

Spark3.0支持動態分區裁剪Dynamic Partition Pruning,簡稱DPP,核心思路就是先將join一側作為子查詢計算出來,再將其所有分區用到join另一側作為表過濾條件,從而實現對分區的動態修剪。如下圖所示

 

 將select t1.id,t2.pkey from t1 join t2 on t1.pkey =t2.pkey and t2.id<2 優化成了select t1.id,t2.pkey from t1 join t2 on t1.pkey=t2.pkey and t1.pkey in(select t2.pkey from t2 where t2.id<2)

觸發條件:

(1)待裁剪的表join的時候,join條件里必須有分區字段

(2)如果是需要修剪左表,那么join必須是inner join ,left semi join或right join,反之亦然。但如果是left out join,無論右邊有沒有這個分區,左邊的值都存在,就不需要被裁剪

(3)另一張表需要存在至少一個過濾條件,比如a join b on a.key=b.key and a.id<2

參數spark.sql.optimizer.dynamicPartitionPruning.enabled 默認開啟。

spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g  --class com.atguigu.sparktuning.dpp.DPPTest spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

8. Spark3.0 Hint增強

在spark2.4的時候就有了hint功能,不過只有broadcasthash join的hint,這次3.0又增加了sort merge join,shuffle_hash join,shuffle_replicate nested loop join。

Spark的5種Join策略:https://www.cnblogs.com/jmx-bigdata/p/14021183.html

① broadcasthast join

sparkSession.sql("select /*+ BROADCAST(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ BROADCASTJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MAPJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

② sort merge join

sparkSession.sql("select /*+ SHUFFLE_MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGEJOIN(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

sparkSession.sql("select /*+ MERGE(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

③ shuffle_hash join

sparkSession.sql("select /*+ SHUFFLE_HASH(school) */ *  from test_student student left join test_school school on student.id=school.id").show()

④ shuffle_replicate_nl join

使用條件非常苛刻,驅動表(school表)必須小,且很容易被spark執行成sort merge join。

sparkSession.sql("select /*+ SHUFFLE_REPLICATE_NL(school) */ *  from test_student student inner join test_school school on student.id=school.id").show()

9. 故障排除

① 控制reduce端緩沖大小以避免OOM

在Shuffle過程,reduce端task並不是等到map端task將其數據全部寫入磁盤后再去拉取,而是map端寫一點數據,reduce端task就會拉取一小部分數據,然后立即進行后面的聚合、算子函數的使用等操作。

reduce端task能夠拉取多少數據,由reduce拉取數據的緩沖區buffer來決定,因為拉取過來的數據都是先放在buffer中,然后再進行后續的處理,buffer的默認大小為48MB。

reduce端task會一邊拉取一邊計算,不一定每次都會拉滿48MB的數據,可能大多數時候拉取一部分數據就處理掉了。

雖然說增大reduce端緩沖區大小可以減少拉取次數,提升Shuffle性能,但是有時map端的數據量非常大,寫出的速度非常快,此時reduce端的所有task在拉取的時候,有可能全部達到自己緩沖的最大極限值,即48MB,此時,再加上reduce端執行的聚合函數的代碼,可能會創建大量的對象,這可難會導致內存溢出,即OOM。

如果一旦出現reduce端內存溢出的問題,我們可以考慮減小reduce端拉取數據緩沖區的大小,例如減少為12MB。

在實際生產環境中是出現過這種問題的,這是典型的以性能換執行的原理。reduce端拉取數據的緩沖區減小,不容易導致OOM,但是相應的,reudce端的拉取次數增加,造成更多的網絡傳輸開銷,造成性能的下降。

注意,要保證任務能夠運行,再考慮性能的優化。

② JVM GC導致的shuffle文件拉取失敗

在Spark作業中,有時會出現shuffle file not found的錯誤,這是非常常見的一個報錯,有時出現這種錯誤以后,選擇重新執行一遍,就不再報出這種錯誤。

出現上述問題可能的原因是Shuffle操作中,后面stage的task想要去上一個stage的task所在的Executor拉取數據,結果對方正在執行GC,執行GC會導致Executor內所有的工作現場全部停止,比如BlockManager、基於netty的網絡通信等,這就會導致后面的task拉取數據拉取了半天都沒有拉取到,就會報出shuffle file not found的錯誤,而第二次再次執行就不會再出現這種錯誤。

可以通過調整reduce端拉取數據重試次數和reduce端拉取數據時間間隔這兩個參數來對Shuffle性能進行調整,增大參數值,使得reduce端拉取數據的重試次數增加,並且每次失敗后等待的時間間隔加長。

val conf = new SparkConf()

  .set("spark.shuffle.io.maxRetries", "60")

  .set("spark.shuffle.io.retryWait", "60s")

③ 解決各種序列化導致的報錯

當Spark作業在運行過程中報錯,而且報錯信息中含有Serializable等類似詞匯,那么可能是序列化問題導致的報錯。

序列化問題要注意以下三點:

  • 作為RDD的元素類型的自定義類,必須是可以序列化的;
  • 算子函數里可以使用的外部的自定義變量,必須是可以序列化的;
  • 不可以在RDD的元素類型、算子函數里使用第三方的不支持序列化的類型,例如Connection。

④ 解決算子函數返回NULL導致的問題

在一些算子函數里,需要我們有一個返回值,但是在一些情況下我們不希望有返回值,此時我們如果直接返回NULL,會報錯,例如Scala.Math(NULL)異常。

如果你遇到某些情況,不希望有返回值,那么可以通過下述方式解決:

  • 返回特殊值,不返回NULL,例如“-1”;
  • 在通過算子獲取到了一個RDD之后,可以對這個RDD執行filter操作,進行數據過濾,將數值為-1的數據給過濾掉;
  • 在使用完filter算子后,繼續調用coalesce算子進行優化。

⑤ 解決YARN-CLIENT模式導致的網卡流量激增問題

YARN-client模式的運行原理如下圖所示:

 

在YARN-client模式下,Driver啟動在本地機器上,而Driver負責所有的任務調度,需要與YARN集群上的多個Executor進行頻繁的通信。

假設有100個Executor, 1000個task,那么每個Executor分配到10個task,之后,Driver要頻繁地跟Executor上運行的1000個task進行通信,通信數據非常多,並且通信品類特別高。這就導致有可能在Spark任務運行過程中,由於頻繁大量的網絡通訊,本地機器的網卡流量會激增。

注意,YARN-client模式只會在測試環境中使用,而之所以使用YARN-client模式,是由於可以看到詳細全面的log信息,通過查看log,可以鎖定程序中存在的問題,避免在生產環境下發生故障。

在生產環境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不會造成本地機器網卡流量激增問題,如果YARN-cluster模式下存在網絡通信的問題,需要運維團隊進行解決。

⑥ 解決YARN-CLUSTER模式的JVM棧內存溢出無法執行問題

YARN-cluster模式的運行原理如下圖所示:

 

當Spark作業中包含SparkSQL的內容時,可能會碰到YARN-client模式下可以運行,但是YARN-cluster模式下無法提交運行(報出OOM錯誤)的情況。

YARN-client模式下,Driver是運行在本地機器上的,Spark使用的JVM的PermGen的配置,是本地機器上的spark-class文件,JVM永久代的大小是128MB,這個是沒有問題的,但是在YARN-cluster模式下,Driver運行在YARN集群的某個節點上,使用的是沒有經過配置的默認設置,PermGen永久代大小為82MB。

SparkSQL的內部要進行很復雜的SQL的語義解析、語法樹轉換等等,非常復雜,如果sql語句本身就非常復雜,那么很有可能會導致性能的損耗和內存的占用,特別是對PermGen的占用會比較大。

所以,此時如果PermGen的占用好過了82MB,但是又小於128MB,就會出現YARN-client模式下可以運行,YARN-cluster模式下無法運行的情況。

解決上述問題的方法時增加PermGen的容量,需要在spark-submit腳本中對相關參數進行設置,設置方法如代碼清單所示。

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通過上述方法就設置了Driver永久代的大小,默認為128MB,最大256MB,這樣就可以避免上面所說的問題。

⑦ 解決SparkSQL導致的JVM棧內存溢出

當SparkSQL的sql語句有成百上千的or關鍵字時,就可能會出現Driver端的JVM棧內存溢出。

JVM棧內存溢出基本上就是由於調用的方法層級過多,產生了大量的,非常深的,超出了JVM棧深度限制的遞歸。(我們猜測SparkSQL有大量or語句的時候,在解析SQL時,例如轉換為語法樹或者進行執行計划的生成的時候,對於or的處理是遞歸,or非常多時,會發生大量的遞歸)

此時,建議將一條sql語句拆分為多條sql語句來執行,每條sql語句盡量保證100個以內的子句。根據實際的生產環境試驗,一條sql語句的or關鍵字控制在100個以內,通常不會導致JVM棧內存溢出。

⑧ 持久化與checkpoint的使用

Spark持久化在大部分情況下是沒有問題的,但是有時數據可能會丟失,如果數據一旦丟失,就需要對丟失的數據重新進行計算,計算完后再緩存和使用,為了避免數據的丟失,可以選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(比如HDFS)。

一個RDD緩存並checkpoint后,如果一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,如果有,就會使用checkpoint數據,而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的數據。

使用checkpoint的優點在於提高了Spark作業的可靠性,一旦緩存出現問題,不需要重新計算數據,缺點在於,checkpoint時需要將數據寫入HDFS等文件系統,對性能的消耗較大。

⑨ 內存泄漏排查

內存泄露是指程序中已動態分配的堆內存由於某種原因程序未釋放或無法釋放,造成系統內存的浪費,導致程序運行速度減慢,甚至系統崩潰等嚴重后果。

在Spark Streaming中往往會因為開發者代碼未正確編寫導致無法回收或釋放對象,造成Spark Streaming內存泄露越跑越慢甚至崩潰的結果。那么排查內存泄露需要一些第三方的工具。

IBM HeapAnalyzer

官網地址

https://www.ibm.com/developerworks/community/groups/service/html/communityview?communityUuid=4544bafe-c7a2-455f-9d43-eb866ea60091

 

點擊下載 內存泄露分析工具

下載下來是一個jar包

 

那么需要編寫bat批處理來運行

創建run.bat

 

編輯

title ibm-heap-analyzer

 

path=%PATH%;%C:\JAVA\jdk1.8.0_51\bin

 

E:

 

cd E:\IBM heapAnalyzer\IBM_DUMP_wjfx

 

java.exe -Xms1048M -Xmx4096M -jar ha456.jar

 

路徑需要改成自己當前路徑

點擊run.bat運行

 

運行成功

模擬內存泄露場景

內存泄露的原因往往是因為對象無法釋放或被回收造成,那么在本項目中就模擬此場景。

 

 

 

如上圖所示,在計算學員知識點正確率與掌握度代碼中,在最后提交offset提交偏移量后,循環往map里添加LearnMode對象,使每處理一批數據就往map里添加100000個LearnMode對象,使堆內存撐滿。

查找driver進程

在集群上提交spark streaming任務

ps -ef |grep com.atguigu.qzpoint.streaming.QzPointStreaming 

通過此命令查找到driver進程號

 

進程號為6860

 

 

通過Spark Ui發現該Spark Straming task任務發生長時間卡住現象,GC出現異常。疑似發生內存泄露

JMAP命令

 使用jmap -heap pid命令查看6860進程,內存使用情況。

 jmap -heap 6860    

⑩ 頻繁GC問題

1、打印GC詳情

統計一下GC啟動的頻率和GC使用的總時間,在spark-submit提交的時候設置參數

--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

如果出現了多次Full GC,首先考慮的是可能配置的Executor內存較低,這個時候需要增加Executor Memory來調節。

2、如果一個任務結束前,Full GC執行多次,說明老年代空間被占滿了,那么有可能是沒有分配足夠的內存。

  • 1.調整executor的內存,配置參數executor-memory
  • 2.調整老年代所占比例:配置-XX:NewRatio的比例值
  • 3.降低spark.memory.storageFraction減少用於緩存的空間

3、如果有太多Minor GC,但是Full GC不多,可以給Eden分配更多的內存。

  • 1.比如Eden代的內存需求量為E,可以設置Young代的內存為-Xmn=4/3*E,設置該值也會導致Survivor區域擴張
  • 2.調整Eden在年輕代所占的比例,配置-XX:SurvivorRatio的比例值

4、調整垃圾回收器,通常使用G1GC,即配置-XX:+UseG1GC。當Executor的堆空間比較大時,可以提升G1 region size(-XX:G1HeapRegionSize),在提交參數指定:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=16M -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM