1. 介紹
Apache Hudi是一個開源的數據湖框架,旨在簡化增量數據處理和數據管道開發。借助Hudi可以在Amazon S3、Aliyun OSS數據湖中進行記錄級別管理插入/更新/刪除。AWS EMR集群已支持Hudi組件,並且可以與AWS Glue Data Catalog無縫集成。此特性可使得直接在Athena或Redshift Spectrum查詢Hudi數據集。
對於企業使用AWS雲的一種常見數據流如圖1所示,即將數據實時復制到S3。

本篇文章將介紹如何使用Oracle GoldenGate來捕獲變更事件並利用Hudi格式寫入S3數據湖。
Oracle GG可以使用多個處理程序和格式輸出,請查看此處獲取更多信息。
本篇文章中不關心處理程序,我們假設使用Avro Operation格式,這種格式較為冗長,但有着廣泛應用,因為其平衡了數據完整性和性能。如圖2所示,此格式包含每個記錄的before和after版本。

即使完整且易於生成,此格式也不適合用Athena或Spectrum進行分析,從使用角度也無法替代源數據。此外你可能需要對歷史數據進行分區處理以便快速檢索。
本文我們將介紹如何利用Apache Hudi框架做到這一點,以構建易於分析的目標數據集。
2. 系統架構
我們不詳細介紹如何將avro格式文件放入Replica S3桶中,整個數據體系結構如下所示

Hudi代碼運行在EMR集群中,從Replica S3桶中讀取avro數據,並將目標數據集存儲到Target S3桶中。
EMR軟件配置如下

硬件配置如下

由於插入/更新始終保留最后一條記錄,因此Hudi作業非常具有彈性, 因此可以利用Spot Instance(搶占式實例)大大降低成本。
除此之外,還需要設置
- 源bucket(如 my-s3-sourceBucket)
- 目標bucket (如 my-s4-targetBucket)
- Glue數據庫(如 sales-db)
配置完后需要確保EMR集群有讀寫權限。
如果你需要一些樣例數據,可以點擊此處獲取。當設置好桶后,啟動EMR集群並將這些樣例數據導入Replica桶。
3. 關於分區的注意事項
為構建按時間划分的數據集,必須確定不可變的日期類型字段。參照示例數據集(銷售訂單),我們假設訂單日期永遠不會改變,因此我們將DAT_ORDER字段作為寫入Hudi數據集的分區字段。
分區方式是YYYY/MM/DD,通過該方式,所有數據將被組織在嵌套的子文件夾中。Hudi框架將提供此分區信息,並將一個特定字段添加到關聯的Hive/Glue表中。當查詢時,該字段上的過濾條件將轉換為超高效的分區修剪掃描條件。
實際上這是我們必須對數據集做的唯一強假設,所有其他信息都在avro文件中(字段名稱,字段類型,PK等)。
除此元數據外,GoldenGate通常還會添加一些其他信息,例如表名稱,操作時間戳,操作類型(插入/更新/刪除)和自定義標記。你可以利用這些字段來構造通用邏輯並構建靈活的遷移平台。
4. 步驟
啟動spark-shell
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
啟動后可以運行如下代碼:
val ggDeltaFiles = "s3://" + sourceBucket + "/" + sourceSubFolder + "/" + sourceSystem + "/" + inputTableName + "/";
val rootDataframe:DataFrame = spark.read.format("avro").load(ggDeltaFiles);
// extract PK fields name from first line
val pkFields: Seq[String] = rootDataframe.select("primary_keys").limit(1).collect()(0).getSeq(0);
// take into account the "after." fields only
val columnsPre:Array[String] = rootDataframe.select("after.*").columns;
// exclude "_isMissing" fields added by Oracle GoldenGate
// The second part of the expression will safely preserve all native "**_isMissing" fields
val columnsPost:Array[String] = columnsPre.filter { x => (!x.endsWith("_isMissing")) || (!x.endsWith("_isMissing_isMissing") && (columnsPre.filter(y => (y.equals(x + "_isMissing")) ).nonEmpty))};
val columnsFinal:ArrayBuffer[String] = new ArrayBuffer[String]();
columnsFinal += "op_ts";
columnsFinal += "pos";
// add the "after." prefix
columnsPost.foreach(x => (columnsFinal += "after." + x));
// prepare the target dataframe with the partition additional column
val preparedDataframe = rootDataframe.select("opTypeFieldName", columnsFinal.toArray:_*).
withColumn("HUDI_PART_DATE", date_format(to_date(col("DAT_ORDER"), "yyyy-MM-dd"),"yyyy/MM/dd")).
filter(col(opTypeFieldName).isin(admittedValues.toList: _*));
// write data
preparedDataframe.write.format("org.apache.hudi").
options(hudiOptions).
option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, pkFields.mkString(",")).
mode(SaveMode.Append).
save(hudiTablePath);
上述簡化了部分代碼,可以在此處找到完整的代碼。
5. 結果
輸出的S3對象結果如下所示

同時Glue數據目錄將使該表可用於通過外部模式在Athena或Spectrum中進行查詢分析,外部表具有我們用於分區的hudi_part_date附加字段。

