Flink集成Hive之Hive Catalog與Hive Dialect--以Flink1.12


在上一篇分享Flink集成Hive之快速入門–以Flink1.12為例中,介紹了Flink集成Hive的進本步驟。本文分享,將繼續介紹Flink集成Hive的另外兩個概念:Hive Catalog與Hive Dialect。本文包括以下內容,希望對你有所幫助。

  • 什么是Hive Catalog
  • 如何使用Hive Catalog
  • 什么是Hive Dialect
  • 如何使用Hive Dialect

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包

什么是Hive Catalog

我們知道,Hive使用Hive Metastore(HMS)存儲元數據信息,使用關系型數據庫來持久化存儲這些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元數據,這就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元數據。Hive Catalog可以將元數據進行持久化,這樣后續的操作就可以反復使用這些表的元數據,而不用每次使用時都要重新注冊。如果不去持久化catalog,那么在每個session中取處理數據,都要去重復地創建元數據對象,這樣是非常耗時的。

如何使用Hive Catalog

HiveCatalog是開箱即用的,所以,一旦配置好Flink與Hive集成,就可以使用HiveCatalog。比如,我們通過FlinkSQL 的DDL語句創建一張kafka的數據源表,立刻就能查看該表的元數據信息。

HiveCatalog可以處理兩種類型的表:一種是Hive兼容的表,另一種是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式來存儲的,所以,對於Hive兼容表而言,我們既可以使用Flink去操作該表,又可以使用Hive去操作該表。

普通表是對Flink而言的,當使用HiveCatalog創建一張普通表,僅僅是使用Hive MetaStore將其元數據進行了持久化,所以可以通過Hive查看這些表的元數據信息(通過DESCRIBE FORMATTED命令),但是不能通過Hive去處理這些表,因為語法不兼容。

對於是否是普通表,Flink使用is_generic屬性進行標識。默認情況下,創建的表是普通表,即is_generic=true,如果要創建Hive兼容表,需要在建表屬性中指定is_generic=false

尖叫提示:

由於依賴Hive Metastore,所以必須開啟Hive MetaStore服務

代碼中使用Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注冊的catalog
        tableEnv.useCatalog("myhive");

Flink SQLCli中使用Hive Catalog

在FlinkSQL Cli中使用Hive Catalog很簡單,只需要配置一下sql-cli-defaults.yaml文件即可。配置內容如下:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

在FlinkSQL Cli中創建一張kafka表,該表默認為普通表,即is_generic=true

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT, -- 用戶行為發生的時間戳
    `proctime` AS PROCTIME(), -- 通過計算列產生一個處理時間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數據源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

我們可以在Hive客戶端中查看該表的元數據信息

hive (default)> desc formatted  user_behavior;
Table Parameters:                
       ...
        is_generic              true                
      ...         

從上面的元數據信息可以看出,is_generic=true,說明該表是一張普通表,如果在Hive中去查看該表,則會報錯。

上面創建的表是普通表,該表不能使用Hive去查詢。那么,該如何創建一張Hive兼容表呢?我們只需要在建表的屬性中顯示指定is_generic=false即可,具體如下:

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT -- 用戶行為發生的時間戳
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數據源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false',
    'is_generic' = 'false'
);

當我們在Hive中查看該表的元數據信息時,可以看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:                
        ...           
        is_generic              false               
        ...

我們可以使用FlinkSQL Cli或者HiveCli向該表中寫入數據,然后分別通過FlinkSQL Cli和Hive Cli去查看該表數據的變化

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;

再在FlinkSQL Cli中查看該表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                   user_id                   item_id                    action
                      2020                      1221                       buy
    

同樣,我們可以在FlinkSQL Cli中去向該表中寫入數據:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

                   user_id                   item_id                    action
                      2020                      1221                       buy
                      2020                      1222                       fav

尖叫提示:

對於Hive兼容的表,需要注意數據類型,具體的數據類型對應關系以及注意點如下

Flink 數據類型 Hive 數據類型
CHAR§ CHAR§
VARCHAR§ VARCHAR§
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY LIST
MAP<K, V> MAP<K, V>
ROW STRUCT

注意

  • Hive CHAR(p) 類型的最大長度為255
  • Hive VARCHAR(p)類型的最大長度為65535
  • Hive MAP類型的key僅支持基本類型,而Flink’s MAP 類型的key執行任意類型
  • Hive不支持聯合數據類型,比如STRUCT
  • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函數只能處理 precision <= 9的 TIMESTAMP
  • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET類型
  • FlinkINTERVAL 類型與 Hive INTERVAL 類型不一樣

上面介紹了普通表和Hive兼容表,那么我們該如何使用Hive的語法進行建表呢?這個時候就需要使用Hive Dialect

什么是Hive Dialect

從Flink1.11.0開始,只要開啟了Hive dialect配置,用戶就可以使用HiveQL語法,這樣我們就可以在Flink中使用Hive的語法使用一些DDL和DML操作。

Flink目前支持兩種SQL方言(SQL dialects),分別為:default和hive。默認的SQL方言是default,如果要使用Hive的語法,需要將SQL方言切換到hive

如何使用Hive Dialect

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一個參數即可,該參數名稱為:table.sql-dialect。我們就可以在sql-client-defaults.yaml配置文件中進行配置,也可以在具體的會話窗口中進行設定,對於SQL dialect的切換,不需要進行重啟session。

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive

如果我們需要在SQL Cli中進行切換hive dialect,可以使用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect

尖叫提示:

一旦切換到了hive dialect,就只能使用Hive的語法建表,如果嘗試使用Flink的語法建表,則會報錯

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 使用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 使用Hive語法創建一張表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT '名稱',
  `age` int COMMENT '年齡' 
)
COMMENT 'hive dialect表測試'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

進入Hive客戶端去查看該表的元數據信息

desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name data_type comment 
                 
id                      int                                         
name                    string                                      
age                     int                                         
                 
# Detailed Table Information 
Database:               default                  
Owner:                  null                     
CreateTime:             Mon Dec 21 17:23:48 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl 
Table Type:             MANAGED_TABLE            
Table Parameters:                
        comment                 hive dialect表測試     
        is_generic              false               
        transient_lastDdlTime   1608542628          
                 
# Storage Information 
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        field.delim             ,                   
        serialization.format    ,                   

很明顯,該表是一張Hive兼容表,即is_generic=false

使用FlinkSQLCli向該表中寫入一條數據:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

我們也可以在Hive的Cli中去操作該表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是使用Hive方言的一些注意事項。

  • Hive dialect只能用於操作Hive表,不能用於普通表。Hive方言應與HiveCatalog一起使用。
  • 雖然所有Hive版本都支持相同的語法,但是是否有特定功能仍然取決於使用的Hive版本。例如,僅在Hive-2.4.0或更高版本中支持更新數據庫位置。
  • Hive和Calcite具有不同的保留關鍵字。例如,default在Calcite中是保留關鍵字,在Hive中是非保留關鍵字。所以,在使用Hive dialect時,必須使用反引號(`)引用此類關鍵字,才能將其用作標識符。
  • 在Hive中不能查詢在Flink中創建的視圖。

當然,一旦開啟了Hive dialect,我們就可以按照Hive的操作方式在Flink中去處理Hive的數據了,具體的操作與Hive一致,本文不再贅述。

總結

本文主要介紹了Hive Catalog和Hive Dialect。其中Hive Catalog的作用是持久化Flink的元數據信息,Hive Dialect是支持Hive語法的一個配置參數,這兩個概念是Flink集成Hive的關鍵。下一篇分享將介紹如何使用Flink讀寫Hive。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM