Flink1.10集成Hive快速入門


點擊上方藍字了解更多精彩

Hive 是大數據領域最早出現的 SQL 引擎,發展至今有着豐富的功能和廣泛的用戶基礎。之后出現的 SQL 引擎,如 Spark SQL、Impala 等,都在一定程度上提供了與 Hive 集成的功能,從而方便用戶使用現有的數據倉庫、進行作業遷移等。

Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用。在最新版Flink1.10版本,標志着對 Blink的整合宣告完成,達到了對 Hive 的生產級別集成,Hive作為數據倉庫系統的絕對核心,承擔着絕大多數的離線數據ETL計算和數據管理,期待Flink未來對Hive的完美支持。

而 HiveCatalog 會與一個 Hive Metastore 的實例連接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶需要配置一個 HiveCatalog,並通過 HiveCatalog 訪問 Hive 中的元數據。

添加依賴

要與Hive集成,需要在Flink的lib目錄下添加額外的依賴jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以將這些依賴項放在文件夾中,並分別使用Table API程序或SQL Client 的-C-l選項將它們添加到classpath中。本文使用第一種方式,即將jar包直接復制到$FLINK_HOME/lib目錄下。本文使用的Hive版本為2.3.4(對於不同版本的Hive,可以參照官網選擇不同的jar包依賴),總共需要3個jar包,如下:

  • flink-connector-hive_2.11-1.10.0.jar

  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

  • hive-exec-2.3.4.jar

其中hive-exec-2.3.4.jar在hive的lib文件夾下,另外兩個需要自行下載,下載地址:flink-connector-hive_2.11-1.10.0.jar

[https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/]

flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

[https://maven.aliyun.com/mvn/search]

切莫拔劍四顧心茫然,話不多說,直接上代碼。

構建程序

添加Maven依賴

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>  

實例代碼

package com.flink.sql.hiveintegration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 *  @Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/3/31
 *  @Time: 13:22
 *  
 */

public class FlinkHiveIntegration {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inBatchMode() // Batch模式,默認為StreamingMode
                .build();

        //使用StreamingMode
       /* EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inStreamingMode() // StreamingMode
                .build();*/


        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";      // Catalog名稱,定義一個唯一的名稱表示
        String defaultDatabase = "qfbap_ods";  // 默認數據庫名稱
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";  // hive-site.xml路徑
        String version = "2.3.4";       // Hive版本號

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        // 創建數據庫,目前不支持創建hive表
        String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";

        tableEnv.sqlUpdate(createDbSql);  

    }
}

Flink SQL Client集成Hive

Flink的表和SQL API可以處理用SQL語言編寫的查詢,但是這些查詢需要嵌入到用Java或Scala編寫的程序中。此外,這些程序在提交到集群之前需要與構建工具打包。這或多或少地限制了Java/Scala程序員對Flink的使用。

SQL客戶端旨在提供一種簡單的方式,無需一行Java或Scala代碼,即可將表程序編寫、調試和提交到Flink集群。Flink SQL客戶端CLI允許通過命令行的形式運行分布式程序。使用Flink SQL cli訪問Hive,需要配置sql-client-defaults.yaml文件。

sql-client-defaults.yaml配置

目前 HiveTableSink 不支持流式寫入(未實現 AppendStreamTableSink)。需要將執行模式改成 batch
模式,否則會報如下錯誤:

org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

需要修改的配置內容如下:

#...省略的配置項...

#==============================================================================
# Catalogs
#==============================================================================
# 配置catalogs,可以配置多個.
catalogs: # empty list
  - name: myhive
    type: hive
    hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
    hive-version: 2.3.4
    default-database: qfbap_ods

#...省略的配置項...

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: batch

啟動Flink SQL Cli

bin/sql-client.sh  embedded

在啟動之前,確保Hive的metastore已經開啟了,否則會報Failed to create Hive Metastore client異常。啟動成功,如下圖:

啟動之后,就可以在此Cli下執行SQL命令訪問Hive的表了,基本的操作如下:

-- 命令行幫助
Flink SQL> help
-- 查看當前會話的catalog,其中myhive為自己配置的,default_catalog為默認的
Flink SQLshow catalogs;
default_catalog
myhive
-- 使用catalog
Flink SQL> use catalog myhive;
-- 查看當前catalog的數據庫
Flink SQL> show databases;
-- 創建數據庫
Flink SQL> create database testdb;
-- 刪除數據庫
Flink SQL> drop database testdb;
-- 創建表
Flink SQL> create table tbl(id int,name string);
-- 刪除表
Flink SQL> drop table tbl;
-- 查詢表
Flink SQL> select * from  code_city;
-- 插入數據
Flink SQL> insert overwrite code_city select id,city,province,event_time from code_city_delta ;
Flink SQL> INSERT into code_city values(1,'南京','江蘇','');

小結

本文以最新版本的Flink為例,對Flink集成Hive進行了實操。首先通過代碼的方式與Hive進行集成,然后介紹了如何使用Flink SQL 客戶端訪問Hive,並對其中會遇到的坑進行了描述,最后給出了Flink SQL Cli的詳細使用。相信在未來的版本中Flink SQL會越來越完善,期待Flink未來對Hive的完美支持。

溫馨提示

如果你喜歡本文,請分享到朋友圈,想要獲得更多信息,請關注我。


我就知道你“在看”

本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM