之前已經對spark core做了較為深入的解讀,在如今SQL大行其道的背景下,spark中的SQL不僅在離線batch處理中使用廣泛,structured streamming的實現也嚴重依賴spark SQL。因此,接下來,會對spark SQL做一個較為深入的了解。
本文首先介紹一下spark sql的整體流程,然后對這個流程之中涉及到的第一個步驟:SQL語法解析部分做一下較為深入的分析。
1,spark sql概述
首先截取一張任何介紹spark sql實現都會出現的圖(如下)。
總體執行流程如下:從提供的輸入API(SQL,Dataset, dataframe)開始,依次經過unresolved邏輯計划,解析的邏輯計划,優化的邏輯計划,物理計划,然后根據cost based優化,選取一條物理計划進行執行。從unresolved logical plan開始, sql的查詢是通過抽象語法樹(AST)來表示的,所以以后各個操作都是對AST進行的等價轉換操作。 針對以上過程作如下幾點說明:
1,編程接口:通過像df.groupBy("age")這樣的Dataset接口構造查詢過程,抽象語法樹(AST)會自動建立。而通過“SELECT name, count(age) FROM people where age > 21 group by name” 這樣的sql語句進行查詢時,需要增加一個步驟是,需要將SQL解析成AST(spark 2.2中目前是借助於antlr4來做的,具體見后面分析)。
2,經過步驟1后,我們可以得到unresolved logical plan,此時像以上sql中的name,count(age),people都是unresolved attribute,relation等,他們是AST樹TreeNode的一中類型,但是他們是不能被計算的(實現了Unevaluable接口)。
3,unresolved logical plan通過Analyzer模塊定義的一系列規則,將步驟2中的unresolved的attribute,relation借助catalog去解析,如將之前提到的unresolved attribute轉換成resolved attribute。此時,如果sql中某個表不存在或者列和表不對應,在此階段便可以發現。Analyzer定義一系列部分規則如下:
4,解析成resolved logical plan以后,通過一系列優化規則會將resolved logical plan的AST轉化成optimized logical plan的AST。這些優化包括基於規則和代價的優化,比如謂詞下推,列值裁剪等。
5,AST到了optimized logical plan以后,利用如下的策略將邏輯計划轉化成物理計划,物理計划是可以執行的計划。當有相關的action操作時,物理計划便可以執行。
2,SQL Parser的具體實現
在上節步驟1中提到,如果使用選擇使用SQL進行查詢,首先需要將SQL解析成spark中的抽象語法樹(AST)。在spark中是借助開源的antlr4庫來解析的。Spark SQL的語法規則文件是:SqlBase.g4。該文件以及生成的相關文件截圖如下。
在生成的文件中SqlBaseBaseListener和SqlBaseBaseVistor分別代表兩種遍歷AST的方法,在spark中主要用了visitor模式。
接下來,將看一下spark中,當使用spark.sql("select *** from ...")時,sql怎么解析成spark內部的AST的?
1,用戶調用的spark.sql的入口是sparkSession中sql函數,該函數最終返回DataFrame(DataSet[Row]),sql的解析的過程主要是在
sessionState.sqlParser.parsePlan(sqlText)中發生的。
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
2,調用到parsePlan,將調用parse函數,傳入的兩個參數分為:sql語句,sqlBaseParse到LogicalPlan的一個函數。
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
3,在parse函數中,首先構造SqlBaseLexer詞法分析器,接着構造Token流,最終SqlBaseParser對象,然后一次嘗試用不同的模式去進行解析。最終將執行parsePlan中傳入的函數。
4,在步驟2中,astBuilder是SparkSqlAstBuilder的實例,在將Antlr中的匹配樹轉換成unresolved logical plan中,它起着橋梁作用。
astBuilder.visitSingleStatement使用visitor模式,開始匹配SqlBase.g4中sql的入口匹配規則:
singleStatement
: statement EOF
;
遞歸的遍歷statement,以及其后的各個節點。在匹配過程中,碰到葉子節點,就將構造Logical Plan中對應的TreeNode。如當匹配到
singleTableIdentifier
: tableIdentifier EOF
;
規則時(單表的標識符)。即調用的函數如下:
override def visitSingleTableIdentifier(
ctx: SingleTableIdentifierContext): TableIdentifier = withOrigin(ctx) {
visitTableIdentifier(ctx.tableIdentifier)
}
可以看到將遞歸遍歷對應的tableIdentifier,tableIdentifier的定義和遍歷規則如下:
tableIdentifier
: (db=identifier '.')? table=identifier
;
override def visitTableIdentifier(
ctx: TableIdentifierContext): TableIdentifier = withOrigin(ctx) {
TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))
}
可以看到當匹配到tableIdentifier,將直接生成TableIdentifier對象,而該對象是TreeNode的一種。經過類似以上的過程,匹配結束后整個spark內部的抽象語法樹也就建立起來了。
3,小結
本文主要介紹spark catalyst的總體執行情況,以及sql parse的具體實現細節。接下來,計划還將對Analyzer,Optimization,以及執行的過程做更深入的分析。