在上一篇文章中我們主要講解了iceberg各個元數據文件中的數據組織形式,那么這些元數據是怎么生成的呢?如何通過spark寫入iceberg?本文將帶大家簡單了解一下使用spark 2.4.7 batch寫入iceberg的整體流程。
spark寫入示例
本文主要演示如何使用iceberg hadoopTable寫入數據,hadoopCatalog和hiveCatalog在使用上大同小異。
import org.apache.iceberg.hadoop.HadoopTables import org.apache.hadoop.conf.Configuration import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.Schema import org.apache.iceberg.types._ import org.apache.spark.sql.types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import spark.implicits._ val order_item_schema = StructType(List( StructField("id", LongType, true), StructField("order_id", LongType, true), StructField("product_id", LongType, true), StructField("product_price", DecimalType(7,2), true), StructField("product_quantity", IntegerType, true), StructField("product_name", StringType, true) )) val order_item_action = Seq( Row(1L, 1L, 1L, Decimal.apply(50.00, 7, 2), 2, "table lamp"), Row(2L, 1L, 2L, Decimal.apply(100.5, 7, 2), 1, "skirt"), Row(3L, 2L, 1L, Decimal.apply(50.00, 7, 2), 1, "table lamp"), Row(4L, 3L, 3L, Decimal.apply(0.99, 7, 2), 1, "match") ) val iceberg_schema = new Schema( Types.NestedField.optional(1, "id", Types.LongType.get()), Types.NestedField.optional(2, "order_id", Types.LongType.get()), Types.NestedField.optional(3, "product_id", Types.LongType.get()), Types.NestedField.optional(4, "product_price", Types.DecimalType.of(7, 2)), Types.NestedField.optional(5, "product_quantity", Types.IntegerType.get()), Types.NestedField.optional(6, "product_name", Types.StringType.get()) ) val iceberg_partition = PartitionSpec.builderFor(iceberg_schema).identity("id").build() val hadoopTable = new HadoopTables(sc.hadoopConfiguration); val location = "hdfs://10.242.199.202:9000/hive/empty_order_item"; hadoopTable.create(iceberg_schema, iceberg_partition, location) val df = spark.createDataFrame(sc.makeRDD(order_item_action), order_item_schema) df.write.format("iceberg").mode("overwrite").save("hdfs://10.242.199.202:9000/hive/empty_order_item")
spark寫入iceberg主要分為兩步:
- Executor寫入數據
- Driver commit生成元數據
Executor寫入邏輯

由上圖可以看到IcebergSource實現了spark ReadSupport、WriteSupport、StreamWriteSupport等接口,WriteFactory根據寫入表的類型:(1) 分區表 (2) 非分區表,生成不同的writer,最后通過write方法寫入數據。
我們以寫入分區表為例簡單介紹一下executor端iceberg寫入數據的流程:
- 根據file format生成對應的FileAppender,FileAppender完成實際的寫文件操作。目前支持3種文件格式的寫入:Parquet、Avro以及Orc

- iceberg分區數據不直接寫入數據文件中,而是通過目錄樹結構來進行存儲,分區目錄結構與hive類型,都是以key1=value1/key2=value2的形式進行組織。在寫入數據之前,partitionWriter首先根據partition transform函數得到對應的partition value,然后創建對應的分區目錄
- fileAppender通過調用不同的file format組件將數據寫入到文件中。iceberg寫入時可以通過設置write.target-file-size-bytes table property調整寫入文件target大小,默認為LONG_MA
- 當所有數據寫入完成后,iceberg會收集寫入的統計信息,例如record_count, lower_bound, upper_bound, value_count等用於driver端生成對應的manifest文件,最后executor端將這些信息傳回driver端。
Driver commit邏輯
iceberg snapshot中的統計信息實際是累計更新的結果,相較於上次commit,本次commit發生了哪些變化,例新增了多少條記錄,刪除了多少條記錄,新增了多少文件,刪除了多少文件等等。既然是累計更新,首先需要知道上次snapshot的信息,然后計算最后的結果。iceberg讀取當前最新snapshot數據過程如下:
- 讀取version.hint中記錄的最新metadata版本號versionNumber
- 讀取version[versionNumber].metadata.json文件,根據metadata中記錄的snpshots信息以及current snapshot id得到最新snapshot的location
- 最后根據獲得的location讀取對應的snapshot文件得到最新的snapshot數據
在本篇文章中,我們使用了overwrite的寫入方式,overwrite實際上可以等價划分成兩個步驟:
- delete
- insert
那么我們如何知道需要刪除哪些數據呢?這里就要用到剛剛讀取的current snapshot數據以及executor傳回的信息,根據這些信息,我們可以計算得到哪些分區文件是需要通過覆蓋刪除的,實際上是將manifest中的對應DataFileEntry標記成刪除寫入到新的manifest文件中,沒有被刪除的DataFileEntry則標記成Existing寫入到manifest文件中
在完成了delete操作之后,insert操作就相對比較簡單,只要將GenericDataFile全部寫入到新的manifest中即可
iceberg默認開啟merge manifest功能,當manifest文件數量超過commit.manifest.min-count-to-merge時(默認100),將多個small manifest文件合並成large manifest(large manifest文件大小由commit.manifest.target-size-bytes指定,默認為8M)
最后iceberg根據這些Added/Deleted/Existing DataFileEntry得到本次commit的差值統計信息,與前一次snapshot統計信息累加最終得到本次snapshot的統計信息(added_data_files_count, added_rows_count等)。生成snapshot的整個過程如下圖所示:
在生成新的snapshot后,只剩最后一步那就是生成新版本的version.metadata.json文件,同時將版本號寫入到version.hint中,至此完成了所有iceberg數據的寫入。
總結
本文簡單介紹了iceberg數據寫入的整個流程,可以看到整個過程中比較重要的地方在於如何生成元數據,元數據的這種管理方式支持iceberg能夠進行快速高效的查詢,並且保證了多個snapshot可以共用同一份數據文件避免了數據的冗余性。
原文鏈接:https://blog.csdn.net/u012794915/article/details/111831471
