本章就着重介紹一個工具hive/console,來加深讀者對sparkSQL的執行計划的理解。
1:hive/console安裝
sparkSQL從1.0.0開始提供了一個sparkSQL的調試工具hive/console。
該工具是給開發人員使用,在編譯生成的安裝部署包中並沒有;該工具須要使用sbt編譯執行。要使用該工具,須要具備下面條件:
- spark1.1.0源代碼
- hive0.12源代碼並編譯
- 配置環境變量
1.1:安裝hive/cosole
以下是筆者安裝過程:
A:下載spark1.1.0源代碼,安裝在/app/hadoop/spark110_sql文件夾
B:下載hive0.12源代碼,安裝在/app/hadoop/hive012文件夾,進入src文件夾后,使用以下命令進行編譯:
ant clean package -Dhadoop.version=2.2.0 -Dhadoop-0.23.version=2.2.0 -Dhadoop.mr.rev=23
C:配置環境變量文件~/.bashrc后,source ~/.bashrc使環境變量生效。
export HIVE_HOME=/app/hadoop/hive012/src/build/dist export HIVE_DEV_HOME=/app/hadoop/hive012/src export HADOOP_HOME=/app/hadoop/hadoop220D:啟動
切換到spark安裝文件夾/app/hadoop/spark110_sql,執行命令:
sbt/sbt hive/console
經過一段漫長的sbt編譯過程。最后出現例如以下界面:


1.2:hive/console原理
hive/console的調試原理非常easy。就是在scala控制台裝載了catalyst中幾個關鍵的class,當中的TestHive提前定義了表結構並裝載命令。這些數據是hive0.12源代碼中帶有的測試數據,裝載這些數據是按需運行的。這些數據位於/app/hadoop/hive012/src/data中。也就是$HIVE_DEV_HOME/data中。
/*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */ // The test tables that are defined in the Hive QTestUtil. // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java val hiveQTestUtilTables = Seq( TestTable("src", "CREATE TABLE src (key INT, value STRING)".cmd, s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd), TestTable("src1", "CREATE TABLE src1 (key INT, value STRING)".cmd, s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd), TestTable("srcpart", () => { runSqlHive( "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)") for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) { runSqlHive( s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr') """.stripMargin) } }), ...... )由於要使用hive0.12的測試數據。所以須要定義兩個環境變量:HIVE_HOME和HIVE_DEV_HOME。假設使用hive0.13的話。用戶須要更改到對應文件夾:
/*源自 sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */ /** The location of the compiled hive distribution */ lazy val hiveHome = envVarToFile("HIVE_HOME") /** The location of the hive source code. */ lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
另外,假設用戶想在hive/console啟動的時候。預載很多其它的class。能夠改動spark源代碼下的 project/SparkBuild.scala文件
/* 源自 project/SparkBuild.scala */ object Hive { lazy val settings = Seq( javaOptions += "-XX:MaxPermSize=1g", // Multiple queries rely on the TestHive singleton. See comments there for more details. parallelExecution in Test := false, // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings // only for this subproject. scalacOptions <<= scalacOptions map { currentOpts: Seq[String] => currentOpts.filterNot(_ == "-deprecation") }, initialCommands in console := """ |import org.apache.spark.sql.catalyst.analysis._ |import org.apache.spark.sql.catalyst.dsl._ |import org.apache.spark.sql.catalyst.errors._ |import org.apache.spark.sql.catalyst.expressions._ |import org.apache.spark.sql.catalyst.plans.logical._ |import org.apache.spark.sql.catalyst.rules._ |import org.apache.spark.sql.catalyst.types._ |import org.apache.spark.sql.catalyst.util._ |import org.apache.spark.sql.execution |import org.apache.spark.sql.hive._ |import org.apache.spark.sql.hive.test.TestHive._ |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin ) }
以下介紹一下hive/console的經常使用操作,主要是和執行計划相關的經常使用操作。在操作前,首先定義一個表people和查詢query:
//在控制台逐行執行 case class Person(name:String, age:Int, state:String) sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people") val query= sql("select * from people")
2.1 查看查詢的schema
query.printSchema

2.2 查看查詢的整個執行計划
query.queryExecution

2.3 查看查詢的Unresolved LogicalPlan
query.queryExecution.logical

2.4 查看查詢的analyzed LogicalPlan
query.queryExecution.analyzed

2.5 查看優化后的LogicalPlan
query.queryExecution.optimizedPlan

2.6 查看物理計划
query.queryExecution.sparkPlan

2.7 查看RDD的轉換過程
query.toDebugString

2.8 很多其它的操作
很多其它的操作能夠通過Tab鍵陳列出來。也能夠參開sparkSQL的API,也能夠參看源碼中的方法和函數。
3:不同數據源的執行計划
上面經常使用操作里介紹了源自RDD的數據。我們都知道。sparkSQL能夠源自多個數據源:jsonFile、parquetFile、hive。
以下看看這些數據源的schema:
3.1 json文件
json文件支持嵌套表,sparkSQL也能夠讀入嵌套表,如以下形式的json數據,經修整(去空格和換行符)保存后,能夠使用jsonFile讀入sparkSQL。
{ "fullname": "Sean Kelly", "org": "SK Consulting", "emailaddrs": [ {"type": "work", "value": "kelly@seankelly.biz"}, {"type": "home", "pref": 1, "value": "kelly@seankelly.tv"} ], "telephones": [ {"type": "work", "pref": 1, "value": "+1 214 555 1212"}, {"type": "fax", "value": "+1 214 555 1213"}, {"type": "mobile", "value": "+1 214 555 1214"} ], "addresses": [ {"type": "work", "format": "us", "value": "1234 Main StnSpringfield, TX 78080-1216"}, {"type": "home", "format": "us", "value": "5678 Main StnSpringfield, TX 78080-1316"} ], "urls": [ {"type": "work", "value": "http://seankelly.biz/"}, {"type": "home", "value": "http://seankelly.tv/"} ] }去空格和換行符后保存為/home/mmicky/data/nestjson.json,使用jsonFile讀入並注冊成表jsonPerson,然后定義一個查詢jsonQuery:
jsonFile("/home/mmicky/data/nestjson.json").registerTempTable("jsonPerson") val jsonQuery = sql("select * from jsonPerson")
查看jsonQuery的schema:
jsonQuery.printSchema

查看jsonQuery的整個執行計划:
jsonQuery.queryExecution

3.2 parquet文件
parquet文件讀入並注冊成表parquetWiki,然后定義一個查詢parquetQuery:
parquetFile("/home/mmicky/data/spark/wiki_parquet").registerTempTable("parquetWiki") val parquetQuery = sql("select * from parquetWiki")
查詢parquetQuery的schema:
parquetQuery.printSchema

查詢parquetQuery的整個執行計划:
parquetQuery.queryExecution

3.3 hive數據
之前說了,TestHive類中已經定義了大量的hive0.12的測試數據的表格式,如src、sales等等,在hive/console里能夠直接使用;第一次使用的時候,hive/console會裝載一次。
以下我們使用sales表看看其schema和整個執行計划。首先定義一個查詢hiveQuery:
val hiveQuery = sql("select * from sales")
查看hiveQuery的schema:
hiveQuery.printSchema

查看hiveQuery的整個執行計划:
hiveQuery.queryExecution

從上面能夠看出,來自jsonFile、parquetFile、hive數據的物理計划還有有非常大差別的。
4:不同查詢的執行計划
為了加深理解,我們列幾個經常使用查詢的執行計划和RDD轉換過程。
4.1 聚合查詢
sql("select state,avg(age) from people group by state").queryExecution

sql("select state,avg(age) from people group by state").toDebugString

4.2 join操作
sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution

sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString

4.3 Distinct操作
sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution

sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString

5:查詢的優化
上面的查詢比較簡單。看不出優化的過程,以下看幾個樣例,能夠理解sparkSQL的優化過程。
5.1 CombineFilters
CombineFilters就是合並Filter,在含有多個Filter時發生。例如以下查詢:
sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution

上面的查詢,在Optimized的過程中。將age>=19和age<30這兩個Filter合並了,合並成((age>=19) && (age<30))。事實上上面還做了一個其它的優化,就是project的下推,子查詢使用了表的全部列,而主查詢使用了列name。在查詢數據的時候子查詢優化成僅僅查列name。
5.2 PushPredicateThroughProject
PushPredicateThroughProject就是project下推。和上面樣例中的project一樣。
sql("select name from (select name,state as location from people) a where location='CA'").queryExecution

5.3 ConstantFolding
ConstantFolding是常量疊加,用於表達式。如以下的樣例:
sql("select name,1+2 from people").queryExecution

在Optimized的過程中,將常量表達式直接累加在一起。用新的列名來表示。
5.4 自己定義優化
在sparkSQL中的Optimizer中定義了3類12中優化方法,這里不再一一陳列。對於用於自己定義的優化,在hive/console也能夠非常方便的調試。僅僅要先定義一個LogicalPlan,然后使用自己定義的優化函數進行測試就能夠了。以下就舉個和CombineFilters一樣的樣例,首先定義一個函數:
object CombineFilters extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Filter(c1, Filter(c2, grandChild)) => Filter(And(c1,c2),grandChild) } }
然后定義一個query,並使用query.queryExecution.analyzed查看優化前的LogicPlan:
val query= sql("select * from people").where('age >=19).where('age <30) query.queryExecution.analyzed

最后。使用自己定義優化函數進行優化:
CombineFilters(query.queryExecution.analyzed)

能夠看到兩個Filter合並在一起了。
甚至,在hive/console里直接使用transform對LogicPlan應用定義好的rule,以下定義了一個query,並使用query.queryExecution.analyzed查看應用rule前的LogicPlan:
val hiveQuery = sql("SELECT * FROM (SELECT * FROM src) a") hiveQuery.queryExecution.analyzed

然后,直接用transform將自己定義的rule:
hiveQuery.queryExecution.analyzed transform { case Project(projectList, child) if projectList == child.output => child }

該transform在LogicPlan的主查詢和子查詢的project同樣時合並project。
經過上面的樣例。加上自己的理解。相信大部分的讀者對sparkSQL中的執行計划應該有了比較明白的了解。