Apache Iceberg作為一款新興的數據湖解決方案在實現上高度抽象,在存儲上能夠對接當前主流的HDFS,S3文件系統並且支持多種文件存儲格式,例如Parquet、ORC、AVRO。相較於Hudi、Delta與Spark的強耦合,Iceberg可以與多種計算引擎對接,目前社區已經支持Spark讀寫Iceberg、Impala/Hive查詢Iceberg。本文基於Apache Iceberg 0.10.0,介紹Iceberg文件的組織方式以及不同文件的存儲格式。
Iceberg Table Format
從圖中可以看到iceberg將數據進行分層管理,主要分為元數據管理層和數據存儲層。元數據管理層又可以細分為三層:
- VersionMetadata
- Snapshot
- Manifest
VersionMetadata存儲當前版本的元數據信息(所有snapshot信息);Snapshot表示當前操作的一個快照,每次commit都會生成一個快照,一個快照中包含多個Manifest,每個Manifest中記錄了當前操作生成數據所對應的文件地址,也就是data files的地址。基於snapshot的管理方式,iceberg能夠進行time travel(歷史版本讀取以及增量讀取),並且提供了serializable isolation。
數據存儲層支持不同的文件格式,目前支持Parquet、ORC、AVRO。
下面以HadoopTableOperation commit生成的數據為例介紹各層的數據格式。iceberg生成的數據目錄結構如下所示:
├── data
│ ├── id=1
│ │ ├── 00000-0-04ae60eb-657d-45cb-bb99-d1cb7fe0ad5a-00001.parquet
│ │ └── 00000-4-487b841b-13b4-4ae8-9238-f70674d5102e-00001.parquet
│ ├── id=2
│ │ ├── 00001-1-e85b018b-e43a-44d7-9904-09c80a9b9c24-00001.parquet
│ │ └── 00001-5-0e2be766-c921-4269-8e1e-c3cff4b98a5a-00001.parquet
│ ├── id=3
│ │ ├── 00002-2-097171c5-d810-4de9-aa07-58f3f8a3f52e-00001.parquet
│ │ └── 00002-6-9d738169-1dbe-4cc5-9a87-f79457a9ec0b-00001.parquet
│ └── id=4
│ ├── 00003-3-b0c91d66-9e4e-4b7a-bcd5-db3dc1b847f2-00001.parquet
│ └── 00003-7-68c45a24-21a2-41e8-90f1-ef4be42f3002-00001.parquet
└── metadata
├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro
├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro
├── f475511f-877e-4da5-90aa-efa5928a7759-m0.avro
├── snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro
├── snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro
├── v1.metadata.json
├── v2.metadata.json
├── v3.metadata.json
└── version-hint.text
其中metadata目錄存放元數據管理層的數據:
- version-hint.text:存儲version.metadata.json的版本號,即下文的number
- version[number].metadata.json
- snap-[snapshotID]-[attemptID]-[commitUUID].avro(snapshot文件)
- [commitUUID]-m-[manifestCount].avro(manifest文件)
data目錄組織形式類似於hive,都是以分區進行目錄組織(上圖中id為分區列),最終數據可以使用不同文件格式進行存儲:
- [sparkPartitionID]-[sparkTaskID]-[UUID]-[fileCount].[parquet | avro | orc]
VersionMetadata
//
{
// 當前文件格式版本信息
// 目前為version 1
// 支持row-level delete等功能的version 2還在開發中
"format-version" : 1,
"table-uuid" : "a9114f94-911e-4acf-94cc-6d000b321812",
// hadoopTable location
"location" : "hdfs://10.242.199.202:9000/hive/empty_order_item",
// 最新snapshot的創建時間
"last-updated-ms" : 1608810968725,
"last-column-id" : 6,
// iceberg schema
"schema" : {
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false, // 類似probuf中的required
"type" : "long"
}, {
"id" : 2,
"name" : "order_id",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "product_id",
"required" : false,
"type" : "long"
}, {
"id" : 4,
"name" : "product_price",
"required" : false,
"type" : "decimal(7, 2)"
}, {
"id" : 5,
"name" : "product_quantity",
"required" : false,
"type" : "int"
}, {
"id" : 6,
"name" : "product_name",
"required" : false,
"type" : "string"
} ]
},
"partition-spec" : [ {
"name" : "id",
"transform" : "identity", // transform類型
"source-id" : 1,
"field-id" : 1000
} ],
"default-spec-id" : 0,
// 分區信息
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "id",
// transform類型:目前支持identity,year,bucket等
"transform" : "identity",
// 對應schema.fields中相應field的ID
"source-id" : 1,
"field-id" : 1000
} ]
} ],
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
// hive創建該表存儲的一些hive property信息
"properties" : {
"totalSize" : "0",
"rawDataSize" : "0",
"numRows" : "0",
"COLUMN_STATS_ACCURATE" : "{\"BASIC_STATS\":\"true\"}",
"numFiles" : "0"
},
// 當前snapshot id
"current-snapshot-id" : 2080639593951710914,
// snapshot信息
"snapshots" : [ {
"snapshot-id" : 5178718682852547007,
// 創建snapshot時間
"timestamp-ms" : 1608809818168,
"summary" : {
// spark寫入方式,目前支持overwrite以及append
"operation" : "overwrite",
"spark.app.id" : "local-1608809790982",
"replace-partitions" : "true",
// 本次snapshot添加的文件數量
"added-data-files" : "4",
// 本次snapshot添加的record數量
"added-records" : "4",
// 本次snapshot添加的文件大小
"added-files-size" : "7217",
// 本次snapshot修改的分區數量
"changed-partition-count" : "4",
// 本次snapshot中record總數 = lastSnapshotTotalRecord - currentSnapshotDeleteRecord + currentSnapshotAddRecord
"total-records" : "4",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro"
}, {
"snapshot-id" : 2080639593951710914,
// 上次snapshotID
"parent-snapshot-id" : 5178718682852547007,
"timestamp-ms" : 1608810968725,
"summary" : {
"operation" : "overwrite",
"spark.app.id" : "local-1608809790982",
"replace-partitions" : "true",
"added-data-files" : "4",
"deleted-data-files" : "4",
"added-records" : "4",
"deleted-records" : "4",
"added-files-size" : "7217",
"removed-files-size" : "7217",
"changed-partition-count" : "4",
"total-records" : "4",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
// snapshot文件路徑
"manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro"
} ],
// snapshot記錄
"snapshot-log" : [ {
"timestamp-ms" : 1608809818168,
"snapshot-id" : 5178718682852547007
}, {
"timestamp-ms" : 1608810968725,
"snapshot-id" : 2080639593951710914
} ],
// metada記錄
"metadata-log" : [ {
"timestamp-ms" : 1608809758229,
"metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v1.metadata.json"
}, {
"timestamp-ms" : 1608809818168,
"metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v2.metadata.json"
} ]
}
上例展示的是v3.metadata.json中的數據,該文件保存了iceberg table schema、partition、snapshot信息,partition中的transform信息使得iceberg能夠根據字段進行hidden partition,而無需像hive一樣顯示的指定分區字段。由於VersionMetadata中記錄了每次snapshot的id以及create_time,我們可以通過時間或snapshotId查詢相應snapshot的數據,實現Time Travel。
Snapshot
// Snapshot: 2080639593951710914
// Location: hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro
// manifest entry
{
"manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro",
"manifest_length" : 5291,
"partition_spec_id" : 0,
// 該manifest entry所屬的snapshot
"added_snapshot_id" : {
"long" : 2080639593951710914
},
// 該manifest中添加的文件數量
"added_data_files_count" : {
"int" : 4
},
// 創建該manifest時已經存在且
// 沒有被這次創建操作刪除的文件數量
"existing_data_files_count" : {
"int" : 0
},
// 創建manifest刪除的文件
"deleted_data_files_count" : {
"int" : 0
},
// 該manifest中partition字段的范圍
"partitions" : {
"array" : [ {
"contains_null" : false,
"lower_bound" : {
"bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
},
"upper_bound" : {
"bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}
} ]
},
"added_rows_count" : {
"long" : 4
},
"existing_rows_count" : {
"long" : 0
},
"deleted_rows_count" : {
"long" : 0
}
}
// manifest entry
{
"manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro",
"manifest_length" : 5289,
"partition_spec_id" : 0,
"added_snapshot_id" : {
"long" : 2080639593951710914
},
"added_data_files_count" : {
"int" : 0
},
"existing_data_files_count" : {
"int" : 0
},
"deleted_data_files_count" : {
"int" : 4
},
"partitions" : {
"array" : [ {
"contains_null" : false,
"lower_bound" : {
"bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
},
"upper_bound" : {
"bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}
} ]
},
"added_rows_count" : {
"long" : 0
},
"existing_rows_count" : {
"long" : 0
},
"deleted_rows_count" : {
"long" : 4
}
}
一個snapshot中可以包含多個manifest entry,一個manifest entry表示一個manifest,其中重點需要關注的是每個manifest中的partitions字段,在根據filter進行過濾時可以首先通過該字段表示的分區范圍對manifest進行過濾,避免無效的查詢。
Manifest
// DataFileEntry
{
// 表示對應數據文件status
// 0: EXISTING, 1: ADDED,2: DELETED
"status" : 1,
"snapshot_id" : {
"long" : 2080639593951710914
},
"data_file" : {
"file_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/data/id=1/00000-4-487b841b-13b4-4ae8-9238-f70674d5102e-00001.parquet",
"file_format" : "PARQUET",
// 對應的分區值
"partition" : {
"id" : {
"long" : 1
}
},
// 文件中record數量
"record_count" : 1,
// 文件大小
"file_size_in_bytes" : 1823,
"block_size_in_bytes" : 67108864,
// 不同column存儲大小
"column_sizes" : {
"array" : [ {
"key" : 1,
"value" : 52
}, {
"key" : 2,
"value" : 52
}, {
"key" : 3,
"value" : 52
}, {
"key" : 4,
"value" : 53
}, {
"key" : 5,
"value" : 51
}, {
"key" : 6,
"value" : 61
} ]
},
// 不同列對應的value數量
"value_counts" : {
"array" : [ {
"key" : 1,
"value" : 1
}, {
"key" : 2,
"value" : 1
}, {
"key" : 3,
"value" : 1
}, {
"key" : 4,
"value" : 1
}, {
"key" : 5,
"value" : 1
}, {
"key" : 6,
"value" : 1
} ]
},
// 列值為null的數量
"null_value_counts" : {
"array" : [ {
"key" : 1,
"value" : 0
}, {
"key" : 2,
"value" : 0
}, {
"key" : 3,
"value" : 0
}, {
"key" : 4,
"value" : 0
}, {
"key" : 5,
"value" : 0
}, {
"key" : 6,
"value" : 0
} ]
},
// 不同列的范圍
"lower_bounds" : {
"array" : [ {
"key" : 1,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 2,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 3,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 4,
"value" : "\u0013ˆ"
}, {
"key" : 5,
"value" : "\u0002\u0000\u0000\u0000"
}, {
"key" : 6,
"value" : "table lamp"
} ]
},
"upper_bounds" : {
"array" : [ {
"key" : 1,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 2,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 3,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 4,
"value" : "\u0013ˆ"
}, {
"key" : 5,
"value" : "\u0002\u0000\u0000\u0000"
}, {
"key" : 6,
"value" : "table lamp"
} ]
},
"key_metadata" : null,
// parquet block offset/ orc stripe offset
"split_offsets" : {
"array" : [ 4 ]
}
}
}
{
...
}
Manifest管理多個data文件,一條DataFileEntry對應一個data文件,DataFileEntry中記錄了所屬partition,value bounds等信息,value_counts和null_value_counts可以用於過濾null列,例:column a所對應的value_count為3,且對應的null_value_count也為3,此時如果select a,則可以根據value_count-null_value_count=0判斷a全為null直接返回而無需再進行parquet文件的查詢;除此之外,可以根據value bounds進行過濾,加速查詢。
總結
本文主要介紹了Iceberg不同文件的存儲格式,講解了不同字段中的作用,正是這些元數據管理保證了iceberg能夠進行高效快速的查詢,后續會根據這些文件進一步分析iceberg寫入和查詢過程。
原文鏈接:https://blog.csdn.net/u012794915/article/details/111831471