Spark SQL catalyst概述和SQL Parser的具體實現


 

之前已經對spark core做了較為深入的解讀,在如今SQL大行其道的背景下,spark中的SQL不僅在離線batch處理中使用廣泛,structured streamming的實現也嚴重依賴spark SQL。因此,接下來,會對spark SQL做一個較為深入的了解。

本文首先介紹一下spark sql的整體流程,然后對這個流程之中涉及到的第一個步驟:SQL語法解析部分做一下較為深入的分析。

 

1,spark sql概述

首先截取一張任何介紹spark sql實現都會出現的圖(如下)。

總體執行流程如下:從提供的輸入API(SQL,Dataset, dataframe)開始,依次經過unresolved邏輯計划,解析的邏輯計划,優化的邏輯計划,物理計划,然后根據cost based優化,選取一條物理計划進行執行。從unresolved logical plan開始, sql的查詢是通過抽象語法樹(AST)來表示的,所以以后各個操作都是對AST進行的等價轉換操作。 針對以上過程作如下幾點說明:

1,編程接口:通過像df.groupBy("age")這樣的Dataset接口構造查詢過程,抽象語法樹(AST)會自動建立。而通過“SELECT name, count(age) FROM people where age > 21 group by name” 這樣的sql語句進行查詢時,需要增加一個步驟是,需要將SQL解析成AST(spark 2.2中目前是借助於antlr4來做的,具體見后面分析)。

2,經過步驟1后,我們可以得到unresolved logical plan,此時像以上sql中的name,count(age),people都是unresolved attribute,relation等,他們是AST樹TreeNode的一中類型,但是他們是不能被計算的(實現了Unevaluable接口)。

3,unresolved logical plan通過Analyzer模塊定義的一系列規則,將步驟2中的unresolved的attribute,relation借助catalog去解析,如將之前提到的unresolved attribute轉換成resolved attribute。此時,如果sql中某個表不存在或者列和表不對應,在此階段便可以發現。Analyzer定義一系列部分規則如下:

4,解析成resolved logical plan以后,通過一系列優化規則會將resolved logical plan的AST轉化成optimized logical plan的AST。這些優化包括基於規則和代價的優化,比如謂詞下推,列值裁剪等。

5,AST到了optimized logical plan以后,利用如下的策略將邏輯計划轉化成物理計划,物理計划是可以執行的計划。當有相關的action操作時,物理計划便可以執行。

2,SQL Parser的具體實現

在上節步驟1中提到,如果使用選擇使用SQL進行查詢,首先需要將SQL解析成spark中的抽象語法樹(AST)。在spark中是借助開源的antlr4庫來解析的。Spark SQL的語法規則文件是:SqlBase.g4。該文件以及生成的相關文件截圖如下。

在生成的文件中SqlBaseBaseListener和SqlBaseBaseVistor分別代表兩種遍歷AST的方法,在spark中主要用了visitor模式。

接下來,將看一下spark中,當使用spark.sql("select *** from ...")時,sql怎么解析成spark內部的AST的?

1,用戶調用的spark.sql的入口是sparkSession中sql函數,該函數最終返回DataFrame(DataSet[Row]),sql的解析的過程主要是在

sessionState.sqlParser.parsePlan(sqlText)中發生的。
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
2,調用到parsePlan,將調用parse函數,傳入的兩個參數分為:sql語句,sqlBaseParse到LogicalPlan的一個函數。
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}

3,在parse函數中,首先構造SqlBaseLexer詞法分析器,接着構造Token流,最終SqlBaseParser對象,然后一次嘗試用不同的模式去進行解析。最終將執行parsePlan中傳入的函數。

4,在步驟2中,astBuilder是SparkSqlAstBuilder的實例,在將Antlr中的匹配樹轉換成unresolved logical plan中,它起着橋梁作用。

astBuilder.visitSingleStatement使用visitor模式,開始匹配SqlBase.g4中sql的入口匹配規則:
singleStatement
: statement EOF
;

遞歸的遍歷statement,以及其后的各個節點。在匹配過程中,碰到葉子節點,就將構造Logical Plan中對應的TreeNode。如當匹配到

singleTableIdentifier
: tableIdentifier EOF
;

規則時(單表的標識符)。即調用的函數如下:

override def visitSingleTableIdentifier(
ctx: SingleTableIdentifierContext): TableIdentifier = withOrigin(ctx) {
visitTableIdentifier(ctx.tableIdentifier)
}

可以看到將遞歸遍歷對應的tableIdentifier,tableIdentifier的定義和遍歷規則如下:

tableIdentifier
: (db=identifier '.')? table=identifier
;
override def visitTableIdentifier(
ctx: TableIdentifierContext): TableIdentifier = withOrigin(ctx) {
TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
}

可以看到當匹配到tableIdentifier,將直接生成TableIdentifier對象,而該對象是TreeNode的一種。經過類似以上的過程,匹配結束后整個spark內部的抽象語法樹也就建立起來了。

3,小結

本文主要介紹spark catalyst的總體執行情況,以及sql parse的具體實現細節。接下來,計划還將對Analyzer,Optimization,以及執行的過程做更深入的分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM