首先看個Not in Subquery的SQL:
// test_partition1 和 test_partition2為Hive外部分區表 select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
對應的完整的邏輯計划和物理計划為:
== Parsed Logical Plan == 'Project [*] +- 'Filter NOT 't1.id IN (list#3 []) : +- 'Project ['id] : +- 'UnresolvedRelation `test_partition2` +- 'SubqueryAlias `t1` +- 'UnresolvedRelation `test_partition1` == Analyzed Logical Plan == id: string, name: string, dt: string Project [id#4, name#5, dt#6] +- Filter NOT id#4 IN (list#3 []) : +- Project [id#7] : +- SubqueryAlias `default`.`test_partition2` : +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9] +- SubqueryAlias `t1` +- SubqueryAlias `default`.`test_partition1` +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] == Optimized Logical Plan == Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))) :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] +- Project [id#7] +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9] == Physical Plan == BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7))) :- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] +- BroadcastExchange IdentityBroadcastMode +- Scan hive default.test_partition2 [id#7], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
通過上述邏輯計划和物理計划可以看出,Spark SQL在對not in subquery處理,從邏輯計划轉換為物理計划時,會最終選擇BroadcastNestedLoopJoin(對應到Spark源碼中BroadcastNestedLoopJoinExec.scala)策略。
提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到應用,比如mysql。它的工作方式是循環從一張表(outer table)中讀取數據,然后訪問另一張表(inner table,通常有索引),將outer表中的每一條數據與inner表中的數據進行join,類似一個嵌套的循環並且在循環的過程中進行數據的比對校驗是否滿足一定條件。
對於被連接的數據集較小的情況下,Nested Loop Join是個較好的選擇。但是當數據集非常大時,從它的執行原理可知,效率會很低甚至可能影響整個服務的穩定性。
而Spark SQL中的BroadcastNestedLoopJoin就類似於Nested Loop Join,只不過加上了廣播表(build table)而已。
BroadcastNestedLoopJoin是一個低效的物理執行計划,內部實現將子查詢(select id from test_partition2)進行廣播,然后test_partition1每一條記錄通過loop遍歷廣播的數據去匹配是否滿足一定條件。
private def leftExistenceJoin( // 廣播的數據 relation: Broadcast[Array[InternalRow]], exists: Boolean): RDD[InternalRow] = { assert(buildSide == BuildRight) /* streamed對應物理計划中: Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] */ streamed.execute().mapPartitionsInternal { streamedIter => val buildRows = relation.value val joinedRow = new JoinedRow // 條件是否定義。此處為Some(((id#4 = id#7) || isnull((id#4 = id#7)))) if (condition.isDefined) { streamedIter.filter(l => // exists主要是為了根據joinType來進一步條件判斷數據的返回與否,此處joinType為LeftAnti buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists ) // else } else if (buildRows.nonEmpty == exists) { streamedIter } else { Iterator.empty } } }
由於BroadcastNestedLoopJoin的低效率執行,可能導致長時間占用executor資源,影響集群性能。同時,因為子查詢的結果集要進行廣播,如果數據量特別大,對driver端也是一個嚴峻的考驗,極有可能帶來OOM的風險。因此,在實際生產中,要盡可能利用其他效率相對高的SQL來避免使用Not in Subquery。
雖然通過改寫Not in Subquery的SQL,進行低效率的SQL到高效率的SQL過渡,能夠避免上面所說的問題。但是這往往建立在我們發現任務執行慢甚至失敗,然后排查任務中的SQL,發現"問題"SQL的前提下。那么如何在任務執行前,就"檢查"出這樣的SQL,從而進行提前預警呢?
這里筆者給出一個思路,就是解析Spark SQL計划,根據Spark SQL的join策略匹配條件等,來判斷任務中是否使用了低效的Not in Subquery進行預警,然后通知業務方進行修改。同時,我們在實際完成數據的ETL處理等分析時,也要事前避免類似的低性能SQL。
關聯文章:
Spark SQL如何選擇join策略
關注微信公眾號:大數據學習與分享,獲取更對技術干貨