SparkSql運行原理詳細解析


傳統關系型數據庫中 ,最基本的sql查詢語句由projecttion (field a,field b,field c) , datasource (table A) 和 fieter (field a >10) 三部分組成。 分別對應了sql查詢過程中的result , datasource和operation ,也就是按照result ——> datasource ——> operation 的順序來描述,如下圖所示:

 

 

 

但是sql實際執行過程是按照operation——> datasource——>result 的順序來執行的這與sql語法正好相反,具體的執行過程如下:

  1 . 語法和詞法解析:對寫入的sql語句進行詞法和語法解析(parse),分辨出sql語句在哪些是關鍵詞(如select ,from 和where),哪些是表達式,哪些是projection ,哪些是datasource等,判斷SQL語法是否規范,並形成邏輯計划。

  2 .綁定:將SQL語句和數據庫的數據字典(列,表,視圖等)進行綁定(bind),如果相關的projection和datasource等都在的話,則表示這個SQL語句是可以執行的。

  3 .優化(optimize):一般的數據庫會提供幾個執行計划,這些計划一般都有運行統計數據,數據庫會在這些計划中選擇一個最優計划。

  4 .執行(execute):執行前面的步驟獲取最有執行計划,返回查詢的數據集。

  SparkSQL的運行架構:

  Spark SQL對SQL語句的處理和關系型數據庫采用了類似的方法,sparksql先會將SQL語句進行解析(parse)形成一個Tree,然后使用Rule對Tree進行綁定,優化等處理過程,通過模式匹配對不同類型的節點采用不同操作。而sparksql的查詢優化器是catalyst,它負責處理查詢語句的解析,綁定,優化和生成物理執行計划等過程,catalyst是sparksql最核心部分。

  Spark SQL由core,catalyst,hive和hive-thriftserver4個部分組成。

  •   core: 負責處理數據的輸入/輸出,從不同的數據源獲取數據(如RDD,Parquet文件和JSON文件等),然后將結果查詢結果輸出成Data Frame。
  •        catalyst: 負責處理查詢語句的整個處理過程,包括解析,綁定,優化,生成物理計划等。
  •        hive: 負責對hive數據的處理。
  •        hive-thriftserver:提供client和JDBC/ODBC等接口。

  運行原理原理分析: 

  1.使用SesstionCatalog保存元數據

  在解析sql語句前需要初始化sqlcontext,它定義sparksql上下文,在輸入sql語句前會加載SesstionCatalog,初始化sqlcontext時會把元數據保存在SesstionCatalog中,包括庫名,表名,字段,字段類型等。這些數據將在解析未綁定的邏輯計划上使用。

  2.使用Antlr生成未綁定的邏輯計划

  Spark2.0版本起使用Antlr進行詞法和語法解析,Antlr會構建一個按照關鍵字生成的語法樹,也就是生成的未綁定的邏輯計划。

  3.使用Analyzer綁定邏輯計划

  在這個階段Analyzer 使用Analysis Rules,結合SessionCatalog元數據,對未綁定的邏輯計划進行解析,生成已綁定的邏輯計划。

  4.使用Optimizer優化邏輯計划

  Opetimize(優化器)的實現和處理方式同Analyzer類似,在該類中定義一系列Rule,利用這些Rule對邏輯計划和Expression進行迭代處理,達到樹的節點的合並和優化。

  5.使用SparkPlanner生成可執行計划的物理計划

   SparkPlanner使用Planning Strategies對優化的邏輯計划進行轉化,生成可執行的物理計划。

  6.使用QueryExecution執行物理計划

 


主要是通過sqlContext.sql() 這個方法作為一個入口。

在這之前先得知道一句SQL傳到 sql()這個方法里面后要經歷好幾次轉換, 最終生成一個executedPlan去執行。

總的過程分下面幾步:
1.通過Sqlparse 轉成unresolvedLogicplan
2.通過Analyzer轉成 resolvedLogicplan
3.通過optimizer轉成 optimzedLogicplan
4.通過sparkplanner轉成physicalLogicplan
5.通過prepareForExecution 轉成executable logicplan
6.通過toRDD等方法執行executedplan去調用tree的doExecute

借用一個圖, 懶得自己畫了:


現在那么先從sqlContext.sql進來看到:

  def sql(sqlText: String): DataFrame = {
    DataFrame(this, parseSql(sqlText))
  }



構造了一個DF, 點進去看到里面new 了一個DF:

private[sql] object DataFrame {
  def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
    new DataFrame(sqlContext, logicalPlan)
  }
}



傳入的Logicalplan是在DF object里面執行parseSql(sqlText)而獲取的, 那么這里是怎么拿到這個logicalplan的呢, 點進去看一下其實他調的是sqlContext里面的parseSql

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)



這里就拿一個select語句來看一下最終的executable plan是怎么生成的:
parseSql 是 ddlParser.parse(sql, false), ddlParser其實就是:

protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))



所以調用的是DDLParser類里面的parse 方法, 看一下這個方法是怎么寫的:

  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
    try {
      parse(input)
    } catch {
      case ddlException: DDLException => throw ddlException
      case _ if !exceptionOnError => parseQuery(input)
      case x: Throwable => throw x
    }
  }



其實是調用了父類AbstractSparkSQLParser 的parse方法, 看一下這個方法是怎么寫的:

def parse(input: String): LogicalPlan = synchronized {
    // Initialize the Keywords.
    initLexical
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
  }



里面主要做了兩件事:
1.執行 start方法
2.通過lexical.Scanner(input) 來對input進行解析, 符合query模式就返回success

這個start方法是在DDLParser類里面定義的, 看一下:

  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

  protected def start: Parser[LogicalPlan] = ddl



可以看到start其實就是去判斷是不是 這三種語句createTable  describeTable  refreshTable, 方法里面怎么寫的就自己進去看一下了, 反正我們用select語句的話, 明顯不是上面三種類型之一, 那么不是上面三種類型的話 #2步就不會返回success, 所以是會去執行 case failureOrError => sys.error(failureOrError.toString), 拋出一個異常, 那么在DDLParser里面接收到異常, 就會跑到catch里面 去執行:
case _ if !exceptionOnError => parseQuery(input)

看一下DDLParser里面parseQuery是怎么來的:

class DDLParser(parseQuery: String => LogicalPlan)
  extends AbstractSparkSQLParser with DataTypeParser with Logging {
...
}



可以看到是在創建DDLParser的時候傳入的, 那么在sqlContext里面DDLParser是怎么聲明的呢:

protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))



傳入的是一個sqlParser.parse(_)

所以回去調用sqlParser的parse方法, sqlParser是這樣創建的:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))



所以會去調用SparkSQLParser里面定義的parse方法, 但是看這個類里面沒有重寫parser的方法, 所以一樣還是調用父類AbstractSparkSQLParser的parse方法:

  def parse(input: String): LogicalPlan = synchronized {
    // Initialize the Keywords.
    initLexical
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
  }



那樣的話還是會去執行start, 這個start就是在SparkSQLParser里面定義的:

override protected lazy val start: Parser[LogicalPlan] =
    cache | uncache | set | show | desc | others



看到start里面會去判斷是不是cache   uncache   set   show   desc , 如果不是就會去調用others, select語句明顯不是上面的任意一種, 所以 直接去看others怎么定義的:

  private lazy val others: Parser[LogicalPlan] =
    wholeInput ^^ {
      case input => fallback(input)
    }



注意這個是lazy, 所以只有在job被提交后才會真正執行
會直接去調用fallback, fallback是在創建SparkSQLParser的時候傳入的:

class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {...}



那么就要到SQLContext里面去看這個fallback到底是什么了:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))



好了 select語句又被傳到了getSQLDialect().parse(_)去執行,  getSQLDialect:

  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
    classOf[DefaultParserDialect].getCanonicalName
  } else {
    conf.dialect
  }

protected[sql] def getSQLDialect(): ParserDialect = {
    try {
      val clazz = Utils.classForName(dialectClassName)
      clazz.newInstance().asInstanceOf[ParserDialect]
    } catch {
      case NonFatal(e) =>
        // Since we didn't find the available SQL Dialect, it will fail even for SET command:
        // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
        val dialect = conf.dialect
        // reset the sql dialect
        conf.unsetConf(SQLConf.DIALECT)
        // throw out the exception, and the default sql dialect will take effect for next query.
        throw new DialectException(
          s"""Instantiating dialect '$dialect' failed.
             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
    }
  }



實際是執行DefaultParserDialect的parser方法:

private[spark] class DefaultParserDialect extends ParserDialect {
  @transient
  protected val sqlParser = SqlParser

  override def parse(sqlText: String): LogicalPlan = {
    sqlParser.parse(sqlText)
  }
}



可以看到他實際是執行SqlParser的parse方法, 那么SqlParser里面parse是怎么寫的, 可以看到里面沒有重寫parse方法, 那么繼續調用start, start的定義如下:

  protected lazy val start: Parser[LogicalPlan] =
    start1 | insert | cte

  protected lazy val start1: Parser[LogicalPlan] =
    (select | ("(" ~> select <~ ")")) *
    ( UNION ~ ALL        ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
    | INTERSECT          ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
    | EXCEPT             ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
    | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
    )



可以看到里面的start1其實就是匹配到了select語句, 那么我們的select最后會通過start1的方法去生成一個unresolvedlogicalplan

那么有了unresolvedlogicalplan后我們去看DataFrame是怎么被構造出來的:

class DataFrame private[sql](
    @transient override val sqlContext: SQLContext,
    @DeveloperApi @transient override val queryExecution: QueryExecution)
  extends Queryable with Serializable {

  // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
  // you wrap it with `withNewExecutionId` if this actions doesn't call other action.

  /**
   * A constructor that automatically analyzes the logical plan.
   *
   * This reports error eagerly as the [[DataFrame]] is constructed, unless
   * [[SQLConf.dataFrameEagerAnalysis]] is turned off.
   */
  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
    this(sqlContext, {
      val qe = sqlContext.executePlan(logicalPlan)
      if (sqlContext.conf.dataFrameEagerAnalysis) {
        qe.assertAnalyzed()  // This should force analysis and throw errors if there are any
      }
      qe
    })
  }

....
}



new出來DataFrame后回去調用def this(sqlContext: SQLContext, logicalPlan: LogicalPlan)這個構造函數, 這個構造函數其實是創建了返回了DataFrame本身, 但是傳入參數為sqlContext, 和一個queryexecution (qe), qe的傳入參數為unresolvedlogicplan

這個queryexecution就是這部分代碼:

val qe = sqlContext.executePlan(logicalPlan)
      if (sqlContext.conf.dataFrameEagerAnalysis) {
        qe.assertAnalyzed()  // This should force analysis and throw errors if there are any
      }
      qe



我們看一下queryExecution是怎么寫的:

class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)

  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)

  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    sqlContext.cacheManager.useCachedData(analyzed)
  }

  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)

  lazy val sparkPlan: SparkPlan = {
    SQLContext.setActive(sqlContext)
    sqlContext.planner.plan(optimizedPlan).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String = {
    s"""== Physical Plan ==
       |${stringOrError(executedPlan)}
      """.stripMargin.trim
  }

  override def toString: String = {
    def output =
      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

    s"""== Parsed Logical Plan ==
       |${stringOrError(logical)}
       |== Analyzed Logical Plan ==
       |${stringOrError(output)}
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim
  }
}



這個類是理解整個sql解析過程的關鍵, 傳入對象是一個unresolvedlogicplan, 首先他會去調用sqlContext的analyzer去生成一個resolvedlogicplan:

lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)


也是lazy的, 會在job提交后執行。 我們看一下analyzer是怎么定義的:

  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, conf) {
      override val extendedResolutionRules =
        ExtractPythonUDFs ::
        PreInsertCastAndRename ::
        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)

      override val extendedCheckRules = Seq(
        datasources.PreWriteCheck(catalog)
      )
    }



new了一個Analyzer, 然后重寫了extendedResolutionRules , analyzer里面主要定義了一個batches, 這個batches其實就是一些對傳入的tree(logicplan)進行解析的各種rule, 在analyzer里面的rule是這樣的:

  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveUpCast ::
      ResolveSortReferences ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      HiveTypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic,
      ComputeCurrentTime),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )



里面有一個ResolveRelations , 看一下這個rule就會明白為什么網上好多資料會說analyzer是吧unresolvedlogicplan和catalog綁定生成resolvedlogicplan:

object ResolveRelations extends Rule[LogicalPlan] {
    def getTable(u: UnresolvedRelation): LogicalPlan = {
      try {
        catalog.lookupRelation(u.tableIdentifier, u.alias)
      } catch {
        case _: NoSuchTableException =>
          u.failAnalysis(s"Table not found: ${u.tableName}")
      }
    }



那么這些batches是怎么被調用的呢, 得看anzlyer的execute方法了:

def execute(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)

            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }



這個比較復雜, 傳入的是unresolvedLogicplan, 然后會對上面定義的batches進行遍歷, 確保每個rule都做一遍:
batches.foreach { ...}

再每個batch里面執行foldLeft(curPlan)
所以第一次的時候 case里面的(plan, rule)其實就是(curPlan, 第一個rule)
通過val result = rule(plan) 對當前的plan 做rule, 返回的結果當成下一次的curplan執行直到所有的rule都做完, 得到最總的curPlan

然后會去判斷是否還要執行一遍, 以保證所有的node都執行到了這些rule, 有個iteration來記錄執行了多少次, 然后和strategy來做對比:

strategy主要有這幾種:
Once
FixedPoint(maxIterations: Int)
初始化maxIterations = 1

strategy實在batch創建的時候傳入的, 列子:

Batch("UDF", Once,
      HandleNullInputsForUDF)



當所有的rule都執行了strategy規定的次數后, 就返回一個新的sparkplan。

analyzer這邊執行完后, 傳入的unresolvedlogicplan就變成了resolvedlogicplan。

然后會看一下這個resolvedlogicplan是不是可以用cachedata, 如果其中有在cache里面的就直接替換掉:

lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    sqlContext.cacheManager.useCachedData(analyzed)
  }



然后再通過optimizer把resolvedlogicplan變成一個optimzedLogicplan:
具體調用過程和analyzer一樣, 就不重復了:

  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)



再通過sparkplaner轉成physicalplan:

  lazy val sparkPlan: SparkPlan = {
    SQLContext.setActive(sqlContext)
    sqlContext.planner.plan(optimizedPlan).next()
  }



然后通過通過prepareForExecution轉成executebleplan:

lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)



到這里為止就生成了一個可以執行的物理計划, 這個物理計划會在toRdd的時候執行:

lazy val toRdd: RDD[InternalRow] = executedPlan.execute()



可以看到一路下來都是lazy的, 所以只有在job真正提交后才會交由spark 去做這些事,

execute方法里面其實就是調用了物理計划的toexecute方法:

protected def doExecute(): RDD[InternalRow]

  final def execute(): RDD[InternalRow] = {
    if (children.nonEmpty) {
      val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
      val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
      assert(!(hasSafeInputs && hasUnsafeInputs),
        "Child operators should output rows in the same format")
      assert(canProcessSafeRows || canProcessUnsafeRows,
        "Operator must be able to process at least one row format")
      assert(!hasSafeInputs || canProcessSafeRows,
        "Operator will receive safe rows as input but cannot process safe rows")
      assert(!hasUnsafeInputs || canProcessUnsafeRows,
        "Operator will receive unsafe rows as input but cannot process unsafe rows")
    }
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      doExecute()
    }
  }



里面的doExecute方法只是一個聲明, 其實現實在所有的sparkplan中實現的, 比如說實在所有的LeafNode UnaryNode BinaryNode的實現類里面實現的, 隨便找一個列子:

case class Limit(limit: Int, child: SparkPlan)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  /** We must copy rows when sort based shuffle is on */
  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

  override def output: Seq[Attribute] = child.output
  override def outputPartitioning: Partitioning = SinglePartition

  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

  protected override def doExecute(): RDD[InternalRow] = {
    val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
      child.execute().mapPartitionsInternal { iter =>
        iter.take(limit).map(row => (false, row.copy()))
      }
    } else {
      child.execute().mapPartitionsInternal { iter =>
        val mutablePair = new MutablePair[Boolean, InternalRow]()
        iter.take(limit).map(row => mutablePair.update(false, row))
      }
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
    shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
    shuffled.mapPartitionsInternal(_.take(limit).map(_._2))
  }
}


  這里的doExecute就返回了一個RDD

  原文地址:https://www.iteye.com/blog/humingminghz-2311549

  原文地址:https://www.cnblogs.com/db-record/p/11832285.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM