

Hive 是大數據領域最早出現的 SQL 引擎,發展至今有着豐富的功能和廣泛的用戶基礎。之后出現的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了與 Hive 集成的功能,從而方便用戶使用現有的數據倉庫、進行作業遷移等。
Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用。在最新版Flink1.10版本,標志着對 Blink的整合宣告完成,達到了對 Hive 的生產級別集成,Hive作為數據倉庫系統的絕對核心,承擔着絕大多數的離線數據ETL計算和數據管理,期待Flink未來對Hive的完美支持。
而 HiveCatalog 會與一個 Hive Metastore 的實例連接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶需要配置一個 HiveCatalog,並通過 HiveCatalog 訪問 Hive 中的元數據。
添加依賴
要與Hive集成,需要在Flink的lib目錄下添加額外的依賴jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以將這些依賴項放在文件夾中,並分別使用Table API程序或SQL Client 的-C
或-l
選項將它們添加到classpath中。本文使用第一種方式,即將jar包直接復制到$FLINK_HOME/lib目錄下。本文使用的Hive版本為2.3.4(對於不同版本的Hive,可以參照官網選擇不同的jar包依賴),總共需要3個jar包,如下:
flink-connector-hive_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
hive-exec-2.3.4.jar
其中hive-exec-2.3.4.jar在hive的lib文件夾下,另外兩個需要自行下載,下載地址:flink-connector-hive_2.11-1.10.0.jar
[https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/]
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
[https://maven.aliyun.com/mvn/search]
切莫拔劍四顧心茫然,話不多說,直接上代碼。
構建程序
添加Maven依賴
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
實例代碼
package com.flink.sql.hiveintegration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* @Created with IntelliJ IDEA.
* @author : jmx
* @Date: 2020/3/31
* @Time: 13:22
*
*/
public class FlinkHiveIntegration {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner() // 使用BlinkPlanner
.inBatchMode() // Batch模式,默認為StreamingMode
.build();
//使用StreamingMode
/* EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner() // 使用BlinkPlanner
.inStreamingMode() // StreamingMode
.build();*/
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive"; // Catalog名稱,定義一個唯一的名稱表示
String defaultDatabase = "qfbap_ods"; // 默認數據庫名稱
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; // hive-site.xml路徑
String version = "2.3.4"; // Hive版本號
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// 創建數據庫,目前不支持創建hive表
String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";
tableEnv.sqlUpdate(createDbSql);
}
}
Flink SQL Client集成Hive
Flink的表和SQL API可以處理用SQL語言編寫的查詢,但是這些查詢需要嵌入到用Java或Scala編寫的程序中。此外,這些程序在提交到集群之前需要與構建工具打包。這或多或少地限制了Java/Scala程序員對Flink的使用。
SQL客戶端旨在提供一種簡單的方式,無需一行Java或Scala代碼,即可將表程序編寫、調試和提交到Flink集群。Flink SQL客戶端CLI允許通過命令行的形式運行分布式程序。使用Flink SQL cli訪問Hive,需要配置sql-client-defaults.yaml文件。
sql-client-defaults.yaml配置
目前 HiveTableSink 不支持流式寫入(未實現 AppendStreamTableSink)。需要將執行模式改成 batch
模式,否則會報如下錯誤:
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
需要修改的配置內容如下:
#...省略的配置項...
#==============================================================================
# Catalogs
#==============================================================================
# 配置catalogs,可以配置多個.
catalogs: # empty list
- name: myhive
type: hive
hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
hive-version: 2.3.4
default-database: qfbap_ods
#...省略的配置項...
#==============================================================================
# Execution properties
#==============================================================================
# Properties that change the fundamental execution behavior of a table program.
execution:
# select the implementation responsible for planning table programs
# possible values are 'blink' (used by default) or 'old'
planner: blink
# 'batch' or 'streaming' execution
type: batch
啟動Flink SQL Cli
bin/sql-client.sh embedded
在啟動之前,確保Hive的metastore已經開啟了,否則會報Failed to create Hive Metastore client異常。啟動成功,如下圖:
啟動之后,就可以在此Cli下執行SQL命令訪問Hive的表了,基本的操作如下:
-- 命令行幫助
Flink SQL> help
-- 查看當前會話的catalog,其中myhive為自己配置的,default_catalog為默認的
Flink SQL> show catalogs;
default_catalog
myhive
-- 使用catalog
Flink SQL> use catalog myhive;
-- 查看當前catalog的數據庫
Flink SQL> show databases;
-- 創建數據庫
Flink SQL> create database testdb;
-- 刪除數據庫
Flink SQL> drop database testdb;
-- 創建表
Flink SQL> create table tbl(id int,name string);
-- 刪除表
Flink SQL> drop table tbl;
-- 查詢表
Flink SQL> select * from code_city;
-- 插入數據
Flink SQL> insert overwrite code_city select id,city,province,event_time from code_city_delta ;
Flink SQL> INSERT into code_city values(1,'南京','江蘇','');
小結
本文以最新版本的Flink為例,對Flink集成Hive進行了實操。首先通過代碼的方式與Hive進行集成,然后介紹了如何使用Flink SQL 客戶端訪問Hive,並對其中會遇到的坑進行了描述,最后給出了Flink SQL Cli的詳細使用。相信在未來的版本中Flink SQL會越來越完善,期待Flink未來對Hive的完美支持。

溫馨提示
如果你喜歡本文,請分享到朋友圈,想要獲得更多信息,請關注我。

本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。