數據湖 Iceberg




數據湖(datalake)

傳統數據庫或數據倉庫的特點

  • 存儲和計算綁定
  • 專有數據格式
  • 數據結構類型較為單一
  • 對可靠性、一致性、數據事務要求較高
  • 細粒度的數據權限控制
  • 由於存儲和計算綁定,容易優化,性能較高
  • 擴展性能較一般
  • 預先建模,寫入型 schema,按預設的 schema 讀寫
  • 主要存儲計算處理后的數據
  • 主要用於 Report、BI(有實時性要求)

數據湖的特點

  • 存儲和計算解耦分離,支持多種存儲,支持多種計算引擎
  • 開放通用的數據格式
  • 支持多種數據格式,包括結構化(行列)、非結構化(email、文檔等)、半結構化(CSV、日志、XML、JSON 等)、二進制(圖像、音頻、視頻)
  • 對可靠性、一致性、數據事務要求不高
  • 對數據權限控制要求不高
  • 優化和性能,比存儲計算一體化的方案要差些
  • 低存儲成本,高擴展性
  • 無須提前建模,讀取型 schema,可以在讀取時才確定 schema,靈活度高
  • 主要存儲原始數據,也存儲計算處理后的數據
  • 用於數據分析、機器學習,也可以用於 Report、BI 但性能差些

可以看到,數據湖像是一個,包括多種存儲、多種計算、並解耦了存儲和計算的、主要用於存儲原始數據、做數據分析的、靈活的系統

如果沒有節制地、不加選擇地往數據湖里灌數據,可能會造成數據沼澤,導致混亂,難以挖掘有用信息

數據湖產品有 Iceberg、Delta lake、Hudi 等

對象存儲

傳統的存儲

  • DAS(Direct Attached Storage):直接讀寫本地的磁盤或磁盤陣列
  • SAN(Storage Area Network):多個節點組成專用存儲網絡,能統一管理存儲資源,使存儲和服務隔離
  • NAS(Network Attached Storage): SAN 的存儲資源在網絡,文件系統在本地,而 NAS 的文件系統也在遠端,關聯后訪問遠端文件就像訪問本地文件一樣

對象存儲,起源於雲計算,就是把要存儲的東西作為一個對象 Object 存到雲端,不能直接打開或修改,只能做 PUT(上傳)、GET(下載)、DELETE(刪除)等操作,每個對象有相應的 metadata 來描述這個對象,可以理解為像網盤那樣,可以上傳下載任何東西,但不能直接編輯運行,並且可以對存儲的東西做一些圖片文字的描述(就是 metadata)

對象存儲的優勢(比如對比 HDFS)

  • 易於集群擴展,HDFS 有單點問題,元數據都在 NameNode,只有 DataNode 容易擴展,而對象的存儲,和元數據的管理都是分布式可擴展的,可以無限擴展
  • 對大量小文件的支持好,HDFS 還是因為 NameNode 的限制,大量小文件可能導致 DataNode 尚有空間時也有可能因為沒地方存元數據導致數據無法存儲,而對象存儲就沒有這樣的問題
  • 對象存儲支持多站點部署(異地備份、多機房備份等),而 HDFS 不支持
  • 低存儲開銷,HDFS 需要 3 個副本,而對象存儲通過 EC 糾刪碼等方式只增加 20% 額外空間就能有效備份

可以看到雲對象存儲比起 HDFS 還是有一定優勢的

而以前的數據倉庫數據湖經常使用 HDFS 作為存儲

Iceberg

Apache Iceberg 是一個用於海量數據分析的開源的表格式,相當於一個中間層,使計算和存儲解耦分離,支持多種底層存儲,支持多種上層計算引擎

https://iceberg.apache.org/
https://github.com/apache/iceberg

功能

  • 支持 Parquet、Orc、S3(Simple Storage Service,AWS)、GCP(Google)、Aliyun 等存儲
  • 支持 Spark、Flink、Hive、PrestoDB 等計算引擎
  • 支持 schema 的變更,包括 add、drop、update、rename 等操作
  • 支持數據的 ACID 操作,支持行級數據變更
  • 支持隱式分區 (Hidden Partitioning)
  • 支持分區布局變更 (Partition layout evolution)
  • 支持查詢特定版本的數據
  • 支持版本回滾
  • 支持事務
  • 支持 python 和 java

有些功能可能會受引擎制約導致不支持,比如 hive 不支持數據的 update

感覺文檔不是很全

Schema 變更

Iceberg 支持 Schema 變更操作

  • Add: 添加新的列到表,或添加新的域到內嵌結構
  • Drop: 從表移除列,或從內嵌結構移除域
  • Rename: 重命名列,或內嵌結構的域
  • Update: 改變列的類型,或內嵌結構的域的類型
  • Reorder: 改變列的排序,或內嵌結構的域的排序

schema 的更改只改變 metadata,數據文件不需要改變

隱式分區和分區布局變更

https://iceberg.apache.org/#partitioning/
https://iceberg.apache.org/#evolution/#partition-evolution

感覺說的不清楚,好像是說了兩個事

一是自動管理關聯字段,比如 event_time, format_time(event_time, 'YYYY-MM-dd') as event_date,正常說如果 partition 只指定 event_date,那么 where 條件只用到 event_time 的時候不會用到分區機制,而 iceberg 分區會管理這兩個字段,只使用 event_time 也可以

二是可以對已有分區的表修改分區配置,並且不需要遷移數據,新舊數據分別存儲,查詢的時候會產生兩個 plan

查詢特定版本和版本回滾

查詢歷史

// time travel to October 26, 1986 at 01:21:00
spark.read
     .option("as-of-timestamp", "499162860000")
     .format("iceberg")
     .load("path/to/table")

// time travel to snapshot with ID 10963874102873L
spark.read
     .option("snapshot-id", 10963874102873L)
     .format("iceberg")
     .load("path/to/table")

回滾到某個時間

CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')

回滾到某個版本

CALL catalog_name.system.set_current_snapshot('db.sample', 1)

https://iceberg.apache.org/#spark-procedures/#rollback_to_timestamp

Iceberg in Spark

iceberg 需要配合計算引擎使用,這里以 spark 為例子

啟動時帶上 iceberg package 或者把 jar 包放到 spark 的 jar 目錄

spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.12.1

添加 catalogs

// 這里創建兩個 catalog,其名字分別是 spark_catalog 和 local
// 下面創建表,讀寫表的操作,都需要指定 catalog 的名字
// 支持兩種實現
//   org.apache.iceberg.spark.SparkCatalog : supports a Hive Metastore or a Hadoop warehouse
//   org.apache.iceberg.spark.SparkSessionCatalog : to Spark’s built-in catalog
// type
//   hive or hadoop or left unset if using a custom catalog
// uri
//   hive thrift://host:port or default from hive-site.xml
// warehouse
//   hadoop path like hdfs://nn:8020/warehouse/path

spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.12.1\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

// hadoop 還可以進一步配置,比如
--conf spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000

默認 type 只支持 hive 或 hadoop,如果需要使用其他的比如 aws s3,需要添加額外的包,以及需要使用 custom catalog

# add Iceberg dependency
ICEBERG_VERSION=0.12.1
DEPENDENCIES="org.apache.iceberg:iceberg-spark3-runtime:$ICEBERG_VERSION"

# add AWS dependnecy
AWS_SDK_VERSION=2.15.40
AWS_MAVEN_GROUP=software.amazon.awssdk
AWS_PACKAGES=(
    "bundle"
    "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
    DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done

# start Spark SQL client shell
spark-sql --packages $DEPENDENCIES \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
    --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager \
    --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable

如果是 JDBC 同樣需要用 custom catalog

# uri 是真正存數據的地方,warehouse 是存 iceberg 元數據的地方
spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.12.1 \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/my/key/prefix \
    --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
    --conf spark.sql.catalog.my_catalog.uri=jdbc:mysql://test.1234567890.us-west-2.rds.amazonaws.com:3306/default \
    --conf spark.sql.catalog.my_catalog.jdbc.verifyServerCertificate=true \
    --conf spark.sql.catalog.my_catalog.jdbc.useSSL=true \
    --conf spark.sql.catalog.my_catalog.jdbc.user=admin \
    --conf spark.sql.catalog.my_catalog.jdbc.password=pass

使用 spark-sql shell 或 spark.sql(...) 函數創建表和讀寫數據

建表

CREATE TABLE local.db.table (id bigint, data string) USING iceberg

CREATE TABLE ... PARTITIONED BY
CREATE TABLE ... AS SELECT
ALTER TABLE
DROP TABLE

partition

CREATE TABLE ... PARTITIONED BY years(ts)

years(ts): partition by year
months(ts): partition by month
days(ts) or date(ts): equivalent to dateint partitioning
hours(ts) or date_hour(ts): equivalent to dateint and hour partitioning
bucket(N, col): partition by hashed value mod N buckets
truncate(L, col): partition by value truncated to L

修改表的存儲格式

ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'write.format.default'='orc'
)

默認是 parquet,支持 parquet, avro, orc 等格式

https://iceberg.apache.org/#configuration/
https://iceberg.apache.org/#spark-ddl/#alter-table

寫數據

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

或使用 dataframe 的 接口

// v2 接口
dataframe.writeTo("local.db.table").append()

// v1 接口
dataframe.write
         .format("iceberg")
         .mode("append")
         .save("local.db.table")

讀數據

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data

或使用 dataframe 接口

val df = spark.table("local.db.table")

val df = spark.table("local.db.table").select("id", "data")

使用 streaming 寫

data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()

https://iceberg.apache.org/#spark-queries/

表格式說明

https://iceberg.apache.org/#spec/

講了 metadata 文件和 data 文件是如何組織的

https://help.aliyun.com/document_detail/312246.html

和 hive 和 clickhouse 的比較,和適用場景




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM