前言
Catalyst是Spark SQL核心優化器,早期主要基於規則的優化器RBO,后期又引入基於代價進行優化的CBO。但是在這些版本中,Spark SQL執行計划一旦確定就不會改變。由於缺乏或者不准確的數據統計信息(如行數、不同值的數量、NULL值、最大/最小值等)和對成本的錯誤估算導致生成的初始計划不理想,從而導致執行效率相對低下。
那么就引來一個思考:我們如何能夠在運行時獲取更多的執行信息,然后根據這些信息來動態調整並選擇一個更優的執行計划呢?
Spark SQL自適應執行優化引擎(Adaptive Query Execution,簡稱AQE)應運而生,它可以根據執行過程中的中間數據優化后續執行,從而提高整體執行效率。核心在於:通過在運行時對查詢執行計划進行優化,允許Spark Planner在運行時執行可選的執行計划,這些計划將基於運行時統計數據進行優化,從而提升性能。
AQE完全基於精確的運行時統計信息進行優化,引入了一個基本的概念Query Stages,並且以Query Stage為粒度,進行運行時的優化,其工作原理如下所示:
-
由shuffle和broadcast exchange把查詢執行計划分為多個query stage,query stage執行完成時獲取中間結果
- query stage邊界是運行時優化的最佳時機(天然的執行間歇;分區、數據大小等統計信息已經產生)
整個AQE的工作原理以及流程為:
- 運行沒有依賴的stage
- 在一個stage完成時再依據新的統計信息優化剩余部分
- 執行其他已經滿足依賴的stage
- 重復步驟(2)(3)直至所有stage執行完成
Spark從2.3版本,就開始"試驗"SparkSQL自適應查詢執行功能(Adaptive Query Execution),並在Spark3.0正式發布。

自適應查詢執行框架(AQE)
自適應查詢執行最重要的問題之一是何時進行重新優化。Spark算子通常是pipeline化的,並以並行的方式執行。然而shuffle或broadcast exchange會打破這個pipeline。我們稱它們為物化點,並使用術語"查詢階段"來表示查詢中由這些物化點限定的子部分。每個查詢階段都會物化它的中間結果,只有當運行物化的所有並行進程都完成時,才能繼續執行下一個階段。這為重新優化提供了一個絕佳的機會,因為此時所有分區上的數據統計都是可用的,並且后續操作還沒有開始。

當查詢開始時,自適應查詢執行框架首先啟動所有葉子階段(leaf stages)—— 這些階段不依賴於任何其他階段。一旦其中一個或多個階段完成物化,框架便會在物理查詢計划中將它們標記為完成,並相應地更新邏輯查詢計划,同時從完成的階段檢索運行時統計信息。
基於這些新的統計信息,框架將運行優化程序、物理計划程序以及物理優化規則,其中包括常規物理規則(regular physical rules)和自適應執行特定的規則,如coalescing partitions(合並分區)、skew join handling(join數據傾斜處理)等。現在我們有了一個新優化的查詢計划,其中包含一些已完成的階段,自適應執行框架將搜索並執行子階段已全部物化的新查詢階段,並重復上面的execute-reoptimize-execute過程,直到完成整個查詢。
在Spark 3.0中,AQE框架帶來了以下三個特性:
- Dynamically coalescing shuffle partitions(動態合並shuffle的分區)可以簡化甚至避免調整shuffle分區的數量。用戶可以在開始時設置相對較多的shuffle分區數,AQE會在運行時將相鄰的小分區合並為較大的分區
- Dynamically switching join strategies(動態調整join策略)在一定程度上避免由於缺少統計信息或着錯誤估計大小(當然也可能兩種情況同時存在),而導致執行次優計划的情況。這種自適應優化可以在運行時sort merge join轉換成broadcast hash join,從而進一步提升性能
- Dynamically optimizing skew joins(動態優化數據傾斜的join)skew joins可能導致負載的極端不平衡,並嚴重降低性能。在AQE從shuffle文件統計信息中檢測到任何傾斜后,它可以將傾斜的分區分割成更小的分區,並將它們與另一側的相應分區連接起來。這種優化可以並行化傾斜處理,獲得更好的整體性能。
下面我們來詳細介紹這三個特性。
動態合並shuffle的分區
當在Spark中運行查詢來處理非常大的數據時,shuffle通常對查詢性能有非常重要的影響。shuffle是一個昂貴的操作,因為它需要在網絡中移動數據,以便數據按照下游操作所要求的方式重新分布。
分區的數量是shuffle的一個關鍵屬性。分區的最佳數量取決於數據,但是數據大小可能在不同的階段、不同的查詢之間有很大的差異,這使得這個分區數很難調優:
- 如果分區數太少,那么每個分區處理的數據可能非常大,處理這些大分區的任務可能需要將數據溢寫到磁盤(例如,涉及排序或聚合的操作),從而減慢查詢速度
- 如果分區數太多,那么每個分區處理的數據可能非常小,並且將有大量的網絡數據獲取來讀取shuffle塊,這也會由於低效的I/O模式而減慢查詢速度。大量的task也會給Spark任務調度程序帶來更多的負擔
為了解決這個問題,我們可以在開始時設置相對較多的shuffle分區數,然后在運行時通過查看shuffle文件統計信息將相鄰的小分區合並為較大的分區。
假設我們運行如下SQL:
SELECT max(i)FROM tbl GROUP BY j
tbl表的輸入數據相當小,所以在分組之前只有兩個分區。我們把初始的shuffle分區數設置為5,因此在shuffle的時候數據被打亂到5個分區中。如果沒有AQE,Spark將啟動5個task來完成最后的聚合。然而,這里有三個非常小的分區,為每個分區啟動一個單獨的task將是一種浪費。

使用AQE之后,Spark將這三個小分區合並為一個,因此,最終的聚合只需要執行3個task,而不是5個task。
動態調整join策略
Spark支持多種join策略(如broadcast hash join、shuffle hash join、sort merge join),通常broadcast hash join是性能最好的,前提是參與join的一張表的數據能夠裝入內存。由於這個原因,當Spark估計參與join的表數據量小於廣播大小的閾值時,它會將join策略調整為broadcast hash join。但是,很多情況都可能導致這種大小估計出錯——例如存在一個非常有選擇性的過濾器。
為了解決這個問題,AQE現在根據最精確的連接關系大小在運行時重新規划join策略。在下面的示例中可以看到join的右側比估計值小得多,並且小到足以進行廣播,因此在AQE重新優化之后,靜態計划的sort merge join會被轉換為broadcast hash join。
對於在運行時轉換的broadcast hash join,我們可以進一步將常規的shuffle優化為本地化shuffle來減少網絡流量。
動態優化數據傾斜的join
當數據在集群中的分區之間分布不均時,就會發生數據傾斜。嚴重的傾斜會顯著降低查詢性能,特別是在進行join操作時。AQE傾斜join優化從shuffle文件統計信息中自動檢測到這種傾斜。然后,它將傾斜的分區分割成更小的子分區,這些子分區將分別從另一端連接到相應的分區。
假設表A join 表B,其中表A的分區A0里面的數據明顯大於其他分區。

skew join optimization將把分區A0分成兩個子分區,並將每個子分區join表B的相應分區B0。

如果沒有這個優化,將有四個任務運行sort merge join,其中一個任務將花費非常長的時間。在此優化之后,將有5個任務運行join,但每個任務將花費大致相同的時間,從而獲得總體更好的性能。
AQE查詢計划
AQE查詢計划的一個主要區別是,它通常隨着執行的進展而演變。引入了幾個AQE特定的計划節點,以提供有關執行的更多詳細信息。
此外,AQE使用了一種新的查詢計划字符串格式,可以顯示初始和最終的查詢執行計划。
|| AdaptiveSparkPlan節點
應用了AQE的查詢通常有一個或多個AdaptiveSparkPlan節點作為每個查詢或子查詢的root節點。在執行之前或期間,isFinalPlan標志將顯示為false。查詢完成后,此標志將變為true,並且AdaptiveSparkPlan節點下的計划將不再變化。


|| CustomShuffleReader節點
CustomShuffleReader節點是AQE優化的關鍵。它可以根據在shuffle map stage收集的統計信息動態調整shuffle后的分區數。在Spark UI中,用戶可以將鼠標懸停在該節點上,以查看它應用於無序分區的優化。
當CustomShuffleReader的標志為coalesced時,表示AQE已根據目標分區大小在shuffle后檢測並合並了小分區。此節點的詳細信息顯示合並后的無序分區數和分區大小。


當CustomShuffleReader的標志為"skewed"時,這意味着AQE在排序合並連接操作之前檢測到一個或多個分區中的數據傾斜。此節點的詳細信息顯示了傾斜分區的數量以及從傾斜分區拆分的新分區的總數。


coalesced和skewed也可以同時發生:


|| 檢測join策略改變
通過比較AQE優化前后查詢計划join節點的變化,可以識別join策略的變化。在dbr7.3中,AQE查詢計划字符串將包括初始計划(應用任何AQE優化之前的計划)和當前或最終計划。這樣可以更好地了解應用於查詢的優化AQE。


Spark UI將只顯示當前計划。為了查看使用Spark UI的效果,用戶可以比較查詢執行之前和執行完成后的計划圖:


|| 檢測傾斜join
傾斜連接優化的效果可以通過連接節點名來識別。
在Spark UI中:

在查詢計划字符串中:

AQE的TPC-DS表現
在我們使用TPC-DS數據和查詢的實驗中,自適應查詢執行的查詢性能提高了8倍,32個查詢的性能提高了1.1倍以上。下面是通過AQE獲得的10個TPC-DS查詢性能提高最多的圖表。

這些改進大部分來自動態分區合並和動態join策略調整,因為隨機生成的TPC-DS數據沒有傾斜。在實際生產中,AQE 帶來了更大的性能提升。
啟用AQE
可以通過設置參數spark.sql.adaptive為true來啟用AQE(在Spark3.0中默認為false)。
如果查詢滿足以下條件建議啟用:
- 不是一個流查詢
- 至少包含一個exchange(通常在有join、聚合或窗口操作時)或是一個子查詢
通過減少查詢優化對靜態統計的依賴,AQE解決了Spark基於成本優化的最大難題之一:統計信息收集開銷和估計精度之間的平衡。
為了獲得最佳的估計精度和規划結果,通常需要維護詳細的、最新的統計信息,其中一些統計信息的收集成本很高,比如列直方圖,它可用於提高選擇性和基數估計或檢測數據傾斜。AQE在很大程度上消除了對此類統計數據的需要,以及手動調優工作的需要。
除此之外,AQE還使SQL查詢優化對於任意udf和不可預測的數據集更改(例如數據大小的突然增加或減少、頻繁的和隨機的數據傾斜等)更有彈性。不再需要提前"知道"您的數據。隨着查詢的運行,AQE將計算出數據並改進查詢計划,提高查詢性能以獲得更快的分析和系統性能。
本文主要參譯自:
1.https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
2.https://databricks.com/blog/2020/10/21/faster-sql-adaptive-query-execution-in-databricks.html
關於Spark3.0更多特性,感興趣的同學建議去Spark官網和Databricks官方博客學習。
關於AQE也可以參考:
https://my.oschina.net/hblt147/blog/3006406
https://www.cnblogs.com/zz-ksw/p/11254294.html
https://issues.apache.org/jira/browse/SPARK-23128
推薦文章:
Apache Spark 3.0.0重磅發布 —— 重要特性全面解析
Spark在處理數據的時候,會將數據都加載到內存再做處理嗎?
SparkSQL中產生笛卡爾積的幾種典型場景以及處理策略
Spark SQL如何選擇join策略
Spark實現推薦系統中的相似度算法
Spark閉包 | driver & executor程序代碼執行
Spark SQL | 目前Spark社區最活躍的組件之一
關注大數據學習與分享,獲取更多技術干貨
