歡迎轉載,轉載請注明出處,徽滬一郎。
概要
在即將發布的spark 1.0中有一個新增的功能,即對sql的支持,也就是說可以用sql來對數據進行查詢,這對於DBA來說無疑是一大福音,因為以前的知識繼續生效,而無須去學什么scala或其它script.
一般來說任意一個sql子系統都需要有parser,optimizer,execution三大功能模塊,在spark中這些又都是如何實現的呢,這些實現又有哪些亮點和問題?帶着這些疑問,本文准備做一些比較深入的分析。
SQL模塊分析有幾大難點,分別為
- sql分析和執行的通用過程,這個與是否用spark無關,應該是非常general的問題
- spark sql中具體實現時的整體架構
- 源碼閱讀時碰到的scala特殊語法,也就是常說的語法糖問題
為什么需要SQL
SQL是一種標准,一種用來進行數據分析的標准,已經存在多年。
在大數據的背景下,隨着數據規模的日漸增大,原有的分析技巧是否就過時了呢?答案顯然是否定的,原來的分析技巧在既有的分析維度上依然保持有效,當然對於新的數據我們想挖掘出更多有意思有價值的內容,這個目標可以交給數據挖掘或者機器學習去完成。
那么原有的數據分析人員如何快速的轉換到Big Data的平台上來呢,去重新學一種腳本嗎,直接用scala或python去編寫RDD。顯然這樣的代價太高,學習成本大。數據分析人員希望底層存儲機制和分析引擎的變換不要對上層分析的應用有直接的影響,需求用一句話來表達就是,”直接使用sql語句來對數據進行分析“。
這也是為什么Hive興起的原因了。Hive的流行直接證明這種設計迎合了市場的需求。由於Hive是采用了Hadoop的MapReduce作為分析執行引擎,其處理速度上不是盡如人意。Spark以快著稱,很快有好事者寫出了Shark,Shark取得了非常不俗的成績,迎得了極好的口碑。
畢竟Shark是游離於Spark之外的一個項目,不受Spark節制,那么Spark開發團隊的目標是將對SQL支持作用Spark的核心功能里面。以上分析就是Spark中的sql功能的由來。
應用舉例
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext._
case class Person(name: String, age: Int)
val person = sc.textFile("examples/src/main/resources/people.txt").map(_.split(" ")).map(p => Person(p(0), p(1).trim.toInt))
person.registerAsTable("person")
val teenagers = sql("SELECT name, age FROM person WHERE age >= 13 and age <= 19")
teenagers.map(t => "name:" + t(0)).collect().foreach(println)
上述代碼的邏輯非常清晰,就是將存在於person.txt中年齡界於13到19歲的年輕人名字打印出來。
SQL通用執行過程
SQL的組成部分
SQL語句大家都很熟悉,那么有沒有仔細想過其有幾大部分組成呢?可能你會說,”這還用問,不就是“select * from tablex where f1=?”,有什么好想嗎?“
還是先來看看再說吧,說不定有些新的思維在里面呢?
上圖是對最簡單的sql語句的重新標注,SELECT表示是一種具體的操作,即查詢數據,”f1,f2,f3"表示返回的結果,tableX是數據源,condition部分是查詢條件。有沒有發覺SQL表達式中的順序與常見的RDD處理邏輯其在表達的順序上有差異。還是繼續用圖來表示不同吧。
SQL語句在分析執行過程中會經歷下圖所示的幾個步驟
- 語法解析
- 操作綁定
- 優化執行策略
- 交付執行
語法解析
語法解析之后,會形成一棵語法樹,如下圖所示。樹中的每個節點是執行的rule,整棵樹稱之為執行策略。
策略優化
形成上述的執行策略樹還只是第一步,因為這個執行策略可以進行優化,所謂的優化就是對樹中節點進行合並或是進行順序上的調整。
以大家熟悉的join操作為例,下圖給出一個join優化的示例。A JOIN B等同於B JOIN A,但是順序的調整可能給執行的性能帶來極大的影響,下圖就是調整前后的對比圖。
再舉一例,一般來說盡可能的先實施聚合操作(Aggregate)然后再join
小結
上述一大通分析,希望達到的目的就兩個。
- 語法解析之后生成一個執行策略樹
- 執行策略樹可以優化,優化的過程就是對樹中節點進行合並或者順序調整
有關SQL查詢分析優化的具體過程,強烈推薦參考query optimizer deep dive系列文章
SQL在spark中的實現
有了上述內容的鋪墊,想必你已經意識到Spark如果要很好的支持sql,勢必也要完成,解析,優化,執行的三大過程。
整個SQL部分的代碼,其大致分類如下圖所示
- SqlParser生成LogicPlan Tree
- Analyzer和Optimizer將各種rule作用於LogicalPlan Tree
- 最終優化生成的LogicalPlan生成Spark RDD
- 最后將生成的RDD交由Spark執行
階段1:生成LogicalPlan
在sql中引入了一種新的RDD,即SchemaRDD
且看SchemaRDD的構造函數
class SchemaRDD(
@transient val sqlContext: SQLContext,
@transient protected[spark] val logicalPlan: LogicalPlan)
構造函數中總共兩入參一為SparkContext,另一個LogicalPlan。LogicalPlan又是如何生成的呢?
要回答這個問題,不得不回到整個問題的入口點sql函數,sql函數的定義如下
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
result.queryExecution.toRdd
result
}
parseSql(sqlText)負責生成LogicalPlan,parseSql就是SqlParser的一個實例。
SqlParser這一部分的代碼要理解起來關鍵是要搞清楚StandardTokenParsers的調用規則,里面有一大堆的符號,如果不理解是什么意思,估計很難理清頭緒。
由於apply函數可以不被顯示調用,所以parseSql(sqlText)一句其實會隱式的調用SqlParser中的apply函數
def apply(input: String): LogicalPlan = {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
}
}
最最最讓人蛋疼的一行代碼就是phrase(query)(new lexical.Scanner(input))這里了,翻譯過來就是如果輸入的input字符串符合Lexical中定義的規則,則繼續使用query處理。
看一下query的定義是什么
protected lazy val query: Parser[LogicalPlan] =
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
) | insert
到了這里終於看到有LogicalPlan了,也就是說將普通的string轉換成LogicalPlan在這里發生了。
query這段代碼同時說明,在目前的spark sql中僅支持select和insert兩種操作,至於delete, update暫不支持。
注:即便是到現在,估計你和當初一樣對於SqlParser的使用還是一頭霧水,不要緊,請參考ref[3]和[4]中的內容,至於那些稀奇古怪的符號到底是什么意思,請參考ref[5].
階段2:QueryExecution
第一階段,將string轉換成為logicalplan tree,第二階段將各種規則作用於LogicalPlan。
在第一階段中展示的代碼,哪一句會觸發優化規則呢?是sql函數中的"result.queryExecution.toRdd",此處的queryExecution就是QueryExecution。這里又涉及到scala的一個語法糖問題。QueryExecution是一個抽象類,但卻看到了下述的代碼
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
怎么可以創建抽象類的實例?我的世界坍塌了,呵呵。不要緊張,這在scala的世界是允許的,只不過scala是隱含的創建了一個QueryExecution的子類並初始化而已,java里的原則還是對的,人家背后有貓膩。
Ok,輪到階段2中最重要的角色QueryExecution閃亮登場了
protected abstract class QueryExecution {
def logical: LogicalPlan
lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
def simpleString: String = stringOrError(executedPlan)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
def debugExec() = DebugQuery(executedPlan).execute().collect()
}
三大步
- lazy val analyzed = analyzer(logical)
- lazy val optimizedPlan = optimizer(analyzed)
- lazy val sparkPlan = planner(optimizedPlan).next()
無論analyzer還是optimizer,它們都是RuleExecutor的子類,
RuleExecutor的默認處理函數是apply,對所有的子類都是一樣的,RuleExecutor的apply函數定義如下,
def apply(plan: TreeType): TreeType = {
var curPlan = plan
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// Run until fix point (or the max number of iterations as specified in the strategy.
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val result = rule(plan)
if (!result.fastEquals(plan)) {
logger.trace(
s"""
|=== Applying Rule ${rule.ruleName} ===
|${sideBySide(plan.treeString, result.treeString).mkString("\n")}
""".stripMargin)
}
result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
continue = false
}
if (curPlan.fastEquals(lastPlan)) {
logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
continue = false
}
lastPlan = curPlan
}
if (!batchStartPlan.fastEquals(curPlan)) {
logger.debug(
s"""
|=== Result of Batch ${batch.name} ===
|${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
""".stripMargin)
} else {
logger.trace(s"Batch ${batch.name} has no effect.")
}
}
curPlan
}
對於RuleExecutor的子類來說,最主要的是定義自己的batches,來看analyzer中的batches是如何定義的
val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once,
NewRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once,
(if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
typeCoercionRules :_*),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)
batch中定義了一系列的規則,這里再次出現語法糖問題。“如何理解::這個操作符”? ::表示cons的意思,即連接生成一個list.
Batch構造函數中需要指定一系列的Rule,像ResolveReferences就是Rule,有關Rule的代碼就不一一分析了。
階段3:LogicalPlan轉換成Physical Plan
在階段3最主要的代碼就兩行
- lazy val executePlan: SparkPlan = prepareForExecution(sparkPlan)
- lazy val toRdd: RDD[Row] = executedPlan.execute()
與LogicalPlan不同,SparkPlan最重要的區別就是有execute函數
針對Sparkplan的具體實現,又要分成UnaryNode, LeafNode和BinaryNode,簡要來說即單目運算符操作,葉子結點,雙目運算符操作。每個子類的具體實現可以自行參考源碼。
階段4: 觸發RDD執行
RDD被觸發真正執行的過程在看了前面幾篇文章之后想來難不住你來,所有的所有都在這一行代碼。
teenagers.map(p => "name:"+p(0)).foreach(println)
如果真的不明白,建議回頭再讀一下Spark Job的執行過程分析。
總結
行為至此,可以收筆了。應該說SQL部分的代碼涉及到的知識點還是比較多的,最重要的是理清兩點,即SQL語句的通用處理過程。另一個是Spark SQL子系統中具體實現機制。
Spark Sql子模塊的具體實現緊緊圍繞LogicalPlan Tree展開,一是用sqlparser來生成logicalplan,二是用RuleExecutor將各種Rule作用於LogicalPlan。最后生成普通的RDD將會給Spark core處理。