Flink集成Hive之快速入門--以Flink1.12為例


使用Hive構建數據倉庫已經成為了比較普遍的一種解決方案。目前,一些比較常見的大數據處理引擎,都無一例外兼容Hive。Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用。在Flink1.10版本中,標志着對 Blink的整合宣告完成,對 Hive 的集成也達到了生產級別的要求。值得注意的是,不同版本的Flink對於Hive的集成有所差異,本文將以最新的Flink1.12版本為例,闡述Flink集成Hive的簡單步驟,以下是全文,希望對你有所幫助。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包
在這里插入圖片描述

Flink集成Hive的基本方式

Flink 與 Hive 的集成主要體現在以下兩個方面:

  • 持久化元數據

Flink利用 Hive 的 MetaStore 作為持久化的 Catalog,我們可通過HiveCatalog將不同會話中的 Flink 元數據存儲到 Hive Metastore 中。 例如,我們可以使用HiveCatalog將其 Kafka的數據源表存儲在 Hive Metastore 中,這樣該表的元數據信息會被持久化到Hive的MetaStore對應的元數據庫中,在后續的 SQL 查詢中,我們可以重復使用它們。

  • 利用 Flink 來讀寫 Hive 的表。

Flink打通了與Hive的集成,如同使用SparkSQL或者Impala操作Hive中的數據一樣,我們可以使用Flink直接讀寫Hive中的表。

HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive表。 不需要修改現有的 Hive Metastore,也不需要更改表的數據位置或分區。

Flink集成Hive的步驟

Flink支持的Hive版本

大版本 V1 V2 V3 V4 V5 V6 V7
1.0 1.0.0 1.0.1
1.1 1.1.0 1.1.1
1.2 1.2.0 1.2.1 1.2.2
2.0 2.0.0 2.0.1
2.1 2.1.0 2.1.1
2.2 2.2.0
2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6
3.1 3.1.0 3.1.1 3.1.2

值得注意的是,對於不同的Hive版本,可能在功能方面有所差異,這些差異取決於你使用的Hive版本,而不取決於Flink,一些版本的功能差異如下:

  • Hive 內置函數在使用 Hive-1.2.0 及更高版本時支持。
  • 列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。
  • 更改表的統計信息,在使用 Hive-1.2.0 及更高版本時支持。
  • DATE列統計信息,在使用 Hive-1.2.0 及更高版時支持。
  • 使用 Hive-2.0.x 版本時不支持寫入 ORC 表。

依賴項

本文以Flink1.12為例,集成的Hive版本為Hive2.3.4。集成Hive需要額外添加一些依賴jar包,並將其放置在Flink安裝目錄下的lib文件夾下,這樣我們才能通過 Table API 或 SQL Client 與 Hive 進行交互。

另外,Apache Hive 是基於 Hadoop 之上構建的, 所以還需要 Hadoop 的依賴,配置好HADOOP_CLASSPATH即可。這一點非常重要,否則在使用FlinkSQL Cli查詢Hive中的表時,會報如下錯誤:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的環境變量

export HADOOP_CLASSPATH=`hadoop classpath`

Flink官網提供了兩種方式添加Hive的依賴項。第一種是使用 Flink 提供的 Hive Jar包(根據使用的 Metastore 的版本來選擇對應的 Hive jar),建議優先使用Flink提供的Hive jar包,這種方式比較簡單方便。本文使用的就是此種方式。當然,如果你使用的Hive版本與Flink提供的Hive jar包兼容的版本不一致,你可以選擇第二種方式,即別添加每個所需的 jar 包。

下面列舉了可用的jar包及其適用的Hive版本,我們可以根據使用的Hive版本,下載對應的jar包即可。比如本文使用的Hive版本為Hive2.3.4,所以只需要下載flink-sql-connector-hive-2.3.6即可,並將其放置在Flink安裝目錄的lib文件夾下。

Metastore version Maven dependency SQL Client JAR
1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 Download
2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 Download
2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 Download
3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 Download

上面列舉的jar包,是我們在使用Flink SQL Cli所需要的jar包,除此之外,根據不同的Hive版本,還需要添加如下jar包。以Hive2.3.4為例,除了上面的一個jar包之外,還需要添加下面兩個jar包:

flink-connector-hive_2.11-1.12.0.jarhive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在於Hive安裝路徑下的lib文件夾。flink-connector-hive_2.11-1.12.0.jar的下載地址為:

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.0/

NOTE✒️:Flink1.12集成Hive只需要添加如下三個jar包,以Hive2.3.4為例,分別為:

flink-sql-connector-hive-2.3.6

flink-connector-hive_2.11-1.12.0.jar

hive-exec-2.3.4.jar

Flink SQL Cli集成Hive

將上面的三個jar包添加至Flink的lib目錄下之后,就可以使用Flink操作Hive的數據表了。以FlinkSQL Cli為例:

配置sql-client-defaults.yaml

該文件時Flink SQL Cli啟動時使用的配置文件,該文件位於Flink安裝目錄的conf/文件夾下,具體的配置如下,主要是配置catalog:

除了上面的一些配置參數,Flink還提供了下面的一些其他配置參數:

參數 必選 默認值 類型 描述
type (無) String Catalog 的類型。 創建 HiveCatalog 時,該參數必須設置為'hive'
name (無) String Catalog 的名字。僅在使用 YAML file 時需要指定。
hive-conf-dir (無) String 指向包含 hive-site.xml 目錄的 URI。 該 URI 必須是 Hadoop 文件系統所支持的類型。 如果指定一個相對 URI,即不包含 scheme,則默認為本地文件系統。如果該參數沒有指定,我們會在 class path 下查找hive-site.xml。
default-database default String 當一個catalog被設為當前catalog時,所使用的默認當前database。
hive-version (無) String HiveCatalog 能夠自動檢測使用的 Hive 版本。我們建議不要手動設置 Hive 版本,除非自動檢測機制失敗。
hadoop-conf-dir (無) String Hadoop 配置文件目錄的路徑。目前僅支持本地文件系統路徑。我們推薦使用 HADOOP_CONF_DIR 環境變量來指定 Hadoop 配置。因此僅在環境變量不滿足您的需求時再考慮使用該參數,例如當您希望為每個 HiveCatalog 單獨設置 Hadoop 配置時。

操作Hive中的表

首先啟動FlinkSQL Cli,命令如下:

./bin/sql-client.sh embedded

接下來,我們可以查看注冊的catalog

Flink SQL> show catalogs;
default_catalog
myhive

使用注冊的myhive catalog

Flink SQL> use catalog myhive;

假設Hive中有一張users表,在Hive中查詢該表:

hive (default)> select * from users;
OK
users.id        users.mame
1       jack
2       tom
3       robin
4       haha
5       haha

查看對應的數據庫表,我們可以看到Hive中已經存在的表,這樣就可以使用FlinkSQL操作Hive中的表,比如查詢,寫入數據。

Flink SQL> show tables;
Flink SQL> select * from users;

向Hive表users中插入一條數據:

Flink SQL> insert into users select 6,'bob';

再次使用Hive客戶端去查詢該表的數據,會發現寫入了一條數據。

接下來,我們再在FlinkSQL Cli中創建一張kafka的數據源表:

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT, -- 用戶行為發生的時間戳
    `proctime` AS PROCTIME(), -- 通過計算列產生一個處理時間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數據源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

查看表結構

Flink SQL> DESCRIBE user_behavior;

我們可以在Hive的客戶端中執行下面命令查看剛剛在Flink SQLCli中創建的表

hive (default)> desc formatted  user_behavior;
# Detailed Table Information 
Database:               default                  
Owner:                  null                     
CreateTime:             Sun Dec 20 16:04:59 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/user_behavior 
Table Type:             MANAGED_TABLE            
Table Parameters:                
        flink.connector         kafka               
        flink.format            json                
        flink.json.fail-on-missing-field        true                
        flink.json.ignore-parse-errors  false               
        flink.properties.bootstrap.servers      kms-2:9092,kms-3:9092,kms-4:9092
        flink.properties.group.id       group1              
        flink.scan.startup.mode earliest-offset     
        flink.schema.0.data-type        BIGINT              
        flink.schema.0.name     user_id             
        flink.schema.1.data-type        BIGINT              
        flink.schema.1.name     item_id             
        flink.schema.2.data-type        BIGINT              
        flink.schema.2.name     cat_id              
        flink.schema.3.data-type        VARCHAR(2147483647) 
        flink.schema.3.name     action              
        flink.schema.4.data-type        INT                 
        flink.schema.4.name     province            
        flink.schema.5.data-type        BIGINT              
        flink.schema.5.name     ts                  
        flink.schema.6.data-type        TIMESTAMP(3) NOT NULL
        flink.schema.6.expr     PROCTIME()          
        flink.schema.6.name     proctime            
        flink.schema.7.data-type        TIMESTAMP(3)        
        flink.schema.7.expr     TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
        flink.schema.7.name     eventTime           
        flink.schema.watermark.0.rowtime        eventTime           
        flink.schema.watermark.0.strategy.data-type     TIMESTAMP(3)        
        flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '5' SECOND
        flink.topic             user_behavior       
        is_generic              true                
        transient_lastDdlTime   1608451499          
                 
# Storage Information 
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        serialization.format    1                   

NOTE🏴:在Flink中創建一張表,會把該表的元數據信息持久化到Hive的metastore中,我們可以在Hive的metastore中查看該表的元數據信息

進入Hive的元數據信息庫,本文使用的是MySQL。執行下面的命令:

SELECT 
    a.tbl_id, -- 表id
    from_unixtime(create_time) AS create_time, -- 創建時間
    a.db_id, -- 數據庫id
    b.name AS db_name, -- 數據庫名稱
    a.tbl_name -- 表名稱
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";

使用代碼連接到 Hive

maven依賴

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.4</version>
</dependency>

代碼

public class HiveIntegrationDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
        
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注冊的catalog
        tableEnv.useCatalog("myhive");
        // 向Hive表中寫入一條數據 
        String insertSQL = "insert into users select 10,'lihua'";

        TableResult result2 = tableEnv.executeSql(insertSQL);
        System.out.println(result2.getJobClient().get().getJobStatus());

    }
}

提交程序,觀察Hive表的變化:

bin/flink run -m kms-1:8081 \
-c com.flink.sql.hiveintegration.HiveIntegrationDemo \
./original-study-flink-sql-1.0-SNAPSHOT.jar

總結

本文以最新的Flink1.12為例,闡述了Flink集成Hive的基本步驟,並對其注意事項進行了說明。文中也給出了如何通過FlinkSQL Cli和代碼去操作Hive表的步驟。下一篇,將介紹Hive Catalog與Hive Dialect。

公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM