Flink Basics (21): Table API and Flink SQL (6) - Flink and Hive Integration


Apache Hive has become a cornerstone of the data warehouse ecosystem. It is not only a SQL engine for big data analytics and ETL, but also a data management platform for discovering, defining, and evolving data.

Flink's integration with Hive covers two aspects.

The first is using Hive's Metastore as a persistent catalog: with HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, a user can store a Kafka table or an Elasticsearch table in the Hive Metastore via HiveCatalog and reuse it in later SQL queries.
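For instance, a Kafka table defined with the DDL below and executed while the HiveCatalog is the current catalog (for example via tableEnv.executeSql, or in the SQL client) is persisted in the Hive Metastore and survives the session. This is only a sketch: the table name, topic, and broker address are made up, and the options assume the Flink 1.11 Kafka connector with a JSON format.

CREATE TABLE kafka_events (
  user_id STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',                                -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092', -- hypothetical broker
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);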

The second is using Flink to read from and write to Hive tables.

HiveCatalog is designed for good compatibility with existing Hive installations, so users can access their existing Hive data warehouse "out of the box": you do not need to modify the existing Hive Metastore, nor change the data location or partitioning of your tables.

Maven Dependencies

Three groups of dependencies are required: the Flink-Hive connector, the Hive dependencies, and the Hadoop dependencies.

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>

<!-- Hadoop Dependency (left at compile scope here so the programs can run locally) -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>${hadoop.version}</version>
</dependency>
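Note that running the example programs below from an IDE also requires a Table planner on the classpath. The following block is not part of the original list; it assumes the Blink planner shipped with Flink 1.11:

<!-- Table planner, needed to execute Table API / SQL programs locally -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.0</version>
</dependency>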

Example Program

First, create a database and a table in Hive:

create database mydb;
use mydb;
create table if not exists t_user(id string, name string);
insert into table t_user values ('1','huangbo'), ('2','xuzheng'),('3','wangbaoqiang');

Then write a program that writes a data stream into Hive:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val stream = env
      .fromElements(
        ("10", "haha"),
        ("11", "hehe")
      )

    val name            = "myhive"
    val defaultDatabase = "mydb"
    val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
    val version         = "3.1.2"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog("myhive", hive)

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")

    tableEnv.createTemporaryView("users", stream, 'id, 'name)

    tableEnv.executeSql("insert into t_user select id, name from users")
    tableEnv.executeSql("select * from t_user")
  }
}
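If the job runs successfully, the two new rows are appended to the Hive table and can be verified from the Hive CLI as well (a quick sanity check, not part of the original program):

select * from t_user;
-- should now contain ('10','haha') and ('11','hehe') in addition to the three original rows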

A slightly more complex program

import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val stream = env.fromElements(
      ("1", 1000, new Timestamp(1000L)),
      ("2", 2000, new Timestamp(2000L)),
      ("3", 3000, new Timestamp(3000L))
    )

    val name            = "myhive"
    val defaultDatabase = "mydb"
    val hiveConfDir     = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
    val version         = "3.1.2"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog("myhive", hive)

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")

    tableEnv.createTemporaryView("users", stream, 'userId, 'amount, 'ts)

    val hiveSql = "CREATE external TABLE fs_table (\n" +
                     "  user_id STRING,\n" +
                     "  order_amount DOUBLE" +
                     ") partitioned by (dt string,h string,m string) " +
                     "stored as ORC " +
                     "TBLPROPERTIES (\n" +
                     "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
                     "  'sink.partition-commit.delay'='0s',\n" +
                     "  'sink.partition-commit.trigger'='partition-time',\n" +
                     "  'sink.partition-commit.policy.kind'='metastore'" +
                     ")"

    tableEnv.executeSql(hiveSql)

    // switch back to the default dialect: DATE_FORMAT below is Flink's built-in
    // function, and the Hive dialect was only needed for the CREATE TABLE statement
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    val insertSql = "insert into fs_table SELECT userId, amount, " +
      " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users"
    tableEnv.executeSql(insertSql)
  }
}
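One caveat about the table properties above: 'sink.partition-commit.trigger'='partition-time' commits partitions based on watermarks, so the input stream must carry event-time timestamps and watermarks, otherwise partitions are never committed. Below is a minimal sketch of how the stream in this program could be given watermarks; the withWatermarks name and the rowtime registration are assumptions, not part of the original code:

    // derive event-time timestamps (and ascending watermarks) from the Timestamp field
    val withWatermarks = stream.assignAscendingTimestamps(_._3.getTime)

    // register the view with ts as a rowtime attribute so watermarks reach the sink
    tableEnv.createTemporaryView("users", withWatermarks, 'userId, 'amount, 'ts.rowtime)

Alternatively, setting 'sink.partition-commit.trigger'='process-time' commits based on machine time and requires no watermarks.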

Completely resetting Hadoop and Hive

stop-all.sh
hadoop namenode -format
# drop Hive's metadata database in MySQL
start-all.sh
hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+w /user/hive/warehouse
schematool -dbType mysql -initSchema
hive --service metastore &   # run the metastore in the background so the next command can start
hive
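To confirm the reset worked, a quick check from the Hive CLI (again, an addition for verification, not part of the original steps):

show databases;
-- a freshly initialized metastore should list only: default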

 

