Apache Spark源碼走讀之13 -- hiveql on spark實現詳解


歡迎轉載,轉載請注明出處,徽滬一郎

概要

在新近發布的spark 1.0中新加了sql的模塊,更為引人注意的是對hive中的hiveql也提供了良好的支持,作為一個源碼分析控,了解一下spark是如何完成對hql的支持是一件非常有趣的事情。

Hive簡介

Hive的由來

以下部分摘自Hadoop definite guide中的Hive一章

Hive由Facebook出品,其設計之初目的是讓精通SQL技能的分析師能夠對Facebook存放在HDFS上的大規模數據集進行分析和查詢。

Hive大大簡化了對大規模數據集的分析門檻(不再要求分析人員具有很強的編程能力),迅速流行起來,成為Hadoop生成圈上的Killer Application. 目前已經有很多組織把Hive作為一個通用的,可伸縮數據處理平台。

數據模型(Data Model)

Hive所有的數據都存在HDFS中,在Hive中有以下幾種數據模型

  • Tables(表) table和關系型數據庫中的表是相對應的,每個表都有一個對應的hdfs目錄,表中的數據經序列化后存儲在該目錄,Hive同時支持表中的數據存儲在其它類型的文件系統中,如NFS或本地文件系統
  • 分區(Partitions) Hive中的分區起到的作用有點類似於RDBMS中的索引功能,每個Partition都有一個對應的目錄,這樣在查詢的時候,可以減少數據規模
  • 桶(buckets) 即使將數據按分區之后,每個分區的規模有可能還是很大,這個時候,按照關鍵字的hash結果將數據分成多個buckets,每個bucket對應於一個文件

Query Language

 HiveQL是Hive支持的類似於SQL的查詢語言。HiveQL大體可以分成下面兩種類型

  1. DDL(data definition language)  比如創建數據庫(create database),創建表(create table),數據庫和表的刪除
  2. DML(data manipulation language) 數據的添加,查詢
  3. UDF(user defined function) Hive還支持用戶自定義查詢函數

Hive architecture

hive的整體框架圖如下圖所示

 

由上圖可以看出,Hive的整體架構可以分成以下幾大部分

  1. 用戶接口  支持CLI, JDBC和Web UI
  2. Driver Driver負責將用戶指令翻譯轉換成為相應的MapReduce Job
  3. MetaStore 元數據存儲倉庫,像數據庫和表的定義這些內容就屬於元數據這個范疇,默認使用的是Derby存儲引擎

HiveQL執行過程

HiveQL的執行過程如下所述

  1. parser 將HiveQL解析為相應的語法樹
  2. Semantic Analyser 語義分析
  3. Logical Plan Generating 生成相應的LogicalPlan
  4. Query Plan Generating
  5. Optimizer

最終生成MapReduce的Job,交付給Hadoop的MapReduce計算框架具體運行。

Hive實例

最好的學習就是實戰,Hive這一小節還是以一個具體的例子來結束吧。

前提條件是已經安裝好hadoop,具體安裝可以參考源碼走讀11或走讀9

step 1: 創建warehouse

warehouse用來存儲raw data

$ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse

step 2: 啟動hive cli

$ export HIVE_HOME=<hive-install-dir>
$ $HIVE_HOME/bin/hive

step 3: 創建表

創建表,首先將schema數據寫入到metastore,另一件事情就是在warehouse目錄下創建相應的子目錄,該子目錄以表的名稱命名

CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

step 4: 導入數據

導入的數據會存儲在step 3中創建的表目錄下

LOAD DATA LOCAL INPATH '/u.data'
OVERWRITE INTO TABLE u_data;

step 5: 查詢

SELECT COUNT(*) FROM u_data;

 hiveql on Spark

Q: 上一章節花了大量的篇幅介紹了hive由來,框架及hiveql執行過程。那這些東西跟我們標題中所稱的hive on spark有什么關系呢?

Ans:  Hive的整體解決方案很不錯,但有一些地方還值得改進,其中之一就是“從查詢提交到結果返回需要相當長的時間,查詢耗時太長之所以查詢時間很長,一個主要的原因就是因為Hive原生是基於MapReduce的,哪有沒有辦法提高呢。您一定想到了,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速執行能力來縮短HiveQl的響應時間。

下圖是Spark 1.0中所支持的lib庫,SQL是其唯一新添加的lib庫,可見SQL在Spark 1.0中的地位之重要。

 

HiveContext

HiveContext是Spark提供的用戶接口,HiveContext繼承自SqlContext。

讓我們回顧一下,SqlContext中牽涉到的類及其間的關系如下圖所示,具體分析過程參見本系列中的源碼走讀之11

既然是繼承自SqlContext,那么我們將普通sql與hiveql分析執行步驟做一個對比,可以得到下圖。

 

有了上述的比較,就能抓住源碼分析時需要把握的幾個關鍵點

  1. Entrypoint           HiveContext.scala
  2. QueryExecution    HiveContext.scala
    1. parser       HiveQl.scala
    2. optimizer    

數據

使用到的數據有兩種

  1. Schema Data  像數據庫的定義和表的結構,這些都存儲在MetaStore中
  2. Raw data        即要分析的文件本身

Entrypoint

hiveql是整個的入口點,而hql是hiveql的縮寫形式。

  def hiveql(hqlQuery: String): SchemaRDD = {
    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

上述hiveql的定義與sql的定義幾乎一模一樣,唯一的不同是sql中使用parseSql的結果作為SchemaRDD的入參而hiveql中使用HiveQl.parseSql作為SchemaRdd的入參

HiveQL, parser

parseSql的函數定義如代碼所示,解析過程中將指令分成兩大類

  • nativecommand     非select語句,這類語句的特點是執行時間不會因為條件的不同而有很大的差異,基本上都能在較短的時間內完成
  • 非nativecommand  主要是select語句
def parseSql(sql: String): LogicalPlan = {
    try {
      if (sql.toLowerCase.startsWith("set")) {
        NativeCommand(sql)
      } else if (sql.toLowerCase.startsWith("add jar")) {
        AddJar(sql.drop(8))
      } else if (sql.toLowerCase.startsWith("add file")) {
        AddFile(sql.drop(9))
      } else if (sql.startsWith("dfs")) {
        DfsCommand(sql)
      } else if (sql.startsWith("source")) {
        SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
      } else if (sql.startsWith("!")) {
        ShellCommand(sql.drop(1))
      } else {
        val tree = getAst(sql)

        if (nativeCommands contains tree.getText) {
          NativeCommand(sql)
        } else {
          nodeToPlan(tree) match {
            case NativePlaceholder => NativeCommand(sql)
            case other => other
          }
        }
      }
    } catch {
      case e: Exception => throw new ParseException(sql, e)
      case e: NotImplementedError => sys.error(
        s"""
          |Unsupported language features in query: $sql
          |${dumpTree(getAst(sql))}
        """.stripMargin)
    }
  }	

哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands變量,列表很長,代碼就不一一列出。

對於非nativeCommand,最重要的解析函數就是nodeToPlan

toRdd

Spark對HiveQL所做的優化主要體現在Query相關的操作,其它的依然使用Hive的原生執行引擎。

在logicalPlan到physicalPlan的轉換過程中,toRdd最關鍵的元素

override lazy val toRdd: RDD[Row] =
      analyzed match {
        case NativeCommand(cmd) =>
          val output = runSqlHive(cmd)

          if (output.size == 0) {
            emptyResult
          } else {
            val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
            sparkContext.parallelize(asRows, 1)
          }
        case _ =>
          executedPlan.execute().map(_.copy())
      }

native command的執行流程

由於native command是一些非耗時的操作,直接使用Hive中原有的exeucte engine來執行即可。這些command的執行示意圖如下

analyzer

HiveTypeCoercion

val typeCoercionRules =
    List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
      StringToIntegralCasts, FunctionArgumentConversion)		

optimizer

PreInsertionCasts存在的目的就是確保在數據插入執行之前,相應的表已經存在。

override lazy val optimizedPlan =
      optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))

此處要注意的是catalog的用途,catalog是HiveMetastoreCatalog的實例。

HiveMetastoreCatalog是Spark中對Hive Metastore訪問的wrapper。HiveMetastoreCatalog通過調用相應的Hive Api可以獲得數據庫中的表及表的分區,也可以創建新的表和分區。

HiveMetastoreCatalog

HiveMetastoreCatalog中會通過hive client來訪問metastore中的元數據,使用了大量的Hive Api。其中包括了廣為人知的deSer library。

以CreateTable函數為例說明對Hive Library的依賴。

def createTable(
      databaseName: String,
      tableName: String,
      schema: Seq[Attribute],
      allowExisting: Boolean = false): Unit = {
    val table = new Table(databaseName, tableName)
    val hiveSchema =
      schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
    table.setFields(hiveSchema)

    val sd = new StorageDescriptor()
    table.getTTable.setSd(sd)
    sd.setCols(hiveSchema)

    // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
    sd.setCompressed(false)
    sd.setParameters(Map[String, String]())
    sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
    sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
    val serDeInfo = new SerDeInfo()
    serDeInfo.setName(tableName)
    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
    serDeInfo.setParameters(Map[String, String]())
    sd.setSerdeInfo(serDeInfo)

    try client.createTable(table) catch {
      case e: org.apache.hadoop.hive.ql.metadata.HiveException
        if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
           allowExisting => // Do nothing.
    }
  }

實驗

結合源碼,我們再對一個簡單的例子作下說明。

可能你會想,既然spark也支持hql,那么我原先用hive cli創建的數據庫和表用spark能不能訪問到呢?答案或許會讓你很納悶,“在默認的配置下是不行的”。為什么?

Hive中的meta data采用的存儲引擎是Derby,該存儲引擎只能有一個訪問用戶。同一時刻只能有一個人訪問,即便以同一用戶登錄訪問也不行。針對這個局限,解決方法就是將metastore存儲在mysql或者其它可以多用戶訪問的數據庫中。

具體實例

  1. 創建表
  2. 導入數據
  3. 查詢
  4. 刪除表

在啟動spark-shell之前,需要先設置環境變量HIVE_HOMEHADOOP_HOME.

啟動spark-shell之后,執行如下代碼

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
hql("FROM src SELECT key, value").collect().foreach(println)
hql("drop table src")

create操作會在/user/hive/warehouse/目錄下創建src目錄,可以用以下指令來驗證

$$HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/

 drop表的時候,不僅metastore中相應的記錄被刪除,而且原始數據raw file本身也會被刪除,即在warehouse目錄下對應某個表的目錄會被整體刪除掉。

上述的create, load及query操作對metastore和raw data的影響可以用下圖的表示

hive-site.xml

如果想對hive默認的配置作修改,可以使用hive-site.xml。

具體步驟如下

 -  在$SPARK_HOME/conf目錄下創建hive-site.xml

 -  根據需要,添寫相應的配置項的值,可以這樣做,將$HIVE_HOME/conf目錄下的hive-default.xml復制到$SPARK_HOME/conf,然后重命名為hive-site.xml

Sql新功能預告

為了進一步提升sql的執行速度,在Spark開發團隊在發布完1.0之后,會通過codegen的方法來提升執行速度。codegen有點類似於jvm中的jit技術。充分利用了scala語言的特性。

前景分析

Spark目前還缺乏一個非常有影響力的應用,也就通常所說的killer application。SQL是Spark在尋找killer application方面所做的一個積極嘗試,也是目前Spark上最有熱度的一個話題,但通過優化Hive執行速度來吸引潛在Spark用戶,該突破方向選擇正確與否還有待市場證明。

Hive除了在執行速度上為人詬病之外,還有一個最大的問題就是多用戶訪問的問題,相較第一個問題,第二個問題來得更為致命。無論是Facebook在Hive之后推出的Presto還是Cloudera推出的Impala都是針對第二問題提出的解決方案,目前都已經取得的了巨大優勢。

小結

本文就Spark對HiveQL提供支持的這一功能進行了比較詳細的分析,其中涉及到以下幾個問題。

  1. 什么是hive
  2. hive有什么缺點,否則就沒Spark或Shark啥事了
  3. Spark主要是針對hive的哪個不足做出改進
  4. Spark是如何對這個做改進的

參考資料

  1. programming hive
  2. Shark vs. Impala
  3. Hive Design


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM