Apache Hudi
Apache Hudi 在基于 HDFS/S3 数据存储之上,提供了两种流原语:
- 插入更新
- 增量拉取
一般来说,我们会将大量数据存储到HDFS/S3,新数据增量写入,而旧数据鲜有改动,特别是在经过数据清洗,放入数据仓库的场景。而且在数据仓库如 hive中,对于update的支持非常有限,计算昂贵。另一方面,若是有仅对某段时间内新增数据进行分析的场景,则hive、presto、hbase等也未提供原生方式,而是需要根据时间戳进行过滤分析。
在此需求下,Hudi可以提供这两种需求的实现。第一个是对record级别的更新,另一个是仅对增量数据的查询。且Hudi提供了对Hive、presto、Spark的支持,可以直接使用这些组件对Hudi管理的数据进行查询。
存储类型
我们看一下 Hudi 的两种存储类型:
- 写时复制(copy on write):仅使用列式文件(parquet)存储数据。在写入/更新数据时,直接同步合并原文件,生成新版本的基文件(需要重写整个列数据文件,即使只有一个字节的新数据被提交)。此存储类型下,写入数据非常昂贵,而读取的成本没有增加,所以适合频繁读的工作负载,因为数据集的最新版本在列式文件中始终可用,以进行高效的查询。
- 读时合并(merge on read):使用列式(parquet)与行式(avro)文件组合,进行数据存储。在更新记录时,更新到增量文件中(avro),然后进行异步(或同步)的compaction,创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以appending 的模式写入增量文件中。但是在读取数据集时,需要将增量文件与旧文件进行合并,生成列式文件。
视图
在了解这两种存储类型后,我们再看一下Hudi支持的存储数据的视图(也就是查询模式):
- 读优化视图(Read Optimized view):直接query 基文件(数据集的最新快照),也就是列式文件(如parquet)。相较于非Hudi列式数据集,有相同的列式查询性能
- 增量视图(Incremental View):仅query新写入数据集的文件,也就是指定一个commit/compaction,query此之后的新数据。
- 实时视图(Real-time View):query最新基文件与增量文件。此视图通过将最新的基文件(parquet)与增量文件(avro)进行动态合并,然后进行query。可以提供近实时的数据(会有几分钟的延迟)
在以上3种视图中,“读优化视图”与“增量视图”均可在“写时复制”与“读时合并”的存储类型下使用。而“实时视图“仅能在”读时合并“模式下使用。
存储类型 |
支持的视图 |
写时复制 |
读优化 + 增量 |
读时合并 |
读优化 + 增量 + 近实时 |
时间轴
最后介绍一下 Hudi 的核心 —— 时间轴。Hudi 会维护一个时间轴,在每次执行操作时(如写入、删除、合并等),均会带有一个时间戳。通过时间轴,可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据。这样可以避免扫描更大的时间范围,并非常高效地只消费更改过的文件(例如在某个时间点提交了更改操作后,仅query某个时间点之前的数据,则仍可以query修改前的数据)。
使用案例
下面我们尝试使用Hudi API 进行读写。
写入数据
首先准备数据集,部分条目为:
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
…
启动spark-shell,并指定hudi jar包:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
加载指定包:
1
2
3
4
5
6
|
import
scala.collection.JavaConversions.
_
import
org.apache.spark.sql.SaveMode.
_
import
org.apache.hudi.DataSourceReadOptions.
_
import
org.apache.hudi.DataSourceWriteOptions.
_
import
org.apache.hudi.config.HoodieWriteConfig.
_
import
org.apache.hudi.hive.MultiPartKeysValueExtractor
|
指定创建的Hudi表名与路径:
1
2
|
val
tableName
=
"hudi_table"
val
basePath
=
"s3://xxxx/xxx"
|
构造 DataFrame:
1
2
3
4
|
val
lineRDD
=
sc.textFile(
"features.txt"
).map(
_
.split(
"\\|"
)).filter(
_
.length >
6
)
case
class
Record(id
:
Int, name
:
String, c
_
class
:
String, state
:
String, latitude
:
Float, longitude
:
String, elevation
:
Int)
val
RecordRDD
=
lineRDD.map(x
=
>Record(x(
0
).toInt, x(
1
), x(
2
), x(
3
), x(
4
).toFloat, x(
5
), x(
6
).toInt))
val
featureDF
=
RecordRDD.toDF
|
插入数据到 Hudi(以及Hive):
1
2
3
4
5
6
7
8
9
10
11
|
featureDF.write.format(
"org.apache.hudi"
).
option(RECORDKEY
_
FIELD
_
OPT
_
KEY,
"c_class"
).
option(PARTITIONPATH
_
FIELD
_
OPT
_
KEY,
"state"
).
option(PRECOMBINE
_
FIELD
_
OPT
_
KEY,
"id"
).
option(TABLE
_
NAME, tableName).
option(HIVE
_
SYNC
_
ENABLED
_
OPT
_
KEY,
"true"
).
option(HIVE
_
TABLE
_
OPT
_
KEY,
"hivehudi"
).
option(HIVE
_
PARTITION
_
FIELDS
_
OPT
_
KEY,
"state"
).
option(HIVE
_
PARTITION
_
EXTRACTOR
_
CLASS
_
OPT
_
KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Overwrite).
save(basePath);
|
我们可以看到目录结构类似于 Hive:
hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet
hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet
hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet
…
表名为hudi_table,分区键为 state,真正存储数据的文件为parquet。
查询数据
首先载入数据格式:
1
|
val
toViewDF
=
spark.read.format(
"org.apache.hudi"
).load(basePath +
"/*/*"
)
|
我们在上面插入数据的时候,同时创建了 Hive 表,所以有以下两种方式做查询:
- 直接查询 Hive 表:
spark.sql("select name from hivehudi where c_class='Summit'").show()
+--------------------+
| name|
+--------------------+
| High Knob|
| White Rock Mountain|
| Open Mine Hill|
…
2. 使用临时表:
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()
+-------+--------------------+
| id| name|
+-------+--------------------+
| 539931| Tiger Point Gully|
| 871801| Dry Brook|
| 847407| McClusky Creek|
| 637687| Shaw Drain|
| 749747| Duncan Creek|
|1502779| Brushy Lick|
…
更新数据
首先我们看一条数据:
spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
然后更新此数据(更新的数据存储在一个新的源文件中):
1
2
3
4
5
6
7
8
9
10
11
12
13
|
val
updateRDD
=
sc.textFile(
"update.txt"
).map(
_
.split(
"\\|"
)).filter(
_
.length>
6
)
val
updateDF
=
updateRDD.map(x
=
>Record(x(
0
).toInt, x(
1
), x(
2
), x(
3
), x(
4
).toFloat, x(
5
), x(
6
).toInt)).toDF
updateDF.write.format(
"org.apache.hudi"
).
option(RECORDKEY
_
FIELD
_
OPT
_
KEY,
"c_class"
).
option(PARTITIONPATH
_
FIELD
_
OPT
_
KEY,
"state"
).
option(PRECOMBINE
_
FIELD
_
OPT
_
KEY,
"id"
).
option(TABLE
_
NAME, tableName).
option(HIVE
_
SYNC
_
ENABLED
_
OPT
_
KEY,
"true"
).
option(HIVE
_
TABLE
_
OPT
_
KEY,
"hivehudi"
).
option(HIVE
_
PARTITION
_
FIELDS
_
OPT
_
KEY,
"state"
).
option(HIVE
_
PARTITION
_
EXTRACTOR
_
CLASS
_
OPT
_
KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Append).
save(basePath);
|
可以看到我们这里使用的模式由Overwrite 改为了 Append,也就是追加的模式,其余的基本不变。我们首先分别看一下 hive 表与 hudi 表中的数据变化。
Hive 表中:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
Hudi 表中:
1
2
3
|
val
appViewDF
=
spark.read.format(
"org.apache.hudi"
).load(basePath +
"/*/*"
)
appViewDF.registerTempTable(
"hudi_update_table"
)
spark.sql(
"select id,name from hudi_update_table where c_class='Stream' and id=539931"
).show()
|
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
可以看到均可以查到更新后的数据。
对数据执行 select * 加上过滤条件:
可以看到表中有2个比较有意思的字段,分别为:_hoodie_commit_time, _hoodie_commit_seqno
上文我们提到过 Hudi 有一个核心为时间轴,每次执行一个commit时,都会生成一个时间戳。这里 _hoodie_commit_time 即记录了commit 的时间戳。进一步的,Hudi 便是基于此实现了增量查询。
下面我们尝试一下增量查询:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 获取 commit 时间戳
val
commits
=
spark.sql(
"select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime"
).map(k
=
> k.getString(
0
)).take(
3
)
// 设置起始时间戳为上次时间戳
val
beginTime
=
commits(commits.length -
2
)
// 增量查询
val
incViewDF
=
spark.
read.
format(
"org.apache.hudi"
).
option(VIEW
_
TYPE
_
OPT
_
KEY, VIEW
_
TYPE
_
INCREMENTAL
_
OPT
_
VAL).
option(BEGIN
_
INSTANTTIME
_
OPT
_
KEY, beginTime).
load(basePath);
<
br
>
incViewDF.registerTempTable(
"hudi_incr_table"
)
spark.sql(
"select * from hudi_incr_table where c_class='Stream' and id=539931"
).show()
|
这里我们使用增量查询的选项 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及设置了时间戳的起始时间。查询结果为:
可以看到查询到的数据仅为上次commit 后的数据。
当然,我们也可以指定时间段内的数据进行查询,指定下面选项即可:
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
Hudi CLI
最后我们看下一下 Hudi CLI
// 启动 hudi cli:
/usr/lib/hudi/cli/bin/hudi-cli.sh
// 连接hudi 数据表
connect --path s3://xxxx/hudi/hudi_table
接下来我们可以查看提交过的 commit:
甚至回滚 commit:
commit rollback --commit 20191122073858
回滚后再次对 hive 表执行查询:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
+------+-----------------+
可以看到之前更新的数据已经被删除。
在 Hudi Cli 下,我们也可以创建表(create)、列出commit时文件级别的信息(commit showfiles)等。更多 Hudi cli 的用法,可以在 Hudi Cli 下输入 help 获取更多信息。
References:
Apache Hudi 官方介绍:https://hudi.apache.org/index.html
Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html
Apache Hudi CLI: https://hudi.apache.org/admin_guide.html
转自:https://www.cnblogs.com/zackstang/p/11912994.html