Hive計算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎


前言

Hive從2008年始於FaceBook工程師之手,經過10幾年的發展至今保持強大的生命力。截止目前Hive已經更新至3.1.x版本,Hive從最開始的為人詬病的速度慢迅速發展,開始支持更多的計算引擎,計算速度大大提升。

本文我們將從原理、應用、調優分別講解Hive所支持的MapReduce、Tez、Spark引擎。

MapReduce引擎

我們在之前的文章中:

對Hive的MapReduce引擎已經做過非常詳細的講解了。

在Hive2.x版本中,HiveSQL會被轉化為MR任務,這也是我們經常說的HiveSQL的執行原理。

我們先來看下 Hive 的底層執行架構圖, Hive 的主要組件與 Hadoop 交互的過程:

Hive底層執行架構

在 Hive 這一側,總共有五個組件:

  • UI:用戶界面。可看作我們提交SQL語句的命令行界面。
  • DRIVER:驅動程序。接收查詢的組件。該組件實現了會話句柄的概念。
  • COMPILER:編譯器。負責將 SQL 轉化為平台可執行的執行計划。對不同的查詢塊和查詢表達式進行語義分析,並最終借助表和從 metastore 查找的分區元數據來生成執行計划。
  • METASTORE:元數據庫。存儲 Hive 中各種表和分區的所有結構信息。
  • EXECUTION ENGINE:執行引擎。負責提交 COMPILER 階段編譯好的執行計划到不同的平台上。

上圖的基本流程是:

  • 步驟1:UI 調用 DRIVER 的接口;
  • 步驟2:DRIVER 為查詢創建會話句柄,並將查詢發送到 COMPILER(編譯器)生成執行計划;
  • 步驟3和4:編譯器從元數據存儲中獲取本次查詢所需要的元數據,該元數據用於對查詢樹中的表達式進行類型檢查,以及基於查詢謂詞修建分區;
  • 步驟5:編譯器生成的計划是分階段的DAG,每個階段要么是 map/reduce 作業,要么是一個元數據或者HDFS上的操作。將生成的計划發給 DRIVER。

如果是 map/reduce 作業,該計划包括 map operator trees 和一個 reduce operator tree,執行引擎將會把這些作業發送給 MapReduce :

  • 步驟6、6.1、6.2和6.3:執行引擎將這些階段提交給適當的組件。在每個 task(mapper/reducer) 中,從HDFS文件中讀取與表或中間輸出相關聯的數據,並通過相關算子樹傳遞這些數據。最終這些數據通過序列化器寫入到一個臨時HDFS文件中(如果不需要 reduce 階段,則在 map 中操作)。臨時文件用於向計划中后面的 map/reduce 階段提供數據。
  • 步驟7、8和9:最終的臨時文件將移動到表的位置,確保不讀取臟數據(文件重命名在HDFS中是原子操作)。對於用戶的查詢,臨時文件的內容由執行引擎直接從HDFS讀取,然后通過Driver發送到UI。

Hive SQL 編譯成 MapReduce 過程

美團博客中有一篇非常詳細的博客講解《Hive SQL的編譯過程》。

你可以參考:
https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html

編譯 SQL 的任務是在上節中介紹的 COMPILER(編譯器組件)中完成的。Hive將SQL轉化為MapReduce任務,整個編譯過程分為六個階段:

Hive SQL編譯過程

  1. 詞法、語法解析: Antlr 定義 SQL 的語法規則,完成 SQL 詞法,語法解析,將 SQL 轉化為抽象語法樹 AST Tree;

Antlr是一種語言識別的工具,可以用來構造領域語言。使用Antlr構造特定的語言只需要編寫一個語法文件,定義詞法和語法替換規則即可,Antlr完成了詞法分析、語法分析、語義分析、中間代碼生成的過程。

  1. 語義解析: 遍歷 AST Tree,抽象出查詢的基本組成單元 QueryBlock;
  2. 生成邏輯執行計划: 遍歷 QueryBlock,翻譯為執行操作樹 OperatorTree;
  3. 優化邏輯執行計划: 邏輯層優化器進行 OperatorTree 變換,合並 Operator,達到減少 MapReduce Job,減少數據傳輸及 shuffle 數據量;
  4. 生成物理執行計划: 遍歷 OperatorTree,翻譯為 MapReduce 任務;
  5. 優化物理執行計划: 物理層優化器進行 MapReduce 任務的變換,生成最終的執行計划。

下面對這六個階段詳細解析:

為便於理解,我們拿一個簡單的查詢語句進行展示,對5月23號的地區維表進行查詢:

select * from dim.dim_region where dt = '2021-05-23';

階段一:詞法、語法解析

根據Antlr定義的sql語法規則,將相關sql進行詞法、語法解析,轉化為抽象語法樹AST Tree:

ABSTRACT SYNTAX TREE:
TOK_QUERY
    TOK_FROM 
    TOK_TABREF
           TOK_TABNAME
               dim
                 dim_region
    TOK_INSERT
      TOK_DESTINATION
          TOK_DIR
              TOK_TMP_FILE
        TOK_SELECT
          TOK_SELEXPR
              TOK_ALLCOLREF
        TOK_WHERE
          =
              TOK_TABLE_OR_COL
                  dt
                    '2021-05-23'

階段二:語義解析

遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock:

AST Tree生成后由於其復雜度依舊較高,不便於翻譯為mapreduce程序,需要進行進一步抽象和結構化,形成QueryBlock。

QueryBlock是一條SQL最基本的組成單元,包括三個部分:輸入源,計算過程,輸出。簡單來講一個QueryBlock就是一個子查詢。

QueryBlock的生成過程為一個遞歸過程,先序遍歷 AST Tree ,遇到不同的 Token 節點(理解為特殊標記),保存到相應的屬性中。

階段三:生成邏輯執行計划

遍歷QueryBlock,翻譯為執行操作樹OperatorTree:

Hive最終生成的MapReduce任務,Map階段和Reduce階段均由OperatorTree組成。
基本的操作符包括:

TableScanOperator

SelectOperator

FilterOperator

JoinOperator

GroupByOperator

ReduceSinkOperator

Operator在Map Reduce階段之間的數據傳遞都是一個流式的過程。每一個Operator對一行數據完成操作后之后將數據傳遞給childOperator計算。

由於Join/GroupBy/OrderBy均需要在Reduce階段完成,所以在生成相應操作的Operator之前都會先生成一個ReduceSinkOperator,將字段組合並序列化為Reduce Key/value, Partition Key。

階段四:優化邏輯執行計划

Hive中的邏輯查詢優化可以大致分為以下幾類:

  • 投影修剪
  • 推導傳遞謂詞
  • 謂詞下推
  • 將Select-Select,Filter-Filter合並為單個操作
  • 多路 Join
  • 查詢重寫以適應某些列值的Join傾斜

階段五:生成物理執行計划

生成物理執行計划即是將邏輯執行計划生成的OperatorTree轉化為MapReduce Job的過程,主要分為下面幾個階段:

  1. 對輸出表生成MoveTask
  2. 從OperatorTree的其中一個根節點向下深度優先遍歷
  3. ReduceSinkOperator標示Map/Reduce的界限,多個Job間的界限
  4. 遍歷其他根節點,遇過碰到JoinOperator合並MapReduceTask
  5. 生成StatTask更新元數據
  6. 剪斷Map與Reduce間的Operator的關系

階段六:優化物理執行計划

Hive中的物理優化可以大致分為以下幾類:

  • 分區修剪(Partition Pruning)
  • 基於分區和桶的掃描修剪(Scan pruning)
  • 如果查詢基於抽樣,則掃描修剪
  • 在某些情況下,在 map 端應用 Group By
  • 在 mapper 上執行 Join
  • 優化 Union,使Union只在 map 端執行
  • 在多路 Join 中,根據用戶提示決定最后流哪個表
  • 刪除不必要的 ReduceSinkOperators
  • 對於帶有Limit子句的查詢,減少需要為該表掃描的文件數
  • 對於帶有Limit子句的查詢,通過限制 ReduceSinkOperator 生成的內容來限制來自 mapper 的輸出
  • 減少用戶提交的SQL查詢所需的Tez作業數量
  • 如果是簡單的提取查詢,避免使用MapReduce作業
  • 對於帶有聚合的簡單獲取查詢,執行不帶 MapReduce 任務的聚合
  • 重寫 Group By 查詢使用索引表代替原來的表
  • 當表掃描之上的謂詞是相等謂詞且謂詞中的列具有索引時,使用索引掃描

經過以上六個階段,SQL 就被解析映射成了集群上的 MapReduce 任務。

Explain語法

Hive Explain 語句類似Mysql 的Explain 語句,提供了對應查詢的執行計划,對於我們在理解Hive底層邏輯、Hive調優、Hive SQL書寫等方面提供了一個參照,在我們的生產工作了是一個很有意義的工具。

Hive Explain語法

EXPLAIN [EXTENDED|CBO|AST|DEPENDENCY|AUTHORIZATION|LOCKS|VECTORIZATION|ANALYZE] query

Hive Explain的語法規則如上,后面將按照對應的子句進行探討。

EXTENDED 語句會在執行計划中產生關於算子(Operator)的額外信息,這些信息都是典型的物理信息,如文件名稱等。

在執行Explain QUERY 之后,一個查詢會被轉化為包含多個Stage的語句(看起來更像一個DAG)。這些Stages要么是map/reduce Stage,要么是做些元數據或文件系統操作的Stage (如 move 、rename等)。Explain的輸出包含2個部分:

  • 執行計划不同Stage之間的以來關系(Dependency)
  • 每個Stage的執行描述信息(Description)

以下將通過一個簡單的例子進行解釋。

執行Explain 語句

EXPLAIN 
SELECT SUM(id) FROM test1;

Explain輸出結果解析

  • 依賴圖
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int)
              outputColumnNames: id
              Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: sum(id)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

一個HIVE查詢被轉換為一個由一個或多個stage組成的序列(有向無環圖DAG)。這些stage可以是MapReduce stage,也可以是負責元數據存儲的stage,也可以是負責文件系統的操作(比如移動和重命名)的stage。

我們將上述結果拆分看,先從最外層開始,包含兩個大的部分:

  • stage dependencies:各個stage之間的依賴性
  • stage plan:各個stage的執行計划

先看第一部分 stage dependencies ,包含兩個 stage,Stage-1 是根stage,說明這是開始的stage,Stage-0 依賴 Stage-1,Stage-1執行完成后執行Stage-0。

再看第二部分 stage plan,里面有一個 Map Reduce,一個MR的執行計划分為兩個部分

  • Map Operator Tree:MAP端的執行計划樹
  • Reduce Operator Tree:Reduce端的執行計划樹

這兩個執行計划樹里面包含這條sql語句的 operator

  1. TableScan:表掃描操作,map端第一個操作肯定是加載表,所以就是表掃描操作,常見的屬性:
alias:表名稱
Statistics:表統計信息,包含表中數據條數,數據大小等
  1. Select Operator:選取操作,常見的屬性 :
expressions:需要的字段名稱及字段類型
outputColumnNames:輸出的列名稱
Statistics:表統計信息,包含表中數據條數,數據大小等
  1. Group By Operator:分組聚合操作,常見的屬性:
aggregations:顯示聚合函數信息.
mode:聚合模式,值有 hash:隨機聚合,就是hash partition;partial:局部聚合;final:最終聚合.
keys:分組的字段,如果沒有分組,則沒有此字段.
outputColumnNames:聚合之后輸出列名.
Statistics:表統計信息,包含分組聚合之后的數據條數,數據大小等.
  1. Reduce Output Operator:輸出到reduce操作,常見屬性:

sort order:值為空 不排序;值為 + 正序排序,值為 - 倒序排序;值為 ± 排序的列為兩列,第一列為正序,第二列為倒序.

  1. Filter Operator:過濾操作,常見的屬性:

predicate:過濾條件,如sql語句中的where id>=1,則此處顯示(id >= 1).

  1. Map Join Operator:join 操作,常見的屬性:
condition map:join方式 ,如Inner Join 0 to 1 Left Outer Join0 to 2
keys: join 的條件字段
outputColumnNames:join 完成之后輸出的字段
Statistics:join 完成之后生成的數據條數,大小等
  1. File Output Operator:文件輸出操作,常見的屬性:
compressed:是否壓縮
table:表的信息,包含輸入輸出文件格式化方式,序列化方式等
  1. Fetch Operator 客戶端獲取數據操作,常見的屬性:

limit,值為 -1 表示不限制條數,其他值為限制的條數

Explain使用場景

那么Explain能夠為我們在生產實踐中帶來哪些便利及解決我們哪些迷惑呢?

join 語句會過濾 Null 的值嗎?

現在,我們在hive cli 輸入以下查詢計划語句

select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

然后執行:

explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id;

我們來看結果:

TableScan
 alias: a
 Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
 Filter Operator
    predicate: id is not null (type: boolean)
    Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int)
        outputColumnNames: _col0
        Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
        HashTable Sink Operator
           keys:
             0 _col0 (type: int)
             1 _col0 (type: int)
 ...

從上述結果可以看到 predicate: id is not null 這樣一行,說明 join 時會自動過濾掉關聯字段為 null 值的情況,但 left join 或 full join 是不會自動過濾null值的,大家可以自行嘗試下。

group by 分組語句會進行排序嗎?

select id,max(user_name) from test1 group by id;

直接來看 explain 之后結果:

TableScan
    alias: test1
    Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
    Select Operator
        expressions: id (type: int), user_name (type: string)
        outputColumnNames: id, user_name
        Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
        Group By Operator
           aggregations: max(user_name)
           keys: id (type: int)
           mode: hash
           outputColumnNames: _col0, _col1
           Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
           Reduce Output Operator
             key expressions: _col0 (type: int)
             sort order: +
             Map-reduce partition columns: _col0 (type: int)
             Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE
             value expressions: _col1 (type: string)
 ...

我們看 Group By Operator,里面有 keys: id (type: int) 說明按照 id 進行分組的,再往下看還有 sort order: + ,說明是按照 id 字段進行正序排序的。

哪條sql執行效率高

觀察如下兩條sql:

SELECT
 a.id,
 b.user_name
FROM
 test1 a
JOIN test2 b ON a.id = b.id
WHERE
 a.id > 2;
SELECT
 a.id,
 b.user_name
FROM
 (SELECT * FROM test1 WHERE id > 2) a
JOIN test2 b ON a.id = b.id;

這兩條sql語句輸出的結果是一樣的,但是哪條sql執行效率高呢?

有人說第一條sql執行效率高,因為第二條sql有子查詢,子查詢會影響性能;有人說第二條sql執行效率高,因為先過濾之后,在進行join時的條數減少了,所以執行效率就高了。到底哪條sql效率高呢,我們直接在sql語句前面加上 explain,看下執行計划不就知道了嘛!

在第一條sql語句前加上 explain,得到如下結果:

hive (default)> explain select a.id,b.user_name from test1 a join test2 b on a.id=b.id where a.id >2;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:a
          TableScan
            alias: a
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

在第二條sql語句前加上 explain,得到如下結果:

hive (default)> explain select a.id,b.user_name from(select * from  test1 where id>2 ) a join test2 b on a.id=b.id;
OK
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_0:test1
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_0:test1
          TableScan
            alias: test1
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int)
                outputColumnNames: _col0
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                HashTable Sink Operator
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 6 Data size: 75 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (id > 2) (type: boolean)
              Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: id (type: int), user_name (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 2 Data size: 25 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col0 (type: int)
                    1 _col0 (type: int)
                  outputColumnNames: _col0, _col2
                  Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col2 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 2 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
  • 大家有什么發現,除了表別名不一樣,其他的執行計划完全一樣,都是先進行 where 條件過濾,在進行 join 條件關聯。說明 hive 底層會自動幫我們進行優化,所以這兩條sql語句執行效率是一樣的。

  • 以上僅列舉了3個我們生產中既熟悉又有點迷糊的例子,explain 還有很多其他的用途,如查看stage的依賴情況、排查數據傾斜、hive 調優等,小伙伴們可以自行嘗試。

explain dependency的用法

explain dependency用於描述一段SQL需要的數據來源,輸出是一個json格式的數據,里面包含以下兩個部分的內容:

  • input_partitions:描述一段SQL依賴的數據來源表分區,里面存儲的是分區名的列表,如果整段SQL包含的所有表都是非分區表,則顯示為空。
  • input_tables:描述一段SQL依賴的數據來源表,里面存儲的是Hive表名的列表。

使用explain dependency查看SQL查詢非分區普通表,在 hive cli 中輸入以下命令:

explain dependency select s_age,count(1) num from student_orc;

得到如下結果:

{"input_partitions":[],"input_tables":[{"tablename":"default@student_tb _orc","tabletype":"MANAGED_TABLE"}]}

使用explain dependency查看SQL查詢分區表,在 hive cli 中輸入以下命令:

explain dependency select s_age,count(1) num from student_orc_partition;

得到結果:

{"input_partitions":[{"partitionName":"default@student_orc_partition@ part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName":"default@student_orc_partition@part=2"}, 
{"partitionName":"default@student_orc_partition@part=3"},
{"partitionName":"default@student_orc_partition@part=4"}, 
{"partitionName":"default@student_orc_partition@part=5"},
{"partitionName":"default@student_orc_partition@part=6"},
{"partitionName":"default@student_orc_partition@part=7"},
{"partitionName":"default@student_orc_partition@part=8"},
{"partitionName":"default@student_orc_partition@part=9"}], 
"input_tables":[{"tablename":"default@student_orc_partition", "tabletype":"MANAGED_TABLE"}]

explain dependency的使用場景有兩個:

  • 場景一:快速排除。快速排除因為讀取不到相應分區的數據而導致任務數據輸出異常。例如,在一個以天分區的任務中,上游任務因為生產過程不可控因素出現異常或者空跑,導致下游任務引發異常。通過這種方式,可以快速查看SQL讀取的分區是否出現異常。
  • 場景二:理清表的輸入,幫助理解程序的運行,特別是有助於理解有多重子查詢,多表連接的依賴輸入。

下面通過兩個案例來看explain dependency的實際運用:

識別看似等價的代碼

有如下兩條看似相等的sql:

代碼一:

select 
a.s_no 
from student_orc_partition a 
inner join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;

代碼二:

select 
a.s_no 
from student_orc_partition a 
inner join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part 
where a.part>=1 and a.part<=2;

我們看下上述兩段代碼explain dependency的輸出結果:

代碼1的explain dependency結果:

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName":"default@student_orc_partition@part=2"},
{"partitionName":"default@student_orc_partition_only@part=1"}, 
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

代碼2的explain dependency結果:

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=1"}, 
{"partitionName" : "default@student_orc_partition@part=2"},
{"partitionName" :"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

通過上面的輸出結果可以看到,其實上述的兩個SQL並不等價,代碼1在內連接(inner join)中的連接條件(on)中加入非等值的過濾條件后,並沒有將內連接的左右兩個表按照過濾條件進行過濾,內連接在執行時會多讀取part=0的分區數據。而在代碼2中,會過濾掉不符合條件的分區。

識別SQL讀取數據范圍的差別

有如下兩段代碼:

代碼一:

explain dependency
select
a.s_no 
from student_orc_partition a 
left join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and b.part>=1 and b.part<=2;

代碼二:

explain dependency 
select 
a.s_no 
from student_orc_partition a 
left join 
student_orc_partition_only b 
on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;

以上兩個代碼的數據讀取范圍是一樣的嗎?答案是不一樣,我們通過explain dependency來看下:

代碼1的explain dependency結果:

{"input_partitions": 
[{"partitionName": "default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, …中間省略7個分區
{"partitionName":"default@student_orc_partition@part=9"}, 
{"partitionName":"default@student_orc_partition_only@part=1"}, 
{"partitionName":"default@student_orc_partition_only@part=2"}], 
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

代碼2的explain dependency結果:

{"input_partitions": 
[{"partitionName":"default@student_orc_partition@part=0"}, 
{"partitionName":"default@student_orc_partition@part=1"}, …中間省略7個分區 
{"partitionName":"default@student_orc_partition@part=9"}, 
{"partitionName":"default@student_orc_partition_only@part=0"}, 
{"partitionName":"default@student_orc_partition_only@part=1"}, …中間省略7個分區 
{"partitionName":"default@student_orc_partition_only@part=9"}],
"input_tables": [{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"}, {"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

可以看到,對左外連接在連接條件中加入非等值過濾的條件,如果過濾條件是作用於右表(b表)有起到過濾的效果,則右表只要掃描兩個分區即可,但是左表(a表)會進行全表掃描。如果過濾條件是針對左表,則完全沒有起到過濾的作用,那么兩個表將進行全表掃描。這時的情況就如同全外連接一樣都需要對兩個數據進行全表掃描。

在使用過程中,容易認為代碼片段2可以像代碼片段1一樣進行數據過濾,通過查看explain dependency的輸出結果,可以知道不是如此。

explain authorization 的用法

通過explain authorization可以知道當前SQL訪問的數據來源(INPUTS) 和數據輸出(OUTPUTS),以及當前Hive的訪問用戶 (CURRENT_USER)和操作(OPERATION)。

在 hive cli 中輸入以下命令:

explain authorization 
select variance(s_score) from student_tb_orc;

結果如下:

INPUTS: 
  default@student_tb_orc 
OUTPUTS: 
  hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194- 90f1475a3ed5/-mr-10000 
CURRENT_USER: 
  hdfs 
OPERATION: 
  QUERY 
AUTHORIZATION_FAILURES: 
  No privilege 'Select' found for inputs { database:default, table:student_ tb_orc, columnName:s_score}

從上面的信息可知:

  • 上面案例的數據來源是defalut數據庫中的 student_tb_orc表;
  • 數據的輸出路徑是hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000;
  • 當前的操作用戶是hdfs,操作是查詢;
  • 觀察上面的信息我們還會看到AUTHORIZATION_FAILURES信息,提示對當前的輸入沒有查詢權限,但如果運行上面的SQL的話也能夠正常運行。為什么會出現這種情況?Hive在默認不配置權限管理的情況下不進行權限驗證,所有的用戶在Hive里面都是超級管理員,即使不對特定的用戶進行賦權,也能夠正常查詢。

Tez引擎

Tez是Apache開源的支持DAG作業的計算框架,是支持HADOOP2.x的重要引擎。它源於MapReduce框架,核心思想是將Map和Reduce兩個操作進一步拆分,分解后的元操作可以任意靈活組合,產生新的操作,這些操作經過一些控制程序組裝后,可形成一個大的DAG作業。

Tez將Map task和Reduce task進一步拆分為如下圖所示:

Tez的task由Input、processor、output階段組成,可以表達所有復雜的map、reduce操作,如下圖:

Tez的實現

Tez對外提供了6種可編程組件,分別是:

1)Input:對輸入數據源的抽象,它解析輸入數據格式,並吐出一個個Key/value
2)Output:對輸出數據源的抽象,它將用戶程序產生的Key/value寫入文件系統
3)Paritioner:對數據進行分片,類似於MR中的Partitioner
4)Processor:對計算的抽象,它從一個Input中獲取數據,經處理后,通過Output輸出
5)Task:對任務的抽象,每個Task由一個Input、Ouput和Processor組成
6)Maser:管理各個Task的依賴關系,並按順依賴關系執行他們

除了以上6種組件,Tez還提供了兩種算子,分別是Sort(排序)和Shuffle(混洗),為了用戶使用方便,它還提供了多種Input、Output、Task和Sort的實現,具體如下:

1)Input實現:LocalMergedInput(文件本地合並后作為輸入),ShuffledMergedInput(遠程拷貝數據且合並后作為輸入)
2)Output實現:InMemorySortedOutput(內存排序后輸出),LocalOnFileSorterOutput(本地磁盤排序后輸出),OnFileSortedOutput(磁盤排序后輸出)
3)Task實現:RunTimeTask(非常簡單的Task,基本沒做什么事)
4)Sort實現:DefaultSorter(本地數據排序),InMemoryShuffleSorter(遠程拷貝數據並排序)

為了展示Tez的使用方法和驗證Tez框架的可用性,Apache在YARN MRAppMaster基礎上使用Tez編程接口重新設計了MapReduce框架,使之可運行在YARN中。為此,Tez提供了以下幾個組件:

1)Input:SimpleInput(直接使用MR InputFormat獲取數據)
2)Output:SimpleOutput(直接使用MR OutputFormat獲取數據)
3)Partition:MRPartitioner(直接使用MR Partitioner獲取數據)
4)Processor:MapProcessor(執行Map Task),ReduceProcessor(執行Reduce Task)
5)Task:FinalTask,InitialTask,initialTaskWithInMemSort,InitialTaskWithLocalSort ,IntermediateTask,LocalFinalTask,MapOnlyTask。

對於MapReduce作業而言,如果只有Map Task,則使用MapOnlyTask,否則,Map Task使用InitialTaskWithInMemSort而Reduce Task用FinalTask。當然,如果你想編寫其他類型的作業,可使用以上任何幾種Task進行組合,比如”InitialTaskWithInMemSort –> FinalTask”是MapReduce作業。

為了減少Tez開發工作量,並讓Tez能夠運行在YARN之上,Tez重用了大部分YARN中MRAppMater的代碼,包括客戶端、資源申請、任務推測執行、任務啟動等。

Tez和MapReduce作業的比較:

  • Tez繞過了MapReduce很多不必要的中間的數據存儲和讀取的過程,直接在一個作業中表達了MapReduce需要多個作業共同協作才能完成的事情。
  • Tez和MapReduce一樣都運行使用YARN作為資源調度和管理。但與MapReduce on YARN不同,Tez on YARN並不是將作業提交到ResourceManager,而是提交到AMPoolServer的服務上,AMPoolServer存放着若干已經預先啟動ApplicationMaster的服務。
  • 當用戶提交一個作業上來后,AMPoolServer從中選擇一個ApplicationMaster用於管理用戶提交上來的作業,這樣既可以節省ResourceManager創建ApplicationMaster的時間,而又能夠重用每個ApplicationMaster的資源,節省了資源釋放和創建時間。

Tez相比於MapReduce有幾點重大改進:

  • 當查詢需要有多個reduce邏輯時,Hive的MapReduce引擎會將計划分解,每個Redcue提交一個MR作業。這個鏈中的所有MR作業都需要逐個調度,每個作業都必須從HDFS中重新讀取上一個作業的輸出並重新洗牌。而在Tez中,幾個reduce接收器可以直接連接,數據可以流水線傳輸,而不需要臨時HDFS文件,這種模式稱為MRR(Map-reduce-reduce*)。
  • Tez還允許一次發送整個查詢計划,實現應用程序動態規划,從而使框架能夠更智能地分配資源,並通過各個階段流水線傳輸數據。對於更復雜的查詢來說,這是一個巨大的改進,因為它消除了IO/sync障礙和各個階段之間的調度開銷。
  • 在MapReduce計算引擎中,無論數據大小,在Shuffle階段都以相同的方式執行,將數據序列化到磁盤,再由下游的程序去拉取,並反序列化。Tez可以允許小數據集完全在內存中處理,而MapReduce中沒有這樣的優化。倉庫查詢經常需要在處理完大量的數據后對小型數據集進行排序或聚合,Tez的優化也能極大地提升效率。

給 Hive 換上 Tez 非常簡單,只需給 hive-site.xml 中設置:

<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>

設置hive.execution.engine為 tez 后進入到 Hive 執行 SQL:

hive> select count(*) as c from userinfo;
Query ID = zhenqin_20161104150743_4155afab-4bfa-4e8a-acb0-90c8c50ecfb5
Total jobs = 1
Launching Job 1 out of 1
 
 
Status: Running (Executing on YARN cluster with App id application_1478229439699_0007)
 
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      2          2        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 6.19 s     
--------------------------------------------------------------------------------
OK
1000000
Time taken: 6.611 seconds, Fetched: 1 row(s)

可以看到,我的 userinfo 中有 100W 條記錄,執行一遍 count 需要 6.19s。現在把 engine 換為 mr

set hive.execution.engine=mr;

再次執行 count userinfo:

hive> select count(*) as c from userinfo;
Query ID = zhenqin_20161104152022_c7e6c5bd-d456-4ec7-b895-c81a369aab27
Total jobs = 1
Launching Job 1 out of 1
Starting Job = job_1478229439699_0010, Tracking URL = http://localhost:8088/proxy/application_1478229439699_0010/
Kill Command = /Users/zhenqin/software/hadoop/bin/hadoop job  -kill job_1478229439699_0010
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-11-04 15:20:28,323 Stage-1 map = 0%,  reduce = 0%
2016-11-04 15:20:34,587 Stage-1 map = 100%,  reduce = 0%
2016-11-04 15:20:40,796 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_1478229439699_0010
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   HDFS Read: 215 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1000000
Time taken: 19.46 seconds, Fetched: 1 row(s)
hive> 

可以看到,使用 Tez 效率比 MapReduce 有近3倍的提升。而且,Hive 在使用 Tez 引擎執行時,有 ==>> 動態的進度指示。而在使用 mr 時,只有日志輸出 map and reduce 的進度百分比。使用 tez,輸出的日志也清爽很多。

在我測試的很多復雜的 SQL,Tez 的都比 MapReduce 快很多,快慢取決於 SQL 的復雜度。執行簡單的 select 等並不能體現 tez 的優勢。Tez 內部翻譯 SQL 能任意的 Map,Reduce,Reduce 組合,而 MR 只能 Map->Reduce->Map->Reduce,因此在執行復雜 SQL 時, Tez 的優勢明顯。

Tez 參數優化

優化參參數(在同樣條件下,使用了tez從300s+降到200s+)

set hive.execution.engine=tez;
set mapred.job.name=recommend_user_profile_$idate;
set mapred.reduce.tasks=-1;
set hive.exec.reducers.max=160;
set hive.auto.convert.join=true;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16; 
set hive.optimize.skewjoin=true;
set hive.exec.reducers.bytes.per.reducer=100000000;
set mapred.max.split.size=200000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.hadoop.supports.splittable.combineinputformat=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

Tez內存優化

1. AM、Container大小設置

tez.am.resource.memory.mb

參數說明:Set tez.am.resource.memory.mb tobe the same as yarn.scheduler.minimum-allocation-mb the YARNminimum container size.

hive.tez.container.size

參數說明:Set hive.tez.container.size to be the same as or a small multiple(1 or 2 times that) of YARN container size yarn.scheduler.minimum-allocation-mb but NEVER more than yarn.scheduler.maximum-allocation-mb.

2. AM、Container JVM參數設置

tez.am.launch.cmd-opts

默認值:80% * tez.am.resource.memory.mb,一般不需要調整

hive.tez.java.ops

默認值:80% * hive.tez.container.size 參數說明:Hortonworks建議"–server –Djava.net.preferIPv4Stack=true–XX:NewRatio=8 –XX:+UseNUMA –XX:UseG1G"

tez.container.max.java.heap.fraction

默認值:0.8,參數說明:task/AM占用JVM Xmx的比例,該參數建議調整,需根據具體業務情況修改;

3. Hive內存Map Join參數設置

tez.runtime.io.sort.mb

默認值:100,參數說明:輸出排序需要的內存大小。建議值:40% * hive.tez.container.size,一般不超過2G.

hive.auto.convert.join.noconditionaltask

默認值:true,參數說明:是否將多個mapjoin合並為一個,使用默認值

hive.auto.convert.join.noconditionaltask.size

默認值為10MB,參數說明:多個mapjoin轉換為1個時,所有小表的文件大小總和的最大值,這個值只是限制輸入的表文件的大小,並不代表實際mapjoin時hashtable的大小。建議值:1/3 * hive.tez.container.size

tez.runtime.unordered.output.buffer.size-mb

默認值:100,參數說明:Size of the buffer to use if not writing directly to disk。建議值: 10% * hive.tez.container.size.

4. Container重用設置

tez.am.container.reuse.enabled

默認值:true,參數說明:Container重用開關

Spark引擎

Hive社區於2014年推出了Hive on Spark項目(HIVE-7292),將Spark作為繼MapReduce和Tez之后Hive的第三個計算引擎。該項目由Cloudera、Intel和MapR等幾家公司共同開發,並受到了來自Hive和Spark兩個社區的共同關注。通過該項目,可以提高Hive查詢的性能,同時為已經部署了Hive或者Spark的用戶提供了更加靈活的選擇,從而進一步提高Hive和Spark的普及率。

總體設計

Hive on Spark總體的設計思路是,盡可能重用Hive邏輯層面的功能;從生成物理計划開始,提供一整套針對Spark的實現,比如 SparkCompiler、SparkTask等,這樣Hive的查詢就可以作為Spark的任務來執行了。以下是幾點主要的設計原則。

  • 盡可能減少對Hive原有代碼的修改。這是和之前的Shark設計思路最大的不同。Shark對Hive的改動太大以至於無法被Hive社區接受,Hive on Spark盡可能少改動Hive的代碼,從而不影響Hive目前對MapReduce和Tez的支持。同時,Hive on Spark保證對現有的MapReduce和Tez模式在功能和性能方面不會有任何影響。
  • 對於選擇Spark的用戶,應使其能夠自動的獲取Hive現有的和未來新增的功能。
  • 盡可能降低維護成本,保持對Spark依賴的松耦合。

基於以上思路和原則,具體的一些設計架構如下。

Hive 的用戶可以通過hive.execution.engine來設置計算引擎,目前該參數可選的值為mr和tez。為了實現Hive on Spark,我們將spark作為該參數的第三個選項。要開啟Hive on Spark模式,用戶僅需將這個參數設置為spark即可。

在hive中使用以下語句開啟:

hive> set hive.execution.engine=spark;

總體設計

Spark 以分布式可靠數據集(Resilient Distributed Dataset,RDD)作為其數據抽象,因此我們需要將Hive的表轉化為RDD以便Spark處理。本質上,Hive的表和Spark的 HadoopRDD都是HDFS上的一組文件,通過InputFormat和RecordReader讀取其中的數據,因此這個轉化是自然而然的。

Spark為RDD提供了一系列的轉換(Transformation),其中有些轉換也是面向SQL 的,如groupByKey、join等。但如果使用這些轉換(就如Shark所做的那樣),就意味着我們要重新實現一些Hive已有的功能;而且當 Hive增加新的功能時,我們需要相應地修改Hive on Spark模式。有鑒於此,我們選擇將Hive的操作符包裝為Function,然后應用到RDD上。這樣,我們只需要依賴較少的幾種RDD的轉換,而主要的計算邏輯仍由Hive提供。

由於使用了Hive的原語,因此我們需要顯式地調用一些Transformation來實現Shuffle的功能。下表中列舉了Hive on Spark使用的所有轉換。

Hive on Spark

對repartitionAndSortWithinPartitions 簡單說明一下,這個功能由SPARK-2978引入,目的是提供一種MapReduce風格的Shuffle。雖然sortByKey也提供了排序的功 能,但某些情況下我們並不需要全局有序,另外其使用的Range Partitioner對於某些Hive的查詢並不適用。

物理執行計划

通過SparkCompiler將Operator Tree轉換為Task Tree,其中需要提交給Spark執行的任務即為SparkTask。不同於MapReduce中Map+Reduce的兩階段執行模式,Spark采用DAG執行模式,因此一個SparkTask包含了一個表示RDD轉換的DAG,我們將這個DAG包裝為SparkWork。執行SparkTask 時,就根據SparkWork所表示的DAG計算出最終的RDD,然后通過RDD的foreachAsync來觸發運算。使用foreachAsync是因為我們使用了Hive原語,因此不需要RDD返回結果;此外foreachAsync異步提交任務便於我們對任務進行監控。

SparkContext生命周期

SparkContext 是用戶與Spark集群進行交互的接口,Hive on Spark應該為每個用戶的會話創建一個SparkContext。但是Spark目前的使用方式假設SparkContext的生命周期是Spark應 用級別的,而且目前在同一個JVM中不能創建多個SparkContext。這明顯無法滿足HiveServer2的應用場景,因為多個客戶端需要通過同一個HiveServer2來提供服務。鑒於此,我們需要在單獨的JVM中啟動SparkContext,並通過RPC與遠程的SparkContext進行通信。

任務監控與統計信息收集

Spark提供了SparkListener接口來監聽任務執行期間的各種事件,因此我們可以實現一個Listener來監控任務執行進度以及收集任務級別的統計信 息(目前任務級別的統計由SparkListener采集,任務進度則由Spark提供的專門的API來監控)。另外Hive還提供了Operator級 別的統計數據信息,比如讀取的行數等。在MapReduce模式下,這些信息通過Hadoop Counter收集。我們可以使用Spark提供的Accumulator來實現該功能。

細節實現

Hive on Spark解析SQL的過程

SQL語句在分析執行過程中會經歷下圖所示的幾個步驟

  1. 語法解析
  2. 操作綁定
  3. 優化執行策略
  4. 交付執行

語法解析

語法解析之后,會形成一棵語法樹,如下圖所示。樹中的每個節點是執行的rule,整棵樹稱之為執行策略。

策略優化

形成上述的執行策略樹還只是第一步,因為這個執行策略可以進行優化,所謂的優化就是對樹中節點進行合並或是進行順序上的調整。

以大家熟悉的join操作為例,下圖給出一個join優化的示例。A JOIN B等同於B JOIN A,但是順序的調整可能給執行的性能帶來極大的影響,下圖就是調整前后的對比圖。

在Hash Join中,首先被訪問的表稱之為“內部構建表”,第二個表為“探針輸入”。創建內部表時,會將數據移動到數據倉庫指向的路徑;創建外部表,僅記錄數據所在的路徑。

再舉一例,一般來說盡可能的先實施聚合操作(Aggregate)然后再join

這種優化自動完成,在調優時不需要考慮。

SQL到Spark作業的轉換過程

  • native command的執行流程

由於native command是一些非耗時的操作,直接使用Hive中原有的exeucte engine來執行即可。這些command的執行示意圖如下:

  • SparkTask的生成和執行

我們通過一個例子來看一下一個簡單的兩表JOIN查詢如何被轉換為SparkTask並被執行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉化成SparkTask;右半部分展示了該SparkTask執行時如何得到最終的RDD並通過foreachAsync提交Spark任務。

SparkCompiler遍歷Operator Tree,將其划分為不同的MapWork和ReduceWork。

MapWork為根節點,總是由TableScanOperator(Hive中對表進行掃描的操作符)開始;后續的Work均為ReduceWork。ReduceSinkOperator(Hive中進行Shuffle輸出的操作符)用來標記兩個Work之間的界線,出現ReduceSinkOperator表示當前Work到下一個Work之間的數據需要進行Shuffle。因此,當我們發現ReduceSinkOperator時,就會創建一個新的ReduceWork並作為當前Work的子節點。包含了FileSinkOperator(Hive中將結果輸出到文件的操作符)的Work為葉子節點。

與MapReduce最大的不同在於,我們並不要求ReduceWork一定是葉子節點,即ReduceWork之后可以鏈接更多的ReduceWork,並在同一個SparkTask中執行。

從該圖可以看出,這個查詢的Operator Tree被轉化成了兩個MapWork和一個ReduceWork。

執行SparkTask步驟:

  1. 根據MapWork來生成最底層的HadoopRDD,
  2. 將各個MapWork和ReduceWork包裝成Function應用到RDD上。
  3. 在有依賴的Work之間,需要顯式地調用Shuffle轉換,具體選用哪種Shuffle則要根據查詢的類型來確定。另外,由於這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進行Union。
  4. 經過這一系列轉換后,得到最終的RDD,並通過foreachAsync提交到Spark集群上進行計算。

在logicalPlan到physicalPlan的轉換過程中,toRdd最關鍵的元素

override lazy val toRdd: RDD[Row] =
      analyzed match {
        case NativeCommand(cmd) =>
          val output = runSqlHive(cmd)

          if (output.size == 0) {
            emptyResult
          } else {
            val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
            sparkContext.parallelize(asRows, 1)
          }
        case _ =>
          executedPlan.execute().map(_.copy())
      }

SparkTask的生成和執行

我們通過一個例子來看一下一個簡單的兩表JOIN查詢如何被轉換為SparkTask並被執行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉化成SparkTask;右半部分展示了該SparkTask執行時如何得到最終的RDD並通過foreachAsync提交Spark任務。

SparkCompiler遍歷Operator Tree,將其划分為不同的MapWork和ReduceWork。MapWork為根節點,總是由TableScanOperator(Hive中對表 進行掃描的操作符)開始;后續的Work均為ReduceWork。ReduceSinkOperator(Hive中進行Shuffle輸出的操作符) 用來標記兩個Work之間的界線,出現ReduceSinkOperator表示當前Work到下一個Work之間的數據需要進行Shuffle。因此, 當我們發現ReduceSinkOperator時,就會創建一個新的ReduceWork並作為當前Work的子節點。包含了 FileSinkOperator(Hive中將結果輸出到文件的操作符)的Work為葉子節點。與MapReduce最大的不同在於,我們並不要求 ReduceWork一定是葉子節點,即ReduceWork之后可以鏈接更多的ReduceWork,並在同一個SparkTask中執行。

從該圖可以看出,這個查詢的Operator Tree被轉化成了兩個MapWork和一個ReduceWork。在執行SparkTask時,首先根據MapWork來生成最底層的 HadoopRDD,然后將各個MapWork和ReduceWork包裝成Function應用到RDD上。在有依賴的Work之間,需要顯式地調用 Shuffle轉換,具體選用哪種Shuffle則要根據查詢的類型來確定。另外,由於這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進行 Union。經過這一系列轉換后,得到最終的RDD,並通過foreachAsync提交到Spark集群上進行計算。

運行模式

Hive on Spark支持兩種運行模式:本地和遠程。當用戶把Spark Master URL設置為local時,采用本地模式;其余情況則采用遠程模式。本地模式下,SparkContext與客戶端運行在同一個JVM中;遠程模式 下,SparkContext運行在一個獨立的JVM中。提供本地模式主要是為了方便調試,一般用戶不應選擇該模式。因此我們這里也主要介紹遠程模式 (Remote SparkContext,RSC)。下圖展示了RSC的工作原理。

用戶的每個Session會創建一個SparkClient,SparkClient會啟動RemoteDriver進程,並由RemoteDriver創 建SparkContext。SparkTask執行時,通過Session提交任務,任務的主體就是對應的SparkWork。SparkClient 將任務提交給RemoteDriver,並返回一個SparkJobRef,通過該SparkJobRef,客戶端可以監控任務執行進度,進行錯誤處理, 以及采集統計信息等。由於最終的RDD計算沒有返回結果,因此客戶端只需要監控執行進度而不需要處理返回值。RemoteDriver通過 SparkListener收集任務級別的統計數據,通過Accumulator收集Operator級別的統計數據(Accumulator被包裝為 SparkCounter),並在任務結束時返回給SparkClient。

SparkClient 與RemoteDriver之間通過基於Netty的RPC進行通信。除了提交任務,SparkClient還提供了諸如添加Jar包、獲取集群信息等接 口。如果客戶端需要使用更一般的SparkContext的功能,可以自定義一個任務並通過SparkClient發送到RemoteDriver上執行。

理論上來說,Hive on Spark對於Spark集群的部署方式沒有特別的要求,除了local以外,RemoteDriver可以連接到任意的Spark集群來執行任務。在我 們的測試中,Hive on Spark在Standalone和Spark on YARN的集群上都能正常工作(需要動態添加Jar包的查詢在yarn-cluster模式下還不能運行,請參考HIVE-9425)。

優化

Yarn的配置

yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb,這兩個參數決定這集群資源管理器能夠有多少資源用於運行yarn上的任務。這兩個參數的值是由機器的配置及同時在機器上運行的其它進程共同決定。本文假設僅有hdfs的datanode和yarn的nodemanager運行於該節點。

  1. 配置cores

基本配置是datanode和nodemanager各一個核,操作系統兩個核,然后剩下28核配置作為yarn資源。也即是yarn.nodemanager.resource.cpu-vcores=28

  1. 配置內存

對於內存,預留20GB給操作系統,datanode,nodemanager,剩余100GB作為yarn資源。也即是 yarn.nodemanager.resource.memory-mb=100*1024

Spark配置

假設Yarn節點機器配置,假設有32核,120GB內存。

給Yarn分配資源以后,那就要想着spark如何使用這些資源了,主要配置對象:

execurtor 和driver內存,executro配額,並行度。

  1. executor內存

設置executor內存需要考慮如下因素:

  • executor內存越多,越能為更多的查詢提供map join的優化。由於垃圾回收的壓力會導致開銷增加。
  • 某些情況下hdfs的客戶端不能很好的處理並發寫入,所以過多的核心可能會導致競爭。

為了最大化使用core,建議將core設置為4,5,6(多核心會導致並發問題,所以寫代碼的時候尤其是靜態的鏈接等要考慮並發問題)具體分配核心數要結合yarn所提供的核心數。由於本文中涉及到的node節點是28核,那么很明顯分配為4的化可以被整除,spark.executor.cores設置為4 不會有多余的核剩下,設置為5,6都會有core剩余。spark.executor.cores=4,由於總共有28個核,那么最大可以申請的executor數是7。總內存處以7,也即是 100/7,可以得到每個executor約14GB內存。

要知道 spark.executor.memory 和spark.executor.memoryOverhead共同決定着executor內存。建議spark.executor.memoryOverhead站總內存的 15%-20%。那么最終spark.executor.memoryOverhead=2G和spark.executor.memory=12G.

根據上面的配置的化,每個主機就可以申請7個executor,每個executor可以運行4個任務,每個core一個task。那么每個task的平均內存是 14/4 = 3.5GB。在executor運行的task共享內存。其實,executor內部是用newCachedThreadPool運行task的。

確保spark.executor.memoryOverhead和spark.executor.memory的和不超過yarn.scheduler.maximum-allocation-mb。

  1. driver內存

對於drvier的內存配置,當然也包括兩個參數。

  • spark.driver.memoryOverhead 每個driver能從yarn申請的堆外內存的大小。

  • spark.driver.memory 當運行hive on spark的時候,每個spark driver能申請的最大jvm 堆內存。該參數結合 spark.driver.memoryOverhead共同決定着driver的內存大小。

driver的內存大小並不直接影響性能,但是也不要job的運行受限於driver的內存. 這里給出spark driver內存申請的方案,假設yarn.nodemanager.resource.memory-mb是 X。

  • driver內存申請12GB,假設 X > 50GB
  • driver內存申請 4GB,假設 12GB < X <50GB
  • driver內存申請1GB,假設 1GB < X < 12 GB
  • driver內存申請256MB,假設 X < 1GB

這些數值是spark.driver.memory和 spark.driver.memoryOverhead內存的總和。對外內存站總內存的10%-15%。假設 yarn.nodemanager.resource.memory-mb=100*1024MB,那么driver內存設置為12GB,此時 spark.driver.memory=10.5gb和spark.driver.memoryOverhead=1.5gb

注意,資源多少直接對應的是數據量的大小。所以要結合資源和數據量進行適當縮減和增加。

  1. executor數

executor的數目是由每個節點運行的executor數目和集群的節點數共同決定。如果你有四十個節點,那么hive可以使用的最大executor數就是 280(40*7). 最大數目可能比這個小點,因為driver也會消耗1core和12GB。

當前假設是沒有yarn應用在跑。

Hive性能與用於運行查詢的executor數量直接相關。但是,不通查詢還是不通。通常,性能與executor的數量成比例。例如,查詢使用四個executor大約需要使用兩個executor的一半時間。但是,性能在一定數量的executor中達到峰值,高於此值時,增加數量不會改善性能並且可能產生不利影響。

在大多數情況下,使用一半的集群容量(executor數量的一半)可以提供良好的性能。為了獲得最佳性能,最好使用所有可用的executor。例如,設置spark.executor.instances = 280。對於基准測試和性能測量,強烈建議這樣做。

4.動態executor申請

雖然將spark.executor.instances設置為最大值通常可以最大限度地提高性能,但不建議在多個用戶運行Hive查詢的生產環境中這樣做。避免為用戶會話分配固定數量的executor,因為如果executor空閑,executor不能被其他用戶查詢使用。在生產環境中,應該好好計划executor分配,以允許更多的資源共享。

Spark允許您根據工作負載動態擴展分配給Spark應用程序的集群資源集。要啟用動態分配,請按照動態分配中的步驟進行操作。除了在某些情況下,強烈建議啟用動態分配。

  1. 並行度

要使可用的executor得到充分利用,必須同時運行足夠的任務(並行)。在大多數情況下,Hive會自動確定並行度,但也可以在調優並發度方面有一些控制權。在輸入端,map任務的數量等於輸入格式生成的split數。對於Hive on Spark,輸入格式為CombineHiveInputFormat,它可以根據需要對基礎輸入格式生成的split進行分組。可以更好地控制stage邊界的並行度。調整hive.exec.reducers.bytes.per.reducer以控制每個reducer處理的數據量,Hive根據可用的executor,執行程序內存,以及其他因素來確定最佳分區數。實驗表明,只要生成足夠的任務來保持所有可用的executor繁忙,Spark就比MapReduce對hive.exec.reducers.bytes.per.reducer指定的值敏感度低。為獲得最佳性能,請為該屬性選擇一個值,以便Hive生成足夠的任務以完全使用所有可用的executor。

Hive配置

Hive on spark 共享了很多hive性能相關的配置。可以像調優hive on mapreduce一樣調優hive on spark。然而,hive.auto.convert.join.noconditionaltask.size是基於統計信息將基礎join轉化為map join的閾值,可能會對性能產生重大影響。盡管該配置可以用hive on mr和hive on spark,但是兩者的解釋不同。

數據的大小有兩個統計指標:

  • totalSize- 數據在磁盤上的近似大小
  • rawDataSize- 數據在內存中的近似大小

hive on mr用的是totalSize。hive on spark使用的是rawDataSize。由於可能存在壓縮和序列化,這兩個值會有較大的差別。對於hive on spark 需要將 hive.auto.convert.join.noconditionaltask.size指定為更大的值,才能將與hive on mr相同的join轉化為map join。

可以增加此參數的值,以使地圖連接轉換更具凶猛。將common join 轉換為 map join 可以提高性能。如果此值設置得太大,則來自小表的數據將使用過多內存,任務可能會因內存不足而失敗。根據群集環境調整此值。

通過參數 hive.stats.collect.rawdatasize 可以控制是否收集 rawDataSize 統計信息。

對於hiveserver2,建議再配置兩個額外的參數: hive.stats.fetch.column.stats=true 和 hive.optimize.index.filter=true.

Hive性能調優通常建議使用以下屬性:

hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.merge.mapfiles=true
hive.merge.mapredfiles=false
hive.merge.smallfiles.avgsize=16000000
hive.merge.size.per.task=256000000
hive.merge.sparkfiles=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M)
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4 (MR and Spark)
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.optimize.ppd=true

預啟動YARN容器

在開始新會話后提交第一個查詢時,在查看查詢開始之前可能會遇到稍長的延遲。還會注意到,如果再次運行相同的查詢,它的完成速度比第一個快得多。

Spark執行程序需要額外的時間來啟動和初始化yarn上的Spark,這會導致較長的延遲。此外,Spark不會等待所有executor在啟動作業之前全部啟動完成,因此在將作業提交到群集后,某些executor可能仍在啟動。但是,對於在Spark上運行的作業,作業提交時可用executor的數量部分決定了reducer的數量。當就緒executor的數量未達到最大值時,作業可能沒有最大並行度。這可能會進一步影響第一個查詢的性能。

在用戶較長期會話中,這個額外時間不會導致任何問題,因為它只在第一次查詢執行時發生。然而,諸如Oozie發起的Hive工作之類的短期繪畫可能無法實現最佳性能。

為減少啟動時間,可以在作業開始前啟用容器預熱。只有在請求的executor准備就緒時,作業才會開始運行。這樣,在reduce那一側不會減少短會話的並行性。

要啟用預熱功能,請在發出查詢之前將hive.prewarm.enabled設置為true。還可以通過設置hive.prewarm.numcontainers來設置容器數量。默認值為10。

預熱的executor的實際數量受spark.executor.instances(靜態分配)或spark.dynamicAllocation.maxExecutors(動態分配)的值限制。hive.prewarm.numcontainers的值不應超過分配給用戶會話的值。

注意:預熱需要幾秒鍾,對於短會話來說是一個很好的做法,特別是如果查詢涉及reduce階段。但是,如果hive.prewarm.numcontainers的值高於群集中可用的值,則該過程最多可能需要30秒。請謹慎使用預熱。

另外,一個完整調優案例你可以參考:https://blog.csdn.net/u010010664/article/details/77066031


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM