spark-sql 架構
圖1
圖1是sparksql的執行架構,主要包括邏輯計划和物理計划幾個階段,下面對流程詳細分析。
sql執行流程
總體流程
- parser;基於antlr框架對 sql解析,生成抽象語法樹
- 變量替換,通過正則表達式找出符合規則的字符串,替換成系統緩存環境的變量
SQLConf中的`spark.sql.variable.substitute`,默認是可用的;參考` SparkSqlParser`
- parser;將antlr的tree轉成spark catalyst的LogicPlan也就是unresolve logical plan;詳細參考`AstBuild`, `ParseDriver`
- analyzer;通過分析器,結合catalog,把logical plan和實際的數據綁定起來,將unresolve logical plan生成 logical plan;詳細參考`QureyExecution`
- 緩存替換,通過CacheManager,替換有相同結果的logical plan
- logical plan優化,基於規則的優化;優化規則參考Optimizer,優化執行器RuleExecutor
- 生成spark plan,也就是物理計划;參考`QueryPlanner`和`SparkStrategies`
- spark plan准備階段
- 構造RDD執行,涉及spark的wholeStageCodegenExec機制,基於janino框架生成java代碼並編譯
其中`SessionState`類中維護了所有參與sql執行流程的實例對象,`QueryExecution`類則是實際處理SQL執行邏輯的類。需要注意的是,除了第1步,第2步和第3步是立即執行的,這是由於需要判斷sql的合法性以及當前catalog環境下是否存在sql中的庫表結構等,其他步驟都是在觸發spark action的時候才被執行,也就是lazy加載。下面對整個流程的細節進行分析。
詳細分析
變量替換
spark-sql通過正則匹配,將sql中的系統變量,環境變量等配置替換成真正的value,目前支持替換spark的配置和hive的配置
例如:
session.conf.set("spark.sql.test.key","1")
session.sql("select * from test where 1 = ${sparkconf:spark.sql.test.key}")
抽象語法樹AST
先上一下wiki的解釋,AST是源代碼語法結構的一種抽象表示。它以樹狀的形式表現編程語言的語法結構,樹上的每個節點都表示源代碼中的一種結構。說的可能有點抽象,翻譯出來就是說把一個語言表達式的語法結構轉換成樹形結構,那這顆數就是抽象語法樹。
舉個例子,`1*2+3`這個表達式轉成AST,如圖2。
圖2
SQL作為一種獨立的語言,有自己的表達式,所以用AST作為對其語法進行分析是很靈活的。這里Spark選用的是anltr作為AST的構建框架,而不是hive用的calcite框架,antlr相比calcite更輕量,只涉及sql語法解析,這也便於spark自己在后續步驟做自己的sql執行定制化優化。
unresolve logical plan
spark通過visit antlr框架生成的AST,轉換成unresolve LogicPlan,LogicPlan其實是spark定義的AST
分析器
spark所有的規則優化都是基於模式匹配來完成的。分析器這個步驟的主要工作是,基於catalog,完成對logical plan的resolve化。
是否resolved來源兩個指標,1. 子節點是否resolved;2. 輸入的數據類型是否滿足要求,比如要求輸入int類型,實際輸入的string類型,那么就不滿足要求。參考類`Expression`,`Analyzer`。
logical plan
常見的優化規則,下面列舉部分:
移除group下的常量,對應` RemoveLiteralFromGroupExpressions`
移除重復的group表達式,對應` RemoveRepetitionFromGroupExpressions`
謂語下推,在進行其他操作之前,先進行Filter操作。當然這有很多條件限制,比如子查詢中沒有和父查詢相同的條件字段,如果有那么下推會造成沖突
裁剪Filter操作,如果操作總是為True,那么移除,如果操作總是為False,那么用空替換
spark plan
結合LogicPlan和Strategy,將AST轉換成實際執行的算子,參考`SparkPlanner`,內置了幾個strategies。生成SparkPlan后,繼續采用規則匹配的方式優化,其中就包括了著名的wholeStageCodegenExec機制,這個機制默認是開啟的,`spark.sql.codegen.wholeStage`。