Flink集成Iceberg簡介


1. 概述

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

官方的定義,iceberg是一種表格式。我們可以簡單理解為他是基於計算層(flink、spark)和存儲層(orc、parqurt)的一個中間層,我們可以把它定義成一種“數據組織格式”,Iceberg將其稱之為“表格式”也是表達類似的含義。他與底層的存儲格式(比如ORC、Parquet之類的列式存儲格式)最大的區別是,它並不定義數據存儲方式,而是定義了數據、元數據的組織方式,向上提供統一的“表”的語義。它構建在數據存儲格式之上,其底層的數據存儲仍然使用Parquet、ORC等進行存儲。在hive建立一個iceberg格式的表。用flink或者spark寫入iceberg,然后再通過其他方式來讀取這個表,比如spark、flink、presto等。

 

Iceberg的架構和實現並未綁定於某一特定引擎,它實現了通用的數據組織格式,利用此格式可以方便地與不同引擎(如Flink、Hive、Spark)對接。

2. Iceberg優勢

  • 增量讀取處理能力:Iceberg支持通過流式方式讀取增量數據,支持Structed Streaming以及Flink table Source;
  • 支持事務(ACID),上游數據寫入即可見,不影響當前數據處理任務,簡化ETL;提供upsert和merge into能力,可以極大地縮小數據入庫延遲;
  • 可擴展的元數據,快照隔離以及對於文件列表的所有修改都是原子操作;
  • 同時支持流批處理、支持多種存儲格式和靈活的文件組織:提供了基於流式的增量計算模型和基於批處理的全量表計算模型。批處理和流任務可以使用相同的存儲模型,數據不再孤立;Iceberg支持隱藏分區和分區進化,方便業務進行數據分區策略更新。支持Parquet、Avro以及ORC等存儲格式。
  • 支持多種計算引擎,優秀的內核抽象使之不綁定特定的計算引擎,目前Iceberg支持的計算引擎有Spark、Flink、Presto以及Hive。

3. Flink+ Iceberg搭建使用

Apache Iceberg支持Apache Flink的DataStream Api和Table Api寫記錄進iceberg表。當前,我們只集成Iceberg和apache flink 1.11.x。

  

3.1. 准備

為了在flink中創建iceberg表,我們要求使用flink SQL client,因為這對使用者們來說更容易去理解概念。

准備兩個jar包:

啟動flink sql client,不帶hive connector jar包,可以創建hadoop catalog如下:

./bin/sql-client.sh embedded \
    -j /data/flink-1.11.2/otherlib/iceberg-flink-runtime-0.10.0.jar \
    shell

啟動flink sql client,帶hive connector jar包,可以創建hadoop catalog和hive catalog如下:

./bin/sql-client.sh embedded \
    -j /data/flink-1.11.2/otherlib/iceberg-flink-runtime-0.10.0.jar \
    -j /data/flink-1.11.2/otherlib/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar \
    shell

  注:

    iceberg-flink-runtime-0.10.0.jar和flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar不能放在flink安裝目錄的lib下,需要另外放置在別的目錄

3.2. 創建catalogs和使用catalogs

Flink1.11支持通過flink sql創建catalogs。catalog是Iceberg對表進行管理(create、drop、rename等)的一個組件。目前Iceberg主要支持HiveCatalog和HadoopCatalog兩種Catalog。其中HiveCatalog將當前表metadata文件路徑存儲在hive Metastore,這個表metadata文件是所有讀寫Iceberg表的入口,所以每次讀寫Iceberg表都需要先從hive Metastore中取出對應的表metadata文件路徑,然后再解析這個Metadata文件進行接下來的操作。而HadoopCatalog將當前表metadata文件路徑記錄在一個文件目錄下,因此不需要連接hive Metastore。

3.2.1 Hive catalog

創建一個名為hive_catalog的 iceberg catalog ,用來從 hive metastore 中加載表。

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
  • type: 只能使用iceberg,用於 iceberg 表格式。(必須)
  • catalog-type: Iceberg 當前支持hive或hadoopcatalog 類型。(必須)
  • uri: Hive metastore 的 thrift URI。 (必須)
  • clients: Hive metastore 客戶端池大小,默認值為 2。 (可選)
  • property-version: 版本號來描述屬性版本。此屬性可用於在屬性格式發生更改時進行向后兼容。當前的屬性版本是 1。(可選)
  • warehouse: Hive 倉庫位置, 如果既不將 hive-conf-dir 設置為指定包含 hive-site.xml 配置文件的位置,也不將正確的 hive-site.xml 添加到類路徑,則用戶應指定此路徑。
  • hive-conf-dir: 包含 Hive-site.xml 配置文件的目錄的路徑,該配置文件將用於提供自定義的 Hive 配置值。 如果在創建 iceberg catalog 時同時設置 hive-conf-dir 和 warehouse,那么將使用 warehouse 值覆蓋 < hive-conf-dir >/hive-site.xml (或者 classpath 中的 hive 配置文件)中的 hive.metastore.warehouse.dir 的值。

3.2.2 Hadoop catalog

Iceberg 還支持 HDFS 中基於目錄的 catalog ,可以使用’catalog-type’='hadoop’進行配置:

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
  • warehouse:hdfs目錄存儲元數據文件和數據文件。(必須)

我們可以執行sql命令USE CATALOG hive_catalog來設置當前的catalog。

3.2.3 Custom catalog

Flink也支持通過指定catalog-impl屬性來加載自定義的Iceberg catalog接口。當catalog-impl設置了,catalog-type的值可以忽略,這里有個例子:

CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);

3.2.4 Create through YAML config

在啟動SQL客戶端之前,Catalogs可以通過在sql-client-defaults.yaml文件中注冊。這里有個例子:

catalogs: 
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path

3.2.5 Iceberg表的目錄組織形式

1. HiveCatalog

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs
Found 2 items
drwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data
drwxrwxrwx   - hadoop supergroup          0 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata

其中data目錄下存儲數據文件,metadata目錄下存儲元數據文件。

2. metadata目錄

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata
Found 4 items
-rw-r--r--   1 hadoop supergroup       1448 2020-06-08 11:31 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00000-e7c1e6ce-8eb9-4faf-a176-bd94dec3c0e4.metadata.json
-rw-r--r--   1 hadoop supergroup       2217 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json
-rw-r--r--   1 hadoop supergroup       5040 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro
-rw-r--r--   1 hadoop supergroup       2567 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro

其中00001-62ade8ab-c1cf-40d3-bc21-fd5027bc3a55.metadata.json中存儲表的shcema、partition spec以及當前snapshot manifests文件路徑。snap-6771375506965563160-1-bb641961-162a-49a8-b567-885430d4e799.avro存儲manifest文件路徑。bb641961-162a-49a8-b567-885430d4e799-m0.avro記錄本次提交的文件以及文件級別元數據。

3. data目錄

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click
Found 1 items
-rw-r--r--   1 hadoop supergroup       1425 2020-06-08 12:20 /libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet

4. HadoopCatalog

Hadoopcatalog與Hivecatalog的data目錄完全相同,metadata目錄下文件稍有不同,HadoopCatalog管理的metadata目錄如下所示:

hadoop@xxx:~$ hdfs dfs -ls /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata
Found 5 items
-rw-r--r--   1 hadoop supergroup       5064 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/b222d277-2692-4e35-9327-3716dec9f070-m0.avro
-rw-r--r--   1 hadoop supergroup       2591 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/snap-3124052841098464551-1-b222d277-2692-4e35-9327-3716dec9f070.avro
-rw-r--r--   1 hadoop supergroup       1476 2020-06-08 17:23 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v1.metadata.json
-rw-r--r--   1 hadoop supergroup       2261 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/v2.metadata.json
-rw-r--r--   1 hadoop supergroup          1 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text
 其中文件version-hint.text中存儲當前iceberg表的最新snapshot_id,如下所示:
hadoop@xxx:~$ hdfs dfs -cat /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text 2

說明該表的最新snapshot_id是2,即對應的snapshot元數據文件是v2.metadata.json,解析v2.metadata.json可以獲取到該表當前最新snapshot對應的scheme、partition spec、父snapshot以及該snapshot對應的manifestList文件路徑等,因此version-hint.text是HadoopCatalog獲取當前snapshot版本的入口。

HiveCatalog的metadata目錄下並沒有version-hint.text文件,那它獲取當前snapshot版本的入口在哪里呢?它的入口在Metastore中的schema里面,可以在HiveCatalog建表schema中的TBPROPERTIES中有個key是“metadata_location”,對應的value就是當前最新的snapshot文件。因此,有兩點需要說明:

  • HiveCatalog創建的表,每次提交寫入文件生成新的snapshot后都需要更新Metastore中的metadata_location字段。
  • HiveCatalog和HadoopCatalog不能混用。即使用HiveCatalog創建的表,再使用HadoopCatalog是不能正常加載的,反之亦然。

3.2.6 為什么選擇HadoopCatalog

上面說到Iceberg目前支持兩種Catalog,而且兩種Catalog相互不兼容。那這里有兩個問題:

  1. 社區是出於什么考慮實現兩種不兼容的Catalog?

  2. 因為兩者不兼容,必須選擇其一作為系統唯一的Catalog,那是選擇HiveCatalog還是HadoopCatalog,為什么?

先回答第一個問題。社區是出於什么考慮實現兩種不兼容的Catalog?

在回答這個問題之前,首先回顧一下上一篇文章中介紹到的基於HadoopCatalog,Iceberg實現數據寫入提交的ACID機制,最終的結論是使用了樂觀鎖機制和HDFS rename的原子性一起保障寫入提交的ACID。如果某些文件系統比如S3不支持rename的原子性呢?那就需要另外一種機制保障寫入提交的ACID,HiveCatalog就是另一種不依賴文件系統支持,但是可以提供ACID支持的方案,它在每次提交的時候都更新MySQL中同一行記錄,這樣的更新MySQL本身是可以保證ACID的。這就是社區為什么會支持兩種不兼容Catalog的本質原因。

再來回答第二個問題。HadoopCatalog依賴於HDFS提供的rename原子性語義,而HiveCatalog不依賴於任何文件系統的rename原子性語義支持,因此基於HiveCatalog的表不僅可以支持HDFS,而且可以支持s3、oss等其他文件系統。但是HadoopCatalog可以認為只支持HDFS表,比較難以遷移到其他文件系統。但是HadoopCatalog寫入提交的過程只依賴HDFS,不和Metastore/MySQL交互,而HiveCatalog每次提交都需要和Metastore/MySQL交互,可以認為是強依賴於Metastore,如果Metastore有異常,基於HiveCatalog的Iceberg表的寫入和查詢會有問題。相反,HadoopCatalog並不依賴於Metastore,即使Metastore有異常,也不影響Iceberg表的寫入和查詢。

考慮到我們目前主要還是依賴HDFS,同時不想強依賴於Metastore,所以我們選擇HadoopCatalog作為我們系統唯一的Catalog。即使有一天,想要把HDFS上的表遷移到S3上去,也是可以辦到的,大家想想,無論是HadoopCatalog還是HiveCatalog,數據文件和元數據文件本身都是相同的,只是標記當前最新的snapshot的入口不一樣,那只需要簡單的手動變換一下入口就可以實現Catalog的切換,切換到HiveCatalog上之后,就可以擺脫HDFS的依賴,問題並不大。

3.3. DDL命令

  • 創建數據庫

默認的,iceberg將會在flink中使用default數據庫。如果我們不想在default數據庫下面創建表,可以使用下面的例子去創建別的數據庫。

CREATE DATABASE iceberg_db;
USE iceberg_db;
  • 創建表
CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
);

注:default為數據庫

表創建命令支持最常用的 flink create 子句,包括:

PARTITION BY (column1, column2, ...) 配置分區,apache flik 還不支持隱藏分區。

COMMENT 'table document'設置一個表描述。

WITH ('key'='value', ...)設置將存儲在 apache iceberg 表屬性中的表配置。

目前,它不支持計算列、主鍵和水印定義等。

  • PARTITIONED BY 分區

要創建分區表,使用 PARTITIONED BY:

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg支持隱藏分區但apache flink不支持在列上按照函數分區,因此我們現在沒有途徑在flink DDL上支持隱藏分區,我們在未來將會改善flink DDL。

  • CREATE TABLE LIKE

為了創建和另一張表具有相同結構、分區和表屬性的一張表,使用CREATE TAABLE LIKE。

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  hive_catalog.default.sample_like LIKE hive_catalog.default.sample;

為了更詳細,可以查看Flink CREATE TABLE documentation

  • ALTER TABLE 更改表

Iceberg 現在只支持在 flink 1.11中修改表屬性。

ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro')
  • ALTER TABLE .. RENAME TO
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
  • DROP TABLE 刪除表
DROP TABLE hive_catalog.default.sample;

3.4. sql讀寫

3.4.1.  Sql查詢

Iceberg現在支持flink流式讀和批量讀。我們可以執行下面sql命令去把執行類型流式模式切換為批處理模式,如下:

-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming

-- Execute the flink job in batch mode for current session context
SET execution.type = batch

3.4.1.1. Flink批量讀

如果在提交flink批處理作業時想要檢查iceberg表中所有的記錄,你可以執行下面的句子:

-- Execute the flink job in streaming mode for current session context
SET execution.type = batch ;
SELECT * FROM sample;

3.4.1.2. Flink流式讀

Iceberg支持處理flink流式作業中的增量數據,該數據從歷史快照ID開始:

-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;

-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

這些是可以在flink SQL提示選項中為流作業設置的選項:

  • monitor-interval:連續監視新提交的數據文件的時間間隔(默認值:1s)
  • start-snapshot-id:流式作業開始的快照id

3.4.2. Sql寫入

現在Iceberg支持在flink1.11中使用insert into和insert overwrite。

  • INSERT INTO

flink 流作業將新數據追加到表中,使用 INSERT INTO:

INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data from other_kafka_table;
  • INSERT OVERWRITE

要使用查詢結果替換表中的數據,請在批作業中使用 INSERT OVERWRITE (flink 流作業不支持 INSERT OVERWRITE)。覆蓋是 Iceberg 表的原子操作。

具有由 SELECT 查詢生成的行的分區將被替換,例如:

INSERT OVERWRITE sample VALUES (1, 'a');

Iceberg 還支持通過 select 值覆蓋給定的分區:

INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;

對於分區的Iceberg表,當在PARTITION子句中為所有分區設置值時,它將插入到靜態分區中;否則,如果在PARTITON子句中將部分分區列(所有分區列的前綴部分)設置為值,則將查詢結果寫入動態分區。對於未分區的Iceberg表,其數據將被INSERT OVERWRITE完全覆蓋。

3.5. DataStream讀寫數據(Java API)

3.5.1.  DataStream讀數據

Iceberg現在支持使用Java API流式或者批量讀取。

3.5.1.1. 批量讀

這個例子從Iceberg表讀取所有記錄,然后在flink批處理作業中打印到stdout控制台。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(loader)
     .streaming(false)
     .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

3.5.1.2. 流式讀

這個例子將會讀取從快照id‘3821550127947089987’開始的增量記錄,然后在flink流式作業中打印到stdout控制台中。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(loader)
     .streaming(true)
     .startSnapshotId(3821550127947089987)
     .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg streaming Read");

還有其他選項可以通過Java Api設置,詳情請看FlinkSource#Builder.

3.5.2. DataStream寫數據

Iceberg 支持從不同的 DataStream 輸入寫入 Iceberg 表。

  • Appending data 追加數據

我們支持在本地編寫 DataStream < rowdata > 和 DataStream < Row> 到 sink iceberg 表。

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");
  • Overwrite data 重寫數據

為了動態覆蓋現有 Iceberg 表中的數據,我們可以在FlinkSink構建器中設置overwrite標志。

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");

3.6. 檢查表

現在Iceberg不支持在flink Sql中檢查表,我們需要使用 iceberg’s Java API 去讀取Iceberg來得到這些表信息。

3.7. 重寫文件操作

Iceberg可以通過提交flink批作業去提供API重寫小文件變為大文件。flink操作表現與spark的rewriteDataFiles.一樣。

import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .execute();

更多的重寫文件操作選項文檔,請看RewriteDataFilesAction

3.8. 將來提升

當前flink iceberg整合工作還有下面的特性不支持:

  • 不支持創建帶有隱藏分區的Iceberg表;
  • 不支持創建帶有計算列的Iceberg表;
  • 不支持創建帶有水印的Iceberg表;
  • 不支持添加列,刪除列,重命名列,修改列;

4. Iceberg實例

4.1. 使用編程SQL方式讀寫Iceberg表

4.1.1.  添加依賴

<dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.10.0</version>
</dependency>

4.1.2.  部分代碼實現

// 使用table api 創建 hadoop catalog
 TableResult tableResult = tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://nameservice1/tmp',\n" +
                "  'property-version'='1'\n" +
                ")");
 
        // 使用catalog
        tenv.useCatalog("hadoop_catalog");
        // 創建庫
        tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
        tenv.useDatabase("iceberg_hadoop_db");
 
     
        // 創建iceberg 結果表
        tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_001");
        tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_001 (\n" +
                "    id BIGINT COMMENT 'unique id',\n" +
                "    data STRING\n" +
                ")");
 
        // 測試寫入
        tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_001 select 100,'abc'");

4.1.3. 創建hive的外部表來實時查詢iceberg表

hive> add jar /tmp/iceberg-hive-runtime-0.10.0.jar;
 
hive> CREATE EXTERNAL TABLE tmp.iceberg_001(id bigint,data string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION '/tmp/iceberg_hadoop_db/iceberg_001';
 
hive> select * from tmp.iceberg_001;
OK
100        abc
1001    abcd
Time taken: 0.535 seconds, Fetched: 2 row(s)

4.2. Flink結合Kafka實時寫入Iceberg實踐筆記

4.2.1. 創建Hadoop Catalog的Iceberg 表

// create hadoop catalog
        tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://nameservice1/tmp',\n" +
                "  'property-version'='1'\n" +
                ")");
 
        // change catalog
        tenv.useCatalog("hadoop_catalog");
        tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
        tenv.useDatabase("iceberg_hadoop_db");
        // create iceberg result table
        tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002"); 
        tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n" +
                "    user_id STRING COMMENT 'user_id',\n" +
                "    order_amount DOUBLE COMMENT 'order_amount',\n" +
                "    log_ts STRING\n" +
                ")");

4.2.2. 使用Hive Catalog創建Kafka流表

  String HIVE_CATALOG = "myhive";
        String DEFAULT_DATABASE = "tmp";
        String HIVE_CONF_DIR = "/xx/resources";
        Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
        tenv.registerCatalog(HIVE_CATALOG, catalog);
        tenv.useCatalog("myhive");
        // create kafka stream table
        tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
        tenv.executeSql(
                "CREATE TABLE ods_k_2_iceberg (\n" +
                        " user_id STRING,\n" +
                        " order_amount DOUBLE,\n" +
                        " log_ts TIMESTAMP(3),\n" +
                        " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
                        ") WITH (\n" +
                        "  'connector'='kafka',\n" +
                        "  'topic'='t_kafka_03',\n" +
                        "  'scan.startup.mode'='latest-offset',\n" +
                        "  'properties.bootstrap.servers'='xx:9092',\n" +
                        "  'properties.group.id' = 'testGroup_01',\n" +
                        "  'format'='json'\n" +
                        ")");

4.2.3. 使用SQL連接kafka流表和iceberg 目標表

 System.out.println("---> 3. insert into iceberg  table from kafka stream table .... ");
        tenv.executeSql(
                "INSERT INTO  hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +
                        " SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

4.2.4. 數據驗證

bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
 
hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002  limit 5;
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    13.0    2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)

 

參考資料:

https://blog.csdn.net/u010834071/article/details/112507474  Flink結合Iceberg的一種實現方式筆記

https://zhengqiang.blog.csdn.net/article/details/112850376  Flink結合Kafka實時寫入Iceberg實踐筆記

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM