使用Hive構建數據倉庫已經成為了比較普遍的一種解決方案。目前,一些比較常見的大數據處理引擎,都無一例外兼容Hive。Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用。在Flink1.10版本中,標志着對 Blink的整合宣告完成,對 Hive 的集成也達到了生產級別的要求。值得注意的是,不同版本的Flink對於Hive的集成有所差異,本文將以最新的Flink1.12版本為例,闡述Flink集成Hive的簡單步驟,以下是全文,希望對你有所幫助。
公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包
Flink集成Hive的基本方式
Flink 與 Hive 的集成主要體現在以下兩個方面:
- 持久化元數據
Flink利用 Hive 的 MetaStore 作為持久化的 Catalog,我們可通過HiveCatalog將不同會話中的 Flink 元數據存儲到 Hive Metastore 中。 例如,我們可以使用HiveCatalog將其 Kafka的數據源表存儲在 Hive Metastore 中,這樣該表的元數據信息會被持久化到Hive的MetaStore對應的元數據庫中,在后續的 SQL 查詢中,我們可以重復使用它們。
- 利用 Flink 來讀寫 Hive 的表。
Flink打通了與Hive的集成,如同使用SparkSQL或者Impala操作Hive中的數據一樣,我們可以使用Flink直接讀寫Hive中的表。
HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive表。 不需要修改現有的 Hive Metastore,也不需要更改表的數據位置或分區。
Flink集成Hive的步驟
Flink支持的Hive版本
| 大版本 | V1 | V2 | V3 | V4 | V5 | V6 | V7 |
|---|---|---|---|---|---|---|---|
| 1.0 | 1.0.0 | 1.0.1 | |||||
| 1.1 | 1.1.0 | 1.1.1 | |||||
| 1.2 | 1.2.0 | 1.2.1 | 1.2.2 | ||||
| 2.0 | 2.0.0 | 2.0.1 | |||||
| 2.1 | 2.1.0 | 2.1.1 | |||||
| 2.2 | 2.2.0 | ||||||
| 2.3 | 2.3.0 | 2.3.1 | 2.3.2 | 2.3.3 | 2.3.4 | 2.3.5 | 2.3.6 |
| 3.1 | 3.1.0 | 3.1.1 | 3.1.2 |
值得注意的是,對於不同的Hive版本,可能在功能方面有所差異,這些差異取決於你使用的Hive版本,而不取決於Flink,一些版本的功能差異如下:
- Hive 內置函數在使用 Hive-1.2.0 及更高版本時支持。
- 列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。
- 更改表的統計信息,在使用 Hive-1.2.0 及更高版本時支持。
DATE列統計信息,在使用 Hive-1.2.0 及更高版時支持。- 使用 Hive-2.0.x 版本時不支持寫入 ORC 表。
依賴項
本文以Flink1.12為例,集成的Hive版本為Hive2.3.4。集成Hive需要額外添加一些依賴jar包,並將其放置在Flink安裝目錄下的lib文件夾下,這樣我們才能通過 Table API 或 SQL Client 與 Hive 進行交互。
另外,Apache Hive 是基於 Hadoop 之上構建的, 所以還需要 Hadoop 的依賴,配置好HADOOP_CLASSPATH即可。這一點非常重要,否則在使用FlinkSQL Cli查詢Hive中的表時,會報如下錯誤:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的環境變量:
export HADOOP_CLASSPATH=`hadoop classpath`
Flink官網提供了兩種方式添加Hive的依賴項。第一種是使用 Flink 提供的 Hive Jar包(根據使用的 Metastore 的版本來選擇對應的 Hive jar),建議優先使用Flink提供的Hive jar包,這種方式比較簡單方便。本文使用的就是此種方式。當然,如果你使用的Hive版本與Flink提供的Hive jar包兼容的版本不一致,你可以選擇第二種方式,即別添加每個所需的 jar 包。
下面列舉了可用的jar包及其適用的Hive版本,我們可以根據使用的Hive版本,下載對應的jar包即可。比如本文使用的Hive版本為Hive2.3.4,所以只需要下載flink-sql-connector-hive-2.3.6即可,並將其放置在Flink安裝目錄的lib文件夾下。
| Metastore version | Maven dependency | SQL Client JAR |
|---|---|---|
| 1.0.0 ~ 1.2.2 | flink-sql-connector-hive-1.2.2 |
Download |
| 2.0.0 ~2.2.0 | flink-sql-connector-hive-2.2.0 |
Download |
| 2.3.0 ~2.3.6 | flink-sql-connector-hive-2.3.6 |
Download |
| 3.0.0 ~ 3.1.2 | flink-sql-connector-hive-3.1.2 |
Download |
上面列舉的jar包,是我們在使用Flink SQL Cli所需要的jar包,除此之外,根據不同的Hive版本,還需要添加如下jar包。以Hive2.3.4為例,除了上面的一個jar包之外,還需要添加下面兩個jar包:
flink-connector-hive_2.11-1.12.0.jar和hive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在於Hive安裝路徑下的lib文件夾。flink-connector-hive_2.11-1.12.0.jar的下載地址為:
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.0/
NOTE✒️:Flink1.12集成Hive只需要添加如下三個jar包,以Hive2.3.4為例,分別為:
flink-sql-connector-hive-2.3.6
flink-connector-hive_2.11-1.12.0.jar
hive-exec-2.3.4.jar
Flink SQL Cli集成Hive
將上面的三個jar包添加至Flink的lib目錄下之后,就可以使用Flink操作Hive的數據表了。以FlinkSQL Cli為例:
配置sql-client-defaults.yaml
該文件時Flink SQL Cli啟動時使用的配置文件,該文件位於Flink安裝目錄的conf/文件夾下,具體的配置如下,主要是配置catalog:

除了上面的一些配置參數,Flink還提供了下面的一些其他配置參數:
| 參數 | 必選 | 默認值 | 類型 | 描述 |
|---|---|---|---|---|
| type | 是 | (無) | String | Catalog 的類型。 創建 HiveCatalog 時,該參數必須設置為'hive'。 |
| name | 是 | (無) | String | Catalog 的名字。僅在使用 YAML file 時需要指定。 |
| hive-conf-dir | 否 | (無) | String | 指向包含 hive-site.xml 目錄的 URI。 該 URI 必須是 Hadoop 文件系統所支持的類型。 如果指定一個相對 URI,即不包含 scheme,則默認為本地文件系統。如果該參數沒有指定,我們會在 class path 下查找hive-site.xml。 |
| default-database | 否 | default | String | 當一個catalog被設為當前catalog時,所使用的默認當前database。 |
| hive-version | 否 | (無) | String | HiveCatalog 能夠自動檢測使用的 Hive 版本。我們建議不要手動設置 Hive 版本,除非自動檢測機制失敗。 |
| hadoop-conf-dir | 否 | (無) | String | Hadoop 配置文件目錄的路徑。目前僅支持本地文件系統路徑。我們推薦使用 HADOOP_CONF_DIR 環境變量來指定 Hadoop 配置。因此僅在環境變量不滿足您的需求時再考慮使用該參數,例如當您希望為每個 HiveCatalog 單獨設置 Hadoop 配置時。 |
操作Hive中的表
首先啟動FlinkSQL Cli,命令如下:
./bin/sql-client.sh embedded
接下來,我們可以查看注冊的catalog
Flink SQL> show catalogs;
default_catalog
myhive
使用注冊的myhive catalog
Flink SQL> use catalog myhive;
假設Hive中有一張users表,在Hive中查詢該表:
hive (default)> select * from users;
OK
users.id users.mame
1 jack
2 tom
3 robin
4 haha
5 haha
查看對應的數據庫表,我們可以看到Hive中已經存在的表,這樣就可以使用FlinkSQL操作Hive中的表,比如查詢,寫入數據。
Flink SQL> show tables;
Flink SQL> select * from users;

向Hive表users中插入一條數據:
Flink SQL> insert into users select 6,'bob';
再次使用Hive客戶端去查詢該表的數據,會發現寫入了一條數據。
接下來,我們再在FlinkSQL Cli中創建一張kafka的數據源表:
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用戶id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品類id
`action` STRING, -- 用戶行為
`province` INT, -- 用戶所在的省份
`ts` BIGINT, -- 用戶行為發生的時間戳
`proctime` AS PROCTIME(), -- 通過計算列產生一個處理時間列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主題
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消費者組
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 數據源格式為json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
查看表結構
Flink SQL> DESCRIBE user_behavior;

我們可以在Hive的客戶端中執行下面命令查看剛剛在Flink SQLCli中創建的表
hive (default)> desc formatted user_behavior;
# Detailed Table Information
Database: default
Owner: null
CreateTime: Sun Dec 20 16:04:59 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://kms-1.apache.com:8020/user/hive/warehouse/user_behavior
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector kafka
flink.format json
flink.json.fail-on-missing-field true
flink.json.ignore-parse-errors false
flink.properties.bootstrap.servers kms-2:9092,kms-3:9092,kms-4:9092
flink.properties.group.id group1
flink.scan.startup.mode earliest-offset
flink.schema.0.data-type BIGINT
flink.schema.0.name user_id
flink.schema.1.data-type BIGINT
flink.schema.1.name item_id
flink.schema.2.data-type BIGINT
flink.schema.2.name cat_id
flink.schema.3.data-type VARCHAR(2147483647)
flink.schema.3.name action
flink.schema.4.data-type INT
flink.schema.4.name province
flink.schema.5.data-type BIGINT
flink.schema.5.name ts
flink.schema.6.data-type TIMESTAMP(3) NOT NULL
flink.schema.6.expr PROCTIME()
flink.schema.6.name proctime
flink.schema.7.data-type TIMESTAMP(3)
flink.schema.7.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
flink.schema.7.name eventTime
flink.schema.watermark.0.rowtime eventTime
flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '5' SECOND
flink.topic user_behavior
is_generic true
transient_lastDdlTime 1608451499
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
NOTE🏴:在Flink中創建一張表,會把該表的元數據信息持久化到Hive的metastore中,我們可以在Hive的metastore中查看該表的元數據信息
進入Hive的元數據信息庫,本文使用的是MySQL。執行下面的命令:
SELECT
a.tbl_id, -- 表id
from_unixtime(create_time) AS create_time, -- 創建時間
a.db_id, -- 數據庫id
b.name AS db_name, -- 數據庫名稱
a.tbl_name -- 表名稱
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";

使用代碼連接到 Hive
maven依賴
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.12.0</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.4</version>
</dependency>
代碼
public class HiveIntegrationDemo {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用注冊的catalog
tableEnv.useCatalog("myhive");
// 向Hive表中寫入一條數據
String insertSQL = "insert into users select 10,'lihua'";
TableResult result2 = tableEnv.executeSql(insertSQL);
System.out.println(result2.getJobClient().get().getJobStatus());
}
}
提交程序,觀察Hive表的變化:
bin/flink run -m kms-1:8081 \
-c com.flink.sql.hiveintegration.HiveIntegrationDemo \
./original-study-flink-sql-1.0-SNAPSHOT.jar
總結
本文以最新的Flink1.12為例,闡述了Flink集成Hive的基本步驟,並對其注意事項進行了說明。文中也給出了如何通過FlinkSQL Cli和代碼去操作Hive表的步驟。下一篇,將介紹Hive Catalog與Hive Dialect。
公眾號『大數據技術與數倉』,回復『資料』領取大數據資料包

