Flink基礎(四十一):FLINK-SQL應用場景(2)Catalogs


0 簡介

Catalog 提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。

數據處理最關鍵的方面之一是管理元數據。 元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。 元數據也可以是持久化的,例如 Hive Metastore 中的元數據。Catalog 提供了一個統一的API,用於管理元數據,並使其可以從 Table API 和 SQL 查詢語句中來訪問。

1 Catalog 類型

1.1 GenericInMemoryCatalog

GenericInMemoryCatalog 是基於內存實現的 Catalog,所有元數據只在 session 的生命周期內可用。

1.2 JdbcCatalog

JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協議連接到關系數據庫。PostgresCatalog 是當前實現的唯一一種 JDBC Catalog。 參考 JdbcCatalog 文檔 獲取關於配置 JDBC catalog 的詳細信息。

1.3 HiveCatalog

HiveCatalog 有兩個用途:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口。 Flink 的 Hive 文檔 提供了有關設置 HiveCatalog 以及訪問現有 Hive 元數據的詳細信息。

警告 Hive Metastore 以小寫形式存儲所有元數據對象名稱。而 GenericInMemoryCatalog 區分大小寫。

1.4 用戶自定義 Catalog

Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。

CatalogFactory 定義了一組屬性,用於 SQL CLI 啟動時配置 Catalog。 這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 並初始化相應的 Catalog 實例。

2.1 使用 SQL DDL

用戶可以使用 DDL 通過 Table API 或者 SQL Client 在 Catalog 中創建表。

val tableEnv = ...

// Create a HiveCatalog 
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);

Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

Flink SQL> SHOW TABLES;
mytable

2.2 使用 Java/Scala

用戶可以用編程的方式使用Java 或者 Scala 來創建 Catalog 表。

import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors.Kafka

val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)

// Create a HiveCatalog
val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>")

// Register the catalog
tableEnv.registerCatalog("myhive", catalog)

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))

// Create a catalog table
val schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build()

catalog.createTable(
        new ObjectPath("mydb", "mytable"),
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                ....
                .startFromEarlist()
                .toProperties(),
            "my comment"
        ),
        false
    )

val tables = catalog.listTables("mydb") // tables should contain "mytable"

3 Catalog API

注意:這里只列出了編程方式的 Catalog API,用戶可以使用 SQL DDL 實現許多相同的功能。 關於 DDL 的詳細信息請參考 SQL CREATE DDL

3.1 數據庫操作

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get databse
catalog.getDatabase("mydb");

// check if a database exist
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases("mycatalog");

3.2 表操作

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// get table
catalog.getTable("mytable");

// check if a table exist or not
catalog.tableExists("mytable");

// list tables in a database
catalog.listTables("mydb");

3.3 視圖操作

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable("myview");

// check if a view exist or not
catalog.tableExists("mytable");

// list views in a database
catalog.listViews("mydb");

3.4 分區操作

// create view
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

3.5 函數操作

// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction("myfunc");

// check if a function exist or not
catalog.functionExists("myfunc");

// list functions in a database
catalog.listFunctions("mydb");

4 通過 Table API 和 SQL Client 操作 Catalog

4.1 注冊 Catalog

用戶可以訪問默認創建的內存 Catalog default_catalog,這個 Catalog 默認擁有一個默認數據庫 default_database。 用戶也可以注冊其他的 Catalog 到現有的 Flink 會話中。

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

YAML

使用 YAML 定義的 Catalog 必須提供 type 屬性,以表示指定的 Catalog 類型。 以下幾種類型可以直接使用。

Catalog Type Value
GenericInMemory generic_in_memory
Hive hive
catalogs: - name: myCatalog type: custom_catalog hive-conf-dir: ...

4.2 修改當前的 Catalog 和數據庫

Flink 始終在當前的 Catalog 和數據庫中尋找表、視圖和 UDF。

tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;

通過提供全限定名 catalog.database.object 來訪問不在當前 Catalog 中的元數據信息。

tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

4.3 列出可用的 Catalog

tableEnv.listCatalogs();

Flink SQL> show catalogs;

4.4 列出可用的數據庫

tableEnv.listDatabases();
Flink SQL> show databases;
 
        

4.5 列出可用的表

tableEnv.listTables();
Flink SQL> show tables;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM