SparkSQL的執行計划詳解


一:首先將我們從文本中讀取的數據映射到表也就是視圖

eg:

  $>cat b.txt

 

  1 ded
  2 dsfre
  3 sfs
  4 fr

  $>val sc = spark.sparkContext     #創建SparkContext

  $>val rdd = sc.textFile("file:///home/Alex_lei/b.txt").map(x=>x.split(" ")).map(x=>(x(0),x(1)))  

    #讀取文件到rdd中(tuple形式,因為createDataFrame方法所需要的rdd為tuple形式)

  $>val df = spark.createDataFrame(rdd)    #創建dataframe

  $>df.createTempView("person")        #將dataframe映射到表

 

二:分析

$>val query = spark.sql("select * from person where _1>1") 

(1)explain() 查看物理計划

  $>query.explain()

    == Physical Plan ==

      *Filter (isnotnull(_1#3) && (cast(_1#3 as double) > 1.0))
        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,           assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#4]
          +- Scan ExternalRDDScan[obj#2]

    說明:類似一棵樹,從下往上看,首先掃描外部RDD,然后是序列化字段,在就是過濾,判斷是否為null和第一個字段大於1的。

2)explain(true)查看整個SQL的執行計划,主要分為4個階段

  --1:解析過程

    == Parsed Logical Plan ==

    'Project [*]
      +- 'Filter ('_1 > 1)
        +- 'UnresolvedRelation `person`

    說明:Project[*]是我們所要的結果集,解析過程不能判斷表person是否存在,有什么關系,然后就是列出過濾條件和所要的結果集。

 

  --2:邏輯階段

    == Analyzed Logical Plan ==

    _1: string, _2: string
      Project [_1#3, _2#4]
        +- Filter (cast(_1#3 as double) > cast(1 as double))
          +- SubqueryAlias person
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#4]
              +- ExternalRDD [obj#2]

    說明:首先還是加載外部RDD,然后序列化字段,列出映射表的名字,確認表存在,然后按照條件過濾,獲取結果集。

 

  --3:優化階段

    == Optimized Logical Plan ==

     Filter (isnotnull(_1#3) && (cast(_1#3 as double) > 1.0))
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#4]
        +- ExternalRDD [obj#2]

    說明:和之前的一樣,優化的部分就是過濾條件,先判斷是否為null(hive和關系型數據庫都沒有),這個和RDD的不同之處是rdd是將數據全部加在進來,而sparksql如果遇到有null值的直接停止,這個是個簡單的優化方案,具體其他的優化措施還是根據所寫的sql語句。

 

  --4:物理執行計划

    == Physical Plan ==

    *Filter (isnotnull(_1#3) && (cast(_1#3 as double) > 1.0))
      +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#4]
        
+- Scan ExternalRDDScan[obj#2]

    說明:同上所說的物理執行計划。

 

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM