Spark 3.0 新特性 之 自適應查詢與分區動態裁剪


Spark憋了一年半的大招后,發布了3.0版本,新特性主要與Spark SQL和Python相關。這也恰恰說明了大數據方向的兩大核心:BI與AI。下面是本次發布的主要特性,包括性能、API、生態升級、數據源、SQL兼容、監控和調試等方面的升級。

本次主要整理了性能方面的優化,包括了自適應查詢與動態分區裁剪。

1 自適應查詢

AQE,Adaptive Query Execution,說的簡單點就是讓Spark在運行中根據搜集到的信息靈活采取優化手段,提升性能。

說起這個可以先回想下Spark的發展歷史,在1.x時代Spark通過RDD的編程形成DAG圖,這個階段可以說沒啥優化完全是按照規則來執行;在2.x時代,引入了代價計算,Spark會通過提前進行代價計算,選擇代價最小的查詢計划(跟大部分的數據庫類似,代價計算依賴於數據本身的統計,如數據量、文件大小、分區數等,由於Spark是存儲與計算分離的模式,因此這些統計信息有時候會缺失或者不准確,那么得到的查詢代價自然也就不准確了);在3.x時代,引入自適應查詢,即在運行的過程中可以根據得到的緩存數據信息動態調整分區策略、join策略等。這樣就保證了剛開始表的統計信息不准,可能查詢計划不是最高效的,但是隨着查詢的執行,可以動態優化整個查詢計划。

那么到底自適應都可以做什么呢?

1.1 動態分區合並

在Spark的經典優化策略里,調整分區數從而改變並行度是最基本的優化手段,可以調整的分區數卻不是那么容易找到最優值的。分區數太小,可能導致單個分區內的數據太多,單個任務的執行效率低下;分區數太大,可能導致碎片太多,任務之間來回切換浪費性能。比如經典的shuffle操作后,每個shuffle數據都需要對應的reduce端接收處理,如果分區數過多,有可能導致某幾個任務讀取的數據量很小,造成資源的浪費。

引入AQE后,Spark會自動把數據量很小的分區進行合並處理:

1.2 動態join策略選擇

在Spark中支持多種join策略,這些策略在不同的分布式框架中差不多。分別是:

  • Broadcast Hash Join(BHJ),廣播 join
  • Shuffle Hash Join(SHJ),哈希 join
  • Sort Merge Join(SMJ),排序 join

BHJ是當小表與大表關聯時,把小表廣播到大表的每個分區中,每個分區都與完整的小表進行關聯,最后合並得到結果。像Spark會配置一個參數 spark.sql.autoBroadcastJoinThreshold 來決定小於這個配置的表就認為是小表,然后采用廣播策略(默認10MB)。一般廣播的套路是把小表拷貝到driver端,然后分發到每個executor工作節點上,因此如果表的數據太大,會導致來回復制的數據太多,性能低下,因此BHJ僅適用於廣播小表。

SHJ是針對表的數據量過大時,按照分區列進行打散,兩張表按照不同的分區重新排列數據。不過這種JOIN方法也有個弊端,就是需要對應分區的兩張表數據都同時加載完成,才能開始計算。如果兩張表的數據量都很大,有可能會造成分區節點內存溢出。

SMJ是針對上述的情況,在確定shuffle分區后對數據進行排序,這樣兩張表可以不需要等待數據全部加載到內存,只要對應的排序數據部分加載完成后就可以提前開始。

總結完三種join策略后,可以發現假設由於數據統計信息的缺失或不准確,或者是過濾條件的影響,可能會按照原來表的大小判斷join的策略。比如某個表初始的時候15M,達不到廣播join的要求,但是該表在查詢過程中有個filter條件可以讓表僅保留8M的有效數據,此時就可以采用廣播join了。AQE就是利用這種特性,在運行時動態檢測表的大小,當表的大小達到要求后會優化join為廣播join。

1.3 數據傾斜優化

在分布式查詢中某個查詢任務會同時分拆成多個任務運行在不同的機器上,假設某個任務對應的數據量很大,就會引發數據傾斜的問題。比如下面的兩張表關聯,但是左表的第一個分區數據量很多,就會引發數據傾斜問題.

AQE可以在運行時檢測到數據傾斜,並把大分區分割成多個小分區同時與對應的右表進行關聯。

2 動態分區裁剪

這個比較好理解,正常Spark或Hive在查詢時,會根據查詢條件與分區字段自動過濾底層的數據文件。但是如果過濾條件沒有及時的反映到查詢上,就會導致數據被冗余加載。比如左邊的是沒有動態分區裁剪的情況,兩張表進行關聯操作,左表包含一個過濾條件,右表需要全表讀取。經過動態分區優化后,右表可以直接添加過濾條件,如 id in (select id from lefttable where filter_cond) , 這樣可以提前過濾掉部分數據。

3 關聯提示

之前在Flink中看到過這種用法,即在sql中使用某種代碼提示,讓編譯器根據代碼提示選擇優化策略執行。語法如:/** xxx /。比如 select /* BROADCAST(a) */ * from a join b on a.id = b.id,可以強制a表廣播與b表進行關聯操作。

以上就是主要的性能方面的優化。其他方面由於工作內容涉及的不多,因此就先不過多整理了,感興趣可以去官網或者觀看上面的分享視頻。需要額外一提的是,官方文檔也有兩個很重要的調整:

1 增加了SQL相關的文檔

2 增加了UI方面的說明

后續會分享更多Spark相關的原理和特性文章。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM