0 簡介
Catalog 提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。
數據處理最關鍵的方面之一是管理元數據。 元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。 元數據也可以是持久化的,例如 Hive Metastore 中的元數據。Catalog 提供了一個統一的API,用於管理元數據,並使其可以從 Table API 和 SQL 查詢語句中來訪問。
1 Catalog 類型
1.1 GenericInMemoryCatalog
GenericInMemoryCatalog 是基於內存實現的 Catalog,所有元數據只在 session 的生命周期內可用。
1.2 JdbcCatalog
JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協議連接到關系數據庫。PostgresCatalog 是當前實現的唯一一種 JDBC Catalog。 參考 JdbcCatalog 文檔 獲取關於配置 JDBC catalog 的詳細信息。
1.3 HiveCatalog
HiveCatalog 有兩個用途:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口。 Flink 的 Hive 文檔 提供了有關設置 HiveCatalog 以及訪問現有 Hive 元數據的詳細信息。
警告 Hive Metastore 以小寫形式存儲所有元數據對象名稱。而 GenericInMemoryCatalog 區分大小寫。
1.4 用戶自定義 Catalog
Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用於 SQL CLI 啟動時配置 Catalog。 這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 並初始化相應的 Catalog 實例。
2 如何創建 Flink 表並將其注冊到 Catalog
2.1 使用 SQL DDL
用戶可以使用 DDL 通過 Table API 或者 SQL Client 在 Catalog 中創建表。
val tableEnv = ... // Create a HiveCatalog val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>"); // Register the catalog tableEnv.registerCatalog("myhive", catalog); // Create a catalog database tableEnv.executeSql("CREATE DATABASE mydb WITH (...)"); // Create a catalog table tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)"); tableEnv.listTables(); // should return the tables in current catalog and database.
// the catalog should have been registered via yaml file Flink SQL> CREATE DATABASE mydb WITH (...); Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...); Flink SQL> SHOW TABLES; mytable
2.2 使用 Java/Scala
用戶可以用編程的方式使用Java 或者 Scala 來創建 Catalog 表。
import org.apache.flink.table.api._ import org.apache.flink.table.catalog._ import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.descriptors.Kafka val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build) // Create a HiveCatalog val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>") // Register the catalog tableEnv.registerCatalog("myhive", catalog) // Create a catalog database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...)) // Create a catalog table val schema = TableSchema.builder() .field("name", DataTypes.STRING()) .field("age", DataTypes.INT()) .build() catalog.createTable( new ObjectPath("mydb", "mytable"), new CatalogTableImpl( schema, new Kafka() .version("0.11") .... .startFromEarlist() .toProperties(), "my comment" ), false ) val tables = catalog.listTables("mydb") // tables should contain "mytable"
3 Catalog API
注意:這里只列出了編程方式的 Catalog API,用戶可以使用 SQL DDL 實現許多相同的功能。 關於 DDL 的詳細信息請參考 SQL CREATE DDL。
3.1 數據庫操作
// create database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false); // drop database catalog.dropDatabase("mydb", false); // alter database catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false); // get databse catalog.getDatabase("mydb"); // check if a database exist catalog.databaseExists("mydb"); // list databases in a catalog catalog.listDatabases("mycatalog");
3.2 表操作
// create table catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // drop table catalog.dropTable(new ObjectPath("mydb", "mytable"), false); // alter table catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); // rename table catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table"); // get table catalog.getTable("mytable"); // check if a table exist or not catalog.tableExists("mytable"); // list tables in a database catalog.listTables("mydb");
3.3 視圖操作
// create view catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false); // drop view catalog.dropTable(new ObjectPath("mydb", "myview"), false); // alter view catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false); // rename view catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false); // get view catalog.getTable("myview"); // check if a view exist or not catalog.tableExists("mytable"); // list views in a database catalog.listViews("mydb");
3.4 分區操作
// create view catalog.createPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false); // drop partition catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false); // alter partition catalog.alterPartition( new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), new CatalogPartitionImpl(...), false); // get partition catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // check if a partition exist or not catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // list partitions of a table catalog.listPartitions(new ObjectPath("mydb", "mytable")); // list partitions of a table under a give partition spec catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...)); // list partitions of a table by expression filter catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
3.5 函數操作
// create function catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); // drop function catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false); // alter function catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false); // get function catalog.getFunction("myfunc"); // check if a function exist or not catalog.functionExists("myfunc"); // list functions in a database catalog.listFunctions("mydb");
4 通過 Table API 和 SQL Client 操作 Catalog
4.1 注冊 Catalog
用戶可以訪問默認創建的內存 Catalog default_catalog,這個 Catalog 默認擁有一個默認數據庫 default_database。 用戶也可以注冊其他的 Catalog 到現有的 Flink 會話中。
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
YAML
使用 YAML 定義的 Catalog 必須提供 type 屬性,以表示指定的 Catalog 類型。 以下幾種類型可以直接使用。
| Catalog | Type Value |
|---|---|
| GenericInMemory | generic_in_memory |
| Hive | hive |
catalogs: - name: myCatalog type: custom_catalog hive-conf-dir: ...
4.2 修改當前的 Catalog 和數據庫
Flink 始終在當前的 Catalog 和數據庫中尋找表、視圖和 UDF。
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
通過提供全限定名 catalog.database.object 來訪問不在當前 Catalog 中的元數據信息。
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
4.3 列出可用的 Catalog
tableEnv.listCatalogs();
Flink SQL> show catalogs;
4.4 列出可用的數據庫
tableEnv.listDatabases();
Flink SQL> show databases;
4.5 列出可用的表
tableEnv.listTables();
Flink SQL> show tables;
