Apache Hudi
Apache Hudi 在基於 HDFS/S3 數據存儲之上,提供了兩種流原語:
- 插入更新
- 增量拉取
一般來說,我們會將大量數據存儲到HDFS/S3,新數據增量寫入,而舊數據鮮有改動,特別是在經過數據清洗,放入數據倉庫的場景。而且在數據倉庫如 hive中,對於update的支持非常有限,計算昂貴。另一方面,若是有僅對某段時間內新增數據進行分析的場景,則hive、presto、hbase等也未提供原生方式,而是需要根據時間戳進行過濾分析。
在此需求下,Hudi可以提供這兩種需求的實現。第一個是對record級別的更新,另一個是僅對增量數據的查詢。且Hudi提供了對Hive、presto、Spark的支持,可以直接使用這些組件對Hudi管理的數據進行查詢。
存儲類型
我們看一下 Hudi 的兩種存儲類型:
- 寫時復制(copy on write):僅使用列式文件(parquet)存儲數據。在寫入/更新數據時,直接同步合並原文件,生成新版本的基文件(需要重寫整個列數據文件,即使只有一個字節的新數據被提交)。此存儲類型下,寫入數據非常昂貴,而讀取的成本沒有增加,所以適合頻繁讀的工作負載,因為數據集的最新版本在列式文件中始終可用,以進行高效的查詢。
- 讀時合並(merge on read):使用列式(parquet)與行式(avro)文件組合,進行數據存儲。在更新記錄時,更新到增量文件中(avro),然后進行異步(或同步)的compaction,創建列式文件(parquet)的新版本。此存儲類型適合頻繁寫的工作負載,因為新記錄是以appending 的模式寫入增量文件中。但是在讀取數據集時,需要將增量文件與舊文件進行合並,生成列式文件。
視圖
在了解這兩種存儲類型后,我們再看一下Hudi支持的存儲數據的視圖(也就是查詢模式):
- 讀優化視圖(Read Optimized view):直接query 基文件(數據集的最新快照),也就是列式文件(如parquet)。相較於非Hudi列式數據集,有相同的列式查詢性能
- 增量視圖(Incremental View):僅query新寫入數據集的文件,也就是指定一個commit/compaction,query此之后的新數據。
- 實時視圖(Real-time View):query最新基文件與增量文件。此視圖通過將最新的基文件(parquet)與增量文件(avro)進行動態合並,然后進行query。可以提供近實時的數據(會有幾分鍾的延遲)
在以上3種視圖中,“讀優化視圖”與“增量視圖”均可在“寫時復制”與“讀時合並”的存儲類型下使用。而“實時視圖“僅能在”讀時合並“模式下使用。
存儲類型 |
支持的視圖 |
寫時復制 |
讀優化 + 增量 |
讀時合並 |
讀優化 + 增量 + 近實時 |
時間軸
最后介紹一下 Hudi 的核心 —— 時間軸。Hudi 會維護一個時間軸,在每次執行操作時(如寫入、刪除、合並等),均會帶有一個時間戳。通過時間軸,可以實現在僅查詢某個時間點之后成功提交的數據,或是僅查詢某個時間點之前的數據。這樣可以避免掃描更大的時間范圍,並非常高效地只消費更改過的文件(例如在某個時間點提交了更改操作后,僅query某個時間點之前的數據,則仍可以query修改前的數據)。
使用案例
下面我們嘗試使用Hudi API 進行讀寫。
寫入數據
首先准備數據集,部分條目為:
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
…
啟動spark-shell,並指定hudi jar包:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
加載指定包:
import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor
指定創建的Hudi表名與路徑:
val tableName = "hudi_table" val basePath = "s3://xxxx/xxx"
構造 DataFrame:
val lineRDD = sc.textFile("features.txt").map(_.split("\\|")).filter(_.length > 6) case class Record(id:Int, name:String, c_class:String, state:String, latitude:Float, longitude:String, elevation:Int) val RecordRDD = lineRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)) val featureDF=RecordRDD.toDF
插入數據到 Hudi(以及Hive):
featureDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Overwrite). save(basePath);
我們可以看到目錄結構類似於 Hive:
hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet
hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet
hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet
…
表名為hudi_table,分區鍵為 state,真正存儲數據的文件為parquet。
查詢數據
首先載入數據格式:
val toViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
我們在上面插入數據的時候,同時創建了 Hive 表,所以有以下兩種方式做查詢:
- 直接查詢 Hive 表:
spark.sql("select name from hivehudi where c_class='Summit'").show()
+--------------------+
| name|
+--------------------+
| High Knob|
| White Rock Mountain|
| Open Mine Hill|
…
2. 使用臨時表:
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()
+-------+--------------------+
| id| name|
+-------+--------------------+
| 539931| Tiger Point Gully|
| 871801| Dry Brook|
| 847407| McClusky Creek|
| 637687| Shaw Drain|
| 749747| Duncan Creek|
|1502779| Brushy Lick|
…
更新數據
首先我們看一條數據:
spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
然后更新此數據(更新的數據存儲在一個新的源文件中):
val updateRDD = sc.textFile("update.txt").map(_.split("\\|")).filter(_.length>6) val updateDF = updateRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)).toDF updateDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Append). save(basePath);
可以看到我們這里使用的模式由Overwrite 改為了 Append,也就是追加的模式,其余的基本不變。我們首先分別看一下 hive 表與 hudi 表中的數據變化。
Hive 表中:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
Hudi 表中:
val appViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*") appViewDF.registerTempTable("hudi_update_table") spark.sql("select id,name from hudi_update_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
可以看到均可以查到更新后的數據。
對數據執行 select * 加上過濾條件:
可以看到表中有2個比較有意思的字段,分別為:_hoodie_commit_time, _hoodie_commit_seqno
上文我們提到過 Hudi 有一個核心為時間軸,每次執行一個commit時,都會生成一個時間戳。這里 _hoodie_commit_time 即記錄了commit 的時間戳。進一步的,Hudi 便是基於此實現了增量查詢。
下面我們嘗試一下增量查詢:
// 獲取 commit 時間戳 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime").map(k => k.getString(0)).take(3) // 設置起始時間戳為上次時間戳 val beginTime = commits(commits.length - 2) // 增量查詢 val incViewDF = spark. read. format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath);
incViewDF.registerTempTable("hudi_incr_table") spark.sql("select * from hudi_incr_table where c_class='Stream' and id=539931").show()
這里我們使用增量查詢的選項 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及設置了時間戳的起始時間。查詢結果為:
可以看到查詢到的數據僅為上次commit 后的數據。
當然,我們也可以指定時間段內的數據進行查詢,指定下面選項即可:
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
Hudi CLI
最后我們看下一下 Hudi CLI
// 啟動 hudi cli:
/usr/lib/hudi/cli/bin/hudi-cli.sh
// 連接hudi 數據表
connect --path s3://xxxx/hudi/hudi_table
接下來我們可以查看提交過的 commit:
甚至回滾 commit:
commit rollback --commit 20191122073858
回滾后再次對 hive 表執行查詢:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
+------+-----------------+
可以看到之前更新的數據已經被刪除。
在 Hudi Cli 下,我們也可以創建表(create)、列出commit時文件級別的信息(commit showfiles)等。更多 Hudi cli 的用法,可以在 Hudi Cli 下輸入 help 獲取更多信息。
References:
Apache Hudi 官方介紹:https://hudi.apache.org/index.html
Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html
Apache Hudi CLI: https://hudi.apache.org/admin_guide.html