Flink 讀寫 iceberg


iceberg 0.11 發布的時候稍微嘗試了一下,發現實際並沒有說的那么厲害,很多功能其實還在開發中(比如: upsert)

貼段之前寫的 flink sql:


# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

./sql-client.sh embedded -j ../lib/iceberg-flink-runtime-0.11.1.jar shell

CREATE CATALOG t_iceberg_hadoop_catalog_1 WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://thinkpad:8020/tmp/iceberg/flink/t_iceberg_hadoop_catalog_1',
  'property-version'='1'
);

# 這樣也可以
CREATE CATALOG t_iceberg_hadoop_catalog_2 WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/hadoop_catelog/t_iceberg_hadoop_catalog_2',
  'property-version'='1'
);


CREATE DATABASE iceberg_db;
USE iceberg_db;

CREATE TABLE iceberg_db.t_iceberg_sample_1 (
    id BIGINT COMMENT 'unique id',
    data STRING
)WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://thinkpad:8020/tmp/iceberg/flink/hadoop_catalog/iceberg_db/t_iceberg_sample_1',
  'property-version'='1'
);

# insert into iceberg_db.t_iceberg_sample_1(id, data) values(10, '2021-04-29 17:38:00'); # 失敗

SELECT * FROM iceberg_db.t_iceberg_sample_1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

注: 貌似沒有 hive catalog,只能放在 hadoop 上面,不支持 upsert

iceberg master

github 下載 iceberg master 代碼,編譯了 Flink 1.14、1.13 版本(1.14 遇到包沖突,本次使用 1.13 版本)

iceberg 的包管理工具是 gradle,編譯后 iceberg 的包在 : iceberg/flink/v1.13/flink-runtime/build/libs/iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar

把包放到 flink lib 目錄:

/opt/flink-1.13.2$ ls lib/
flink-connector-hbase-2.2_2.11-1.13.2.jar   flink-json-1.13.2.jar                           hbase-client-2.1.1.jar                hbase-shaded-protobuf-2.1.0.jar                 log4j-api-2.12.1.jar
flink-connector-hbase-base_2.11-1.13.2.jar  flink-shaded-hadoop-2-uber-2.8.3-10.0.jar       hbase-common-2.1.1.jar                htrace-core4-4.2.0-incubating.jar               log4j-core-2.12.1.jar
flink-connector-kafka_2.11-1.13.2.jar       flink-shaded-zookeeper-3.4.14.jar               hbase-protocol-2.1.1.jar              hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar      log4j-slf4j-impl-2.12.1.jar
flink-connector-mysql-cdc-1.4.0.jar         flink-sql-connector-hive-2.3.6_2.11-1.13.2.jar  hbase-protocol-shaded-2.1.1.jar       iceberg-flink-runtime-1.13-0.13.0-SNAPSHOT.jar  metrics-core-3.2.1.jar
flink-csv-1.13.2.jar                        flink-table_2.11-1.13.2.jar                     hbase-shaded-miscellaneous-2.1.0.jar  kafka-clients-2.2.0.jar
flink-dist_2.11-1.13.2.jar                  flink-table-blink_2.11-1.13.2.jar               hbase-shaded-netty-2.1.0.jar          log4j-1.2-api-2.12.1.jar

注意:如果要使用 hive catalog 需要 flink-sql-connector-hive 對應版本的 jar 包

啟動 hive server2 & matestore

hive --service metastore
hive --service hiveserver2

啟動前別忘了先把 hive 的數據庫啟動起來

建表語句:


CREATE CATALOG ice WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://thinkpad:9083',
  'clients'='5',
  'property-version'='2',
  'warehouse'='hdfs://thinkpad:8020/user/hive/datalake/ice'
);

-- use catalog
use catalog ice;
-- create database
create database ice;
-- use database;
use ice;
-- create iceberg table
CREATE TABLE ice1.ice.user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts timestamp(3)
   ,PRIMARY KEY (user_id) NOT ENFORCED
);

寫 iceberg

啟動 yarn-session

@thinkpad:/opt/flink-1.13.2$ ./bin/yarn-session.sh -d -nm ice
# 使用啟動 yarn application 啟動 sql-client
@thinkpad:/opt/flink-1.13.2$ ./bin/sql-client.sh embedded -s application_1640912648992_0001


執行 sql,讀取 kafka 數據,寫入 iceberg

## 創建 catalog
Flink SQL> CREATE CATALOG ice WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://thinkpad:9083',
>   'clients'='5',
>   'property-version'='2',
>   'warehouse'='hdfs://thinkpad:8020/user/hive/datalake/ice'
> );
[INFO] Execute statement succeed.

Flink SQL> use catalog ice;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          default |
| default_database |
|           dl_ods |
|            flink |
|              ice |
+------------------+
5 rows in set

Flink SQL> use ice;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE ice.ice.user_log_sink (
>   user_id STRING
>   ,item_id STRING
>   ,category_id STRING
>   ,behavior STRING
>   ,ts timestamp(3)
>    ,PRIMARY KEY (user_id) NOT ENFORCED
> );
[INFO] Execute statement succeed.


# 切換到 default catalog,創建 kakfa 表
Flink SQL> use catalog default_catalog;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE user_log (
>   user_id VARCHAR
>   ,item_id VARCHAR
>   ,category_id VARCHAR
>   ,behavior VARCHAR
>   ,ts TIMESTAMP(3)
>   ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka'
>   ,'topic' = 'user_log'
>   ,'properties.bootstrap.servers' = 'localhost:9092'
>   ,'properties.group.id' = 'user_log'
>   ,'scan.startup.mode' = 'latest-offset'
>   ,'format' = 'json'
> );
[INFO] Execute statement succeed.


## 執行 insert 語句
Flink SQL> insert into ice.ice.user_log_sink
> SELECT user_id, item_id, category_id, behavior, ts
> FROM user_log;
[INFO] Submitting SQL update statement to the cluster...
2021-12-31 09:41:17,315 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.13.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-12-31 09:41:17,361 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at thinkpad/127.0.0.1:8032
2021-12-31 09:41:17,427 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-12-31 09:41:17,428 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-12-31 09:41:17,444 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface localhost:41255 of application 'application_1640912648992_0001'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e2da288cd6e2c7919420c74d555e6ad7


查看 flink web ui:

注意: Checkpoint: flink 任務 checkpoint 的時候才真正往 iceberg 寫數據

直接在 sql-client 寫入數據:


Flink SQL> insert into ice.ice.user_log_sink values('1','item','catagroy','behavior',now());
[INFO] Submitting SQL update statement to the cluster...
2021-12-31 09:49:03,336 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at thinkpad/127.0.0.1:8032
2021-12-31 09:49:03,336 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-12-31 09:49:03,337 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-12-31 09:49:03,339 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface localhost:41255 of application 'application_1640912648992_0001'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 372572a67492c0f5fcef827309c0d4ff


kafka 寫入數據樣例:


{"category_id":52,"user_id":"1","item_id":"52","behavior":"pv","ts":"2021-12-31 09:54:42.617"}
{"category_id":14,"user_id":"2","item_id":"114","behavior":"pv","ts":"2021-12-31 09:54:43.849"}
{"category_id":61,"user_id":"3","item_id":"61","behavior":"buy","ts":"2021-12-31 09:54:44.852"}
{"category_id":41,"user_id":"4","item_id":"341","behavior":"pv","ts":"2021-12-31 09:54:45.853"}
{"category_id":71,"user_id":"5","item_id":"471","behavior":"buy","ts":"2021-12-31 09:54:46.855"}

讀 iceberg

再開一個 sql-client 查看寫入 iceberg 的數據


Flink SQL> CREATE CATALOG ice WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://thinkpad:9083',
>   'clients'='5',
>   'property-version'='1',
>   'warehouse'='hdfs://thinkpad:8020/user/hive/datalake/ice'
> );
[INFO] Execute statement succeed.

Flink SQL> use catalog ice;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          default |
| default_database |
|           dl_ods |
|            flink |
|              ice |
+------------------+
5 rows in set

Flink SQL> use ice;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+---------------+
|    table name |
+---------------+
|   flink_table |
|        sample |
| user_log_sink |
+---------------+
3 rows in set

Flink SQL> SET table.dynamic-table-options.enabled=true;
[INFO] Session property has been set.

Flink SQL> SELECT * FROM user_log_sink /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
2021-12-31 09:48:02,948 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink-1.13.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-12-31 09:48:02,995 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at thinkpad/127.0.0.1:8032
2021-12-31 09:48:03,066 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-12-31 09:48:03,067 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-12-31 09:48:03,081 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface localhost:41255 of application 'application_1640912648992_0001'.
[INFO] Result retrieval cancelled.


iceberg upsert 功能

比較遺憾的是 iceberg 最新 release 版本 0.12.1 flink 還不支持 upsert 功能

master 版本 flink sql 已經支持流式寫入的 upsert(表設置主鍵,添加表屬性: 'format-version' = '2' 和 'write.upsert.enabled' = 'true') 功能,但是比較遺憾的是,還沒有支持流式的讀取 upsert 的表,只能 batch 讀

upsert 建表語句:


CREATE CATALOG ice WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://thinkpad:9083',
  'clients'='5',
  'property-version'='2',
  'warehouse'='hdfs://thinkpad:8020/user/hive/datalake/ice.db'
);

CREATE TABLE ice.ice.user_log_sink (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts timestamp(3)
  ,PRIMARY KEY (user_id) NOT ENFORCED
)WITH (
    'format-version' = '2'
    ,'write.upsert.enabled' = 'true'
);


-- streaming sql, insert into mysql table
insert into ice.ice.user_log_sink
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log

寫入數據:

{"category_id":97,"user_id":"2","item_id":"97","behavior":"pv","ts":"2022-01-05 10:18:50.445"}
{"category_id":11,"user_id":"3","item_id":"911","behavior":"cart","ts":"2022-01-05 10:18:51.447"}
{"category_id":7,"user_id":"4","item_id":"607","behavior":"pv","ts":"2022-01-05 10:18:52.450"}
{"category_id":31,"user_id":"5","item_id":"131","behavior":"pv","ts":"2022-01-05 10:18:53.453"}
{"category_id":79,"user_id":"6","item_id":"579","behavior":"buy","ts":"2022-01-05 10:18:54.455"}
{"category_id":79,"user_id":"7","item_id":"79","behavior":"cart","ts":"2022-01-05 10:18:55.459"}
{"category_id":85,"user_id":"8","item_id":"185","behavior":"buy","ts":"2022-01-05 10:18:56.463"}
{"category_id":74,"user_id":"9","item_id":"174","behavior":"pv","ts":"2022-01-05 10:18:57.464"}
{"category_id":50,"user_id":"10","item_id":"950","behavior":"pv","ts":"2022-01-05 10:18:58.466"}

批模式查詢:

# 開啟動態參數
SET table.dynamic-table-options.enabled=true;
# 設置 批 模式,需要關閉 checkpoint
SET execution.runtime-mode = batch;
# 查詢表
select * from ice.ice.user_log_sink;

sql 查詢結果:

繼續寫數據,看upsert 結果(看數據時間)

異常

iceberg v1 表設置主鍵,有重復數據報錯:


2021-12-29 17:15:59
java.lang.IllegalArgumentException: Cannot write delete files in a v1 table
  at org.apache.iceberg.ManifestFiles.writeDeleteManifest(ManifestFiles.java:154)
  at org.apache.iceberg.SnapshotProducer.newDeleteManifestWriter(SnapshotProducer.java:374)
  at org.apache.iceberg.MergingSnapshotProducer.lambda$newDeleteFilesAsManifests$8(MergingSnapshotProducer.java:631)
  at java.util.HashMap.forEach(HashMap.java:1289)
  at org.apache.iceberg.MergingSnapshotProducer.newDeleteFilesAsManifests(MergingSnapshotProducer.java:628)
  at org.apache.iceberg.MergingSnapshotProducer.prepareDeleteManifests(MergingSnapshotProducer.java:614)
  at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:490)
  at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:164)
  at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:283)
  at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:405)
  at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:215)
  at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:199)
  at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:191)
  at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:282)
  at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:312)
  at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:299)
  at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:218)
  at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:153)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
  at java.lang.Thread.run(Thread.java:748)

解決: iceberg v1 表不支持 upsert

2022-01-05 09:11:07
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (178025383574913414, 6013983871507259597]
  at org.apache.iceberg.IncrementalDataTableScan.snapshotsWithin(IncrementalDataTableScan.java:121)
  at org.apache.iceberg.IncrementalDataTableScan.planFiles(IncrementalDataTableScan.java:73)
  at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:204)
  at org.apache.iceberg.DataTableScan.planTasks(DataTableScan.java:30)
  at org.apache.iceberg.flink.source.FlinkSplitGenerator.tasks(FlinkSplitGenerator.java:86)
  at org.apache.iceberg.flink.source.FlinkSplitGenerator.createInputSplits(FlinkSplitGenerator.java:38)
  at org.apache.iceberg.flink.source.StreamingMonitorFunction.monitorAndForwardSplits(StreamingMonitorFunction.java:143)
  at org.apache.iceberg.flink.source.StreamingMonitorFunction.run(StreamingMonitorFunction.java:121)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
  

解決: iceberg v2 表現在只支持 batch 模式讀,流讀的pr 還是 review: https://github.com/apache/iceberg/pull/3095

  • 注: v1/v2 表代表 iceberg 表 flink 寫入版本,v1 不支持 upsert,v2 是新版本支持 upsert

完整sql 參考: github sqlSubmit

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM