靜態分區裁剪(Static Partition Pruning)
用過 Spark 的同學都知道,Spark SQL 在查詢的時候支持分區裁剪,比如我們如果有以下的查詢:
SELECT * FROM Sales_iteblog WHERE day_of_week = 'Mon'
Spark 會自動進行以下的優化:
從上圖可以看到,Spark 在編譯 SQL 的時候自動將 Filter 算子下推到數據源,也就是在 Scan 前進行了 Filter 操作,將 day_of_week = 'Mon' 的數據全部拿出來,其他數據不需要的拿出,這樣 Spark SQL 中處理的數據就變少了,整個 SQL 的查詢數據就會變快,這一切都是編譯的時候(compile time)進行的,所以這個叫做靜態分區裁剪(Static Partition Pruning)。
上面的 SQL 查詢在 Spark 進行了算子下推,已經能夠滿足我們的查詢性能的提升。但是現實世界數據的查詢可不會都這么簡單,我們來看看下面的查詢語句:
從上圖可以看出,查詢引擎直接忽略了 Date.day_of_week = 'Mon' 這個過濾條件,上來就是兩表 join,然后 join 的結果再進行過濾,這個可能導致很多無效的計算,如果 Date.day_of_week = 'Mon' 可以過濾掉大量的無用數據,肯定可以提升查詢性能。在 Spark SQL 里面能夠很好的處理這種情況,它會把 Date.day_of_week = 'Mon' 過濾條件下推到 Date 表的 Scan 之前進行:
動態分區裁剪(Dynamic Partition Pruning)
上面 Spark SQL 進行的算子下推是不是不能再提升查詢性能呢?有沒有一種更好的方法進一步過濾掉一些無用的數據?這就是本文要介紹的動態分區裁剪。動態分區裁剪這個功能是 Spark 3.0 引入的,詳見 SPARK-11150、SPARK-28888。
什么是動態分區裁剪?所謂的動態分區裁剪就是基於運行時(run time)推斷出來的信息來進一步進行分區裁剪。舉個例子,我們有如下的查詢:
SELECT * FROM dim_iteblog JOIN fact_iteblog ON (dim_iteblog.partcol = fact_iteblog.partcol) WHERE dim_iteblog.othercol > 10
動態查詢的執行計划如下:
從上面可以看出,擁有動態分區裁剪,Spark 能夠在運行的時候先對 fact_iteblog 表的 partcol 進行了一次過濾,然后再和 dim_iteblog 表進行 Join,可想而知這個性能一般都會有提升的,特別是在 fact_iteblog 表有很多無用的數據時性能提升會非常大的。
Databricks 公司在10台配置為 i3.xlarge 的集群上進行 TPC-DS 測試,得到的結論是在 102 查詢中相比 Spark 2.4 有 60 個查詢的查詢性能提升了 2 - 18 倍的提升。在 Query 98 的查詢中,性能提升了 100 倍!
相關配置
要啟用動態分區裁剪需要將 spark.sql.optimizer.dynamicPartitionPruning.enabled
參數設置為 true(默認),其他相關參數:
spark.sql.optimizer.dynamicPartitionPruning.useStats
:true(默認),When true, distinct count statistics will be used for computing the data size of the partitioned table after dynamic partition pruning, in order to evaluate if it is worth adding an extra subquery as the pruning filter if broadcast reuse is not applicable.spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio
:0.5,When statistics are not available or configured not to be used, this config will be used as the fallback filter ratio for computing the data size of the partitioned table after dynamic partition pruning, in order to evaluate if it is worth adding an extra subquery as the pruning filter if broadcast reuse is not applicable.spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast
:默認為 true,When true, dynamic partition pruning will seek to reuse the broadcast results from a broadcast hash join operation.
轉自過往記憶大數據 https://www.iteblog.com