歡迎轉載,轉載請注明出處,徽滬一郎
概要
在新近發布的spark 1.0中新加了sql的模塊,更為引人注意的是對hive中的hiveql也提供了良好的支持,作為一個源碼分析控,了解一下spark是如何完成對hql的支持是一件非常有趣的事情。
Hive簡介
Hive的由來
以下部分摘自Hadoop definite guide中的Hive一章
“Hive由Facebook出品,其設計之初目的是讓精通SQL技能的分析師能夠對Facebook存放在HDFS上的大規模數據集進行分析和查詢。
Hive大大簡化了對大規模數據集的分析門檻(不再要求分析人員具有很強的編程能力),迅速流行起來,成為Hadoop生成圈上的Killer Application. 目前已經有很多組織把Hive作為一個通用的,可伸縮數據處理平台。”
數據模型(Data Model)
Hive所有的數據都存在HDFS中,在Hive中有以下幾種數據模型
- Tables(表) table和關系型數據庫中的表是相對應的,每個表都有一個對應的hdfs目錄,表中的數據經序列化后存儲在該目錄,Hive同時支持表中的數據存儲在其它類型的文件系統中,如NFS或本地文件系統
- 分區(Partitions) Hive中的分區起到的作用有點類似於RDBMS中的索引功能,每個Partition都有一個對應的目錄,這樣在查詢的時候,可以減少數據規模
- 桶(buckets) 即使將數據按分區之后,每個分區的規模有可能還是很大,這個時候,按照關鍵字的hash結果將數據分成多個buckets,每個bucket對應於一個文件
Query Language
HiveQL是Hive支持的類似於SQL的查詢語言。HiveQL大體可以分成下面兩種類型
- DDL(data definition language) 比如創建數據庫(create database),創建表(create table),數據庫和表的刪除
- DML(data manipulation language) 數據的添加,查詢
- UDF(user defined function) Hive還支持用戶自定義查詢函數
Hive architecture
hive的整體框架圖如下圖所示
由上圖可以看出,Hive的整體架構可以分成以下幾大部分
- 用戶接口 支持CLI, JDBC和Web UI
- Driver Driver負責將用戶指令翻譯轉換成為相應的MapReduce Job
- MetaStore 元數據存儲倉庫,像數據庫和表的定義這些內容就屬於元數據這個范疇,默認使用的是Derby存儲引擎
HiveQL執行過程
HiveQL的執行過程如下所述
- parser 將HiveQL解析為相應的語法樹
- Semantic Analyser 語義分析
- Logical Plan Generating 生成相應的LogicalPlan
- Query Plan Generating
- Optimizer
最終生成MapReduce的Job,交付給Hadoop的MapReduce計算框架具體運行。
Hive實例
最好的學習就是實戰,Hive這一小節還是以一個具體的例子來結束吧。
前提條件是已經安裝好hadoop,具體安裝可以參考源碼走讀11或走讀9
step 1: 創建warehouse
warehouse用來存儲raw data
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse
step 2: 啟動hive cli
$ export HIVE_HOME=<hive-install-dir>
$ $HIVE_HOME/bin/hive
step 3: 創建表
創建表,首先將schema數據寫入到metastore,另一件事情就是在warehouse目錄下創建相應的子目錄,該子目錄以表的名稱命名
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
step 4: 導入數據
導入的數據會存儲在step 3中創建的表目錄下
LOAD DATA LOCAL INPATH '/u.data'
OVERWRITE INTO TABLE u_data;
step 5: 查詢
SELECT COUNT(*) FROM u_data;
hiveql on Spark
Q: 上一章節花了大量的篇幅介紹了hive由來,框架及hiveql執行過程。那這些東西跟我們標題中所稱的hive on spark有什么關系呢?
Ans: Hive的整體解決方案很不錯,但有一些地方還值得改進,其中之一就是“從查詢提交到結果返回需要相當長的時間,查詢耗時太長”。之所以查詢時間很長,一個主要的原因就是因為Hive原生是基於MapReduce的,哪有沒有辦法提高呢。您一定想到了,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速執行能力來縮短HiveQl的響應時間。
下圖是Spark 1.0中所支持的lib庫,SQL是其唯一新添加的lib庫,可見SQL在Spark 1.0中的地位之重要。
HiveContext
HiveContext是Spark提供的用戶接口,HiveContext繼承自SqlContext。
讓我們回顧一下,SqlContext中牽涉到的類及其間的關系如下圖所示,具體分析過程參見本系列中的源碼走讀之11。
既然是繼承自SqlContext,那么我們將普通sql與hiveql分析執行步驟做一個對比,可以得到下圖。
有了上述的比較,就能抓住源碼分析時需要把握的幾個關鍵點
- Entrypoint HiveContext.scala
- QueryExecution HiveContext.scala
- parser HiveQl.scala
- optimizer
數據
使用到的數據有兩種
- Schema Data 像數據庫的定義和表的結構,這些都存儲在MetaStore中
- Raw data 即要分析的文件本身
Entrypoint
hiveql是整個的入口點,而hql是hiveql的縮寫形式。
def hiveql(hqlQuery: String): SchemaRDD = {
val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but does not perform any execution.
result.queryExecution.toRdd
result
}
上述hiveql的定義與sql的定義幾乎一模一樣,唯一的不同是sql中使用parseSql的結果作為SchemaRDD的入參而hiveql中使用HiveQl.parseSql作為SchemaRdd的入參
HiveQL, parser
parseSql的函數定義如代碼所示,解析過程中將指令分成兩大類
- nativecommand 非select語句,這類語句的特點是執行時間不會因為條件的不同而有很大的差異,基本上都能在較短的時間內完成
- 非nativecommand 主要是select語句
def parseSql(sql: String): LogicalPlan = {
try {
if (sql.toLowerCase.startsWith("set")) {
NativeCommand(sql)
} else if (sql.toLowerCase.startsWith("add jar")) {
AddJar(sql.drop(8))
} else if (sql.toLowerCase.startsWith("add file")) {
AddFile(sql.drop(9))
} else if (sql.startsWith("dfs")) {
DfsCommand(sql)
} else if (sql.startsWith("source")) {
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
} else if (sql.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)
if (nativeCommands contains tree.getText) {
NativeCommand(sql)
} else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other
}
}
}
} catch {
case e: Exception => throw new ParseException(sql, e)
case e: NotImplementedError => sys.error(
s"""
|Unsupported language features in query: $sql
|${dumpTree(getAst(sql))}
""".stripMargin)
}
}
哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands變量,列表很長,代碼就不一一列出。
對於非nativeCommand,最重要的解析函數就是nodeToPlan
toRdd
Spark對HiveQL所做的優化主要體現在Query相關的操作,其它的依然使用Hive的原生執行引擎。
在logicalPlan到physicalPlan的轉換過程中,toRdd最關鍵的元素
override lazy val toRdd: RDD[Row] =
analyzed match {
case NativeCommand(cmd) =>
val output = runSqlHive(cmd)
if (output.size == 0) {
emptyResult
} else {
val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
sparkContext.parallelize(asRows, 1)
}
case _ =>
executedPlan.execute().map(_.copy())
}
native command的執行流程
由於native command是一些非耗時的操作,直接使用Hive中原有的exeucte engine來執行即可。這些command的執行示意圖如下
analyzer
HiveTypeCoercion
val typeCoercionRules =
List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
StringToIntegralCasts, FunctionArgumentConversion)
optimizer
PreInsertionCasts存在的目的就是確保在數據插入執行之前,相應的表已經存在。
override lazy val optimizedPlan =
optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
此處要注意的是catalog的用途,catalog是HiveMetastoreCatalog的實例。
HiveMetastoreCatalog是Spark中對Hive Metastore訪問的wrapper。HiveMetastoreCatalog通過調用相應的Hive Api可以獲得數據庫中的表及表的分區,也可以創建新的表和分區。
HiveMetastoreCatalog
HiveMetastoreCatalog中會通過hive client來訪問metastore中的元數據,使用了大量的Hive Api。其中包括了廣為人知的deSer library。
以CreateTable函數為例說明對Hive Library的依賴。
def createTable(
databaseName: String,
tableName: String,
schema: Seq[Attribute],
allowExisting: Boolean = false): Unit = {
val table = new Table(databaseName, tableName)
val hiveSchema =
schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
table.setFields(hiveSchema)
val sd = new StorageDescriptor()
table.getTTable.setSd(sd)
sd.setCols(hiveSchema)
// TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
sd.setCompressed(false)
sd.setParameters(Map[String, String]())
sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
val serDeInfo = new SerDeInfo()
serDeInfo.setName(tableName)
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
serDeInfo.setParameters(Map[String, String]())
sd.setSerdeInfo(serDeInfo)
try client.createTable(table) catch {
case e: org.apache.hadoop.hive.ql.metadata.HiveException
if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
allowExisting => // Do nothing.
}
}
實驗
結合源碼,我們再對一個簡單的例子作下說明。
可能你會想,既然spark也支持hql,那么我原先用hive cli創建的數據庫和表用spark能不能訪問到呢?答案或許會讓你很納悶,“在默認的配置下是不行的”。為什么?
Hive中的meta data采用的存儲引擎是Derby,該存儲引擎只能有一個訪問用戶。同一時刻只能有一個人訪問,即便以同一用戶登錄訪問也不行。針對這個局限,解決方法就是將metastore存儲在mysql或者其它可以多用戶訪問的數據庫中。
具體實例
- 創建表
- 導入數據
- 查詢
- 刪除表
在啟動spark-shell之前,需要先設置環境變量HIVE_HOME和HADOOP_HOME.
啟動spark-shell之后,執行如下代碼
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._
hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
hql("FROM src SELECT key, value").collect().foreach(println)
hql("drop table src")
create操作會在/user/hive/warehouse/目錄下創建src目錄,可以用以下指令來驗證
$$HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/
drop表的時候,不僅metastore中相應的記錄被刪除,而且原始數據raw file本身也會被刪除,即在warehouse目錄下對應某個表的目錄會被整體刪除掉。
上述的create, load及query操作對metastore和raw data的影響可以用下圖的表示
hive-site.xml
如果想對hive默認的配置作修改,可以使用hive-site.xml。
具體步驟如下
- 在$SPARK_HOME/conf目錄下創建hive-site.xml
- 根據需要,添寫相應的配置項的值,可以這樣做,將$HIVE_HOME/conf目錄下的hive-default.xml復制到$SPARK_HOME/conf,然后重命名為hive-site.xml
Sql新功能預告
為了進一步提升sql的執行速度,在Spark開發團隊在發布完1.0之后,會通過codegen的方法來提升執行速度。codegen有點類似於jvm中的jit技術。充分利用了scala語言的特性。
前景分析
Spark目前還缺乏一個非常有影響力的應用,也就通常所說的killer application。SQL是Spark在尋找killer application方面所做的一個積極嘗試,也是目前Spark上最有熱度的一個話題,但通過優化Hive執行速度來吸引潛在Spark用戶,該突破方向選擇正確與否還有待市場證明。
Hive除了在執行速度上為人詬病之外,還有一個最大的問題就是多用戶訪問的問題,相較第一個問題,第二個問題來得更為致命。無論是Facebook在Hive之后推出的Presto還是Cloudera推出的Impala都是針對第二問題提出的解決方案,目前都已經取得的了巨大優勢。
小結
本文就Spark對HiveQL提供支持的這一功能進行了比較詳細的分析,其中涉及到以下幾個問題。
- 什么是hive
- hive有什么缺點,否則就沒Spark或Shark啥事了
- Spark主要是針對hive的哪個不足做出改進
- Spark是如何對這個做改進的
參考資料
- programming hive
- Shark vs. Impala
- Hive Design