Apache Hive has become a cornerstone of the data warehouse ecosystem. It is not only a SQL engine for big data analytics and ETL, but also a data management platform on which data can be discovered, defined, and evolved.
Flink's integration with Hive has two aspects.
The first is using Hive's Metastore as a persistent catalog: with HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, users can store their Kafka or Elasticsearch table definitions in the Hive Metastore via HiveCatalog and reuse them later in SQL queries.
The second is using Flink to read and write Hive tables.
HiveCatalog is designed to be compatible with Hive, so users can access their existing Hive data warehouse "out of the box". There is no need to modify the existing Hive Metastore, nor to change the data location or partitioning of existing tables.
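To illustrate the first aspect, here is a minimal sketch of storing a Kafka table definition in the Hive Metastore through HiveCatalog so it can be reused across sessions. The topic, broker address, local conf path, and the Kafka connector dependency are assumptions for demonstration and are not part of this article's setup:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object RegisterKafkaTableInHive {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Register the Hive Metastore as a catalog (conf dir and version are placeholders).
    val hive = new HiveCatalog("myhive", "mydb", "/path/to/hive/conf", "3.1.2")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")

    // The table definition is persisted in the Hive Metastore and can be reused
    // by later Flink sessions; topic and broker addresses below are made up.
    tableEnv.executeSql(
      """CREATE TABLE kafka_source (
        |  id STRING,
        |  name STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'user-topic',
        |  'properties.bootstrap.servers' = 'localhost:9092',
        |  'format' = 'json',
        |  'scan.startup.mode' = 'earliest-offset'
        |)""".stripMargin)
  }
}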
1 Maven Dependencies
The dependencies fall into three parts: the Flink-Hive connector, the Hive dependencies, and the Hadoop dependencies. ${hive.version} and ${hadoop.version} should match the Hive and Hadoop versions in your environment (Hive 3.1.2 is used in the examples below).

<!-- Flink Dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
</dependency>
Example Program
First, create a database and a table in Hive:
create database mydb;
use mydb;
create table if not exists t_user(id string, name string);
insert into table t_user values ('1','huangbo'), ('2','xuzheng'),('3','wangbaoqiang');
Then write a program that writes a data stream into Hive:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // The data stream that will be written into Hive.
    val stream = env
      .fromElements(
        ("10", "haha"),
        ("11", "hehe")
      )

    val name = "myhive"
    val defaultDatabase = "mydb"
    val hiveConfDir = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
    val version = "3.1.2"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog("myhive", hive)

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")

    // Register the stream as a temporary view so it can be referenced in SQL.
    tableEnv.createTemporaryView("users", stream, 'id, 'name)

    tableEnv.executeSql("insert into t_user select id, name from users")
    // print() consumes the TableResult so the query output becomes visible.
    tableEnv.executeSql("select * from t_user").print()
  }
}
2 A Slightly More Complex Program
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val stream = env.fromElements(
      ("1", 1000, new Timestamp(1000L)),
      ("2", 2000, new Timestamp(2000L)),
      ("3", 3000, new Timestamp(3000L))
    )

    val name = "myhive"
    val defaultDatabase = "mydb"
    val hiveConfDir = "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf" // a local path
    val version = "3.1.2"

    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog("myhive", hive)

    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")

    tableEnv.createTemporaryView("users", stream, 'userId, 'amount, 'ts)

    val hiveSql =
      """CREATE external TABLE fs_table (
        |  user_id STRING,
        |  order_amount DOUBLE
        |) partitioned by (dt string, h string, m string)
        |stored as ORC
        |TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
        |  'sink.partition-commit.delay'='0s',
        |  'sink.partition-commit.trigger'='partition-time',
        |  'sink.partition-commit.policy.kind'='metastore'
        |)""".stripMargin
    tableEnv.executeSql(hiveSql)

    val insertSql =
      """insert into fs_table
        |SELECT userId, amount,
        |  DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
        |FROM users""".stripMargin
    tableEnv.executeSql(insertSql)
  }
}
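As a quick check (not part of the original program), the data written by the job above can be read back through the same catalog. This is a minimal sketch that assumes the insert job has finished and at least one partition has been committed to the metastore:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object ReadBackFsTable {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Same catalog settings as the job above; the conf dir is a local path.
    val hive = new HiveCatalog("myhive", "mydb",
      "/Users/yuanzuo/Downloads/apache-hive-3.1.2-bin/conf", "3.1.2")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")
    tableEnv.useDatabase("mydb")

    // Print whatever rows have been committed to fs_table so far.
    tableEnv.executeSql("SELECT * FROM fs_table").print()
  }
}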
3 How to Completely Reset Hadoop and Hive
stop-all.sh
hadoop namenode -format
# drop Hive's metastore database in MySQL
start-all.sh
hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+w /user/hive/warehouse
schematool -dbType mysql -initSchema
hive --service metastore
hive