使用Apache Hudi + Amazon S3 + Amazon EMR + AWS DMS構建數據湖


1. 引入

數據湖使組織能夠在更短的時間內利用多個源的數據,而不同角色用戶可以以不同的方式協作和分析數據,從而實現更好、更快的決策。Amazon Simple Storage Service(amazon S3)是針對結構化和非結構化數據的高性能對象存儲服務,可以用來作為數據湖底層的存儲服務。

然而許多用例,如從上游關系數據庫執行變更數據捕獲(CDC)到基於Amazon S3的數據湖,都需要在記錄級別處理數據,執行諸如從數據集中插入、更新和刪除單條記錄的操作需要處理引擎讀取所有對象(文件),進行更改,並將整個數據集重寫為新文件。此外為使數據湖中的數據以近乎實時的方式被訪問,通常會導致數據被分割成許多小文件,從而導致查詢性能較差。Apache Hudi是一個開源的數據管理框架,它使您能夠在Amazon S3 數據湖中以記錄級別管理數據,從而簡化了CDC管道的構建,並使流數據攝取變得高效,Hudi管理的數據集使用開放存儲格式存儲在Amazon S3中,通過與Presto、Apache Hive、Apache Spark和AWS Glue數據目錄的集成,您可以使用熟悉的工具近乎實時地訪問更新的數據。Amazon EMR已經內置Hudi,在部署EMR集群時選擇Spark、Hive或Presto時自動安裝Hudi。

本篇文章將展示如何構建一個CDC管道,該管道使用AWS數據庫遷移服務(AWS DMS)從Amazon關系數據庫服務(Amazon RDS)for MySQL數據庫中捕獲數據,並將這些更改應用到Amazon S3中的一個數據集上。Apache Hudi包含了HoodieDeltaStreamer實用程序,它提供了一種從許多源(如分布式文件系統DFS或Kafka)攝取數據的簡單方法,它可以自己管理檢查點、回滾和恢復,因此不需要跟蹤從源讀取和處理了哪些數據,這使得使用更改數據變得很容易,同時還可以在接收數據時對數據進行基於SQL的輕量級轉換,有關詳細信息,請參見寫Hudi表。ApacheHudi版本0.5.2提供了對帶有HoodieDeltaStreamer的AWS DMS支持,並在Amazon EMR 5.30.x和6.1.0上可用。

2. 架構

下圖展示了構建CDC管道而部署的體系結構。

在該架構中,我們在Amazon RDS上有一個MySQL實例,AWS-DMS將完整的增量數據(使用AWS-DMS的CDC特性)以Parquet格式存入S3中,EMR集群上的HoodieDeltaStreamer用於處理全量和增量數據,以創建Hudi數據集,當更新MySQL數據庫中的數據后,AWS-DMS任務將獲取這些更改並將它們變更到原始的S3存儲桶中。HoodieDeltastreamer作業可以在EMR集群上以特定的頻率或連續模式運行,以將這些更改應用於Amazon S3數據湖中的Hudi數據集,然后可以使用SparkSQL、Presto、運行在EMR集群上的Apache Hive和Amazon Athena等工具查詢這些數據。

3. 部署解決方案資源

使用AWS CloudFormation在AWS帳戶中部署這些組件,選擇一個AWS區域部署以下服務:

  • Amazon EMR
  • AWS DMS
  • Amazon S3
  • Amazon RDS
  • AWS Glue
  • AWS Systems Manager

在部署CloudFormation模板之前需要先滿足如下條件:

  • 擁有一個至少有兩個公共子網的專有網絡(VPC)。
  • 有一個S3存儲桶來從EMR集群收集日志,需要在同一個AWS區域。
  • 具有AWS身份和訪問管理(IAM)角色DMS VPC角色dms-vpc-role
  • 如果要使用AWS Lake Formation權限模型在帳戶中部署,請驗證以下設置:
    • 用於部署技術棧的IAM用戶需要被添加為Lake Formation下的data lake administrator,或者用於部署堆棧的IAM用戶具有在AWS Glue data Catalog中創建數據庫的IAM權限。
    • Lake Formation下的數據目錄(Data Catalog)設置配置為僅對新數據庫和新數據庫中的新表使用IAM訪問控制,這將確保僅使用IAM權限控制對數據目錄(Data Catalog)中新創建的數據庫和表的所有訪問權限。

  • IAMAllowedPrincipals在Lake Formation database creators頁面上被授予數據庫創建者權限。

如果此權限不存在,請通過選擇授予並選擇授予創建數據庫權限。

這些設置是必需的,以便僅使用IAM控制對數據目錄對象的所有權限。

4. 啟動CloudFormation

要啟動CloudFormation棧,請完成以下步驟

  • 選擇啟動CloudFormation棧

  • 在Parameters部分提供必需的參數,包括一個用於存儲Amazon EMR日志的S3 Bucket和一個您想要訪問Amazon RDS for MySQL的CIDR IP范圍。

  • 遵循CloudFormation創建向導,保持其余默認值不變。

  • 在最后一個頁面上,選擇允許AWS CloudFormation可能會使用自定義名稱創建IAM資源。

  • 選擇創建

  • 當創建完成后,在CloudFormation堆棧的Outputs選項卡上記錄S3 Bucket、EMR集群和Amazon RDS for MySQL的詳細信息。

CloudFormation模板為EMR集群使用m5.xlarge和m5.2xlarge實例,如果這些實例類型在你選擇用於部署的區域或可用性區域中不可用,那么CloudFormation將會創建失敗。如果發生這種情況,請選擇實例類型可用的區域或子網。

CloudFormation還使用必要的連接屬性(如dataFormattimestampColumnNameparquetTimestampInMillisecond)創建和配置AWS DMS端點和任務。

作為CloudFormation棧的一部分部署的數據庫實例已經被創建,其中包含AWS-DMS在數據庫的CDC模式下工作所需的設置。

  • binlog_format=ROW
  • binlog_checksum=NONE

另外在RDS DB實例上啟用自動備份,這是AWS-DMS進行CDC所必需的屬性。

5. 運行端到端數據流

CloudFormation部署好后就可以運行數據流,將MySQL中的完整和增量數據放入數據湖中的Hudi數據集。

  1. 作為最佳實踐,請將binlog保留至少24小時。使用SQL客戶端登錄Amazon RDS for MySQL數據庫並運行以下命令:
call mysql.rds_set_configuration('binlog retention hours', 24)
  1. 在dev數據庫中創建表:
create table dev.retail_transactions(
tran_id INT,
tran_date DATE,
store_id INT,
store_city varchar(50),
store_state char(2),
item_code varchar(50),
quantity INT,
total FLOAT);
  1. 創建表時,將一些測試數據插入數據庫:
insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);

現在使用AWS DMS將這些數據推送到Amazon S3。

  1. 在AWS DMS控制台上,運行hudiblogload任務。

此任務將表的全量數據加載到Amazon S3,然后開始寫增量數據。

如果第一次啟動AWS-DMS任務時系統提示測試AWS-DMS端點,那么可以先進行測試,在第一次啟動AWS-DMS任務之前測試源和目標端點通常是一個好的實踐。

幾分鍾后,任務的狀態將變更為"加載完成"、"復制正在進行",這意味着已完成全量加載,並且正在進行的復制已開始,可以轉到由CloudFormation創建的S3 Bucket,應該會在S3 Bucket的dmsdata/dev/retail_transactions文件夾下看到一個.parquet文件。

  1. 在EMR集群的Hardware選項卡上,選擇主實例組並記錄主實例的EC2實例ID。

  2. 在Systems Manager控制台上,選擇會話管理器。

  3. 選擇"啟動會話"以啟動與群集主節點的會話。

  4. 通過運行以下命令將用戶切換到Hadoop

    sudo su hadoop
    

在實際用例中,AWS DMS任務在全量加載完成后開始向相同的Amazon S3位置寫入增量文件,區分全量加載和增量加載文件的方法是,完全加載文件的名稱以load開頭,而CDC文件名具有日期時間戳,如在后面步驟所示。從處理的角度來看,我們希望將全部負載處理到Hudi數據集中,然后開始增量數據處理。為此,我們將滿載文件移動到同一S3存儲桶下的另一個S3文件夾中,並在開始處理增量文件之前處理這些文件。

  1. 在EMR集群的主節點上運行以下命令(將<s3-bucket-name>替換為實際的bucket name):

    aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/  --exclude "*" --include "LOAD*.parquet" --recursive
    

有了datafull文件夾中的全量表轉儲,接着使用EMR集群上的HoodieDeltaStreamer實用程序來向Amazon S3上寫入Hudi數據集。

  1. 運行以下命令將Hudi數據集填充到同一個S3 bucket中的Hudi文件夾中(將<S3 bucket name>替換為CloudFormation創建的S3 bucket的名稱):
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
  --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
  --master yarn --deploy-mode cluster \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false \
/usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field dms_received_ts \
  --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
    --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --enable-hive-sync

前面的命令運行一個運行HoodieDeltaStreamer實用程序的Spark作業。有關此命令中使用的參數的詳細信息,請參閱編寫Hudi表。

當Spark作業完成后,可以導航到AWS Glue控制台,找到在hudiblogdb數據庫下創建的名為retail_transactions的表,表的input format是org.apache.hudi.hadoop.HoodieParquetInputFormat.

接下來查詢數據並查看目錄中retail_transactions表的數據。

  1. 在先前建立的Systems Manager會話中,運行以下命令(確保已完成post的所有先前條件,包括在Lake Formation中將IAMAllowedPrincipals添加為數據庫創建者):

    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
    --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
    
    
  2. 對retail_transactions表運行以下查詢:

    spark.sql("select * from hudiblogdb.retail_transactions order by tran_id").show()
    

接着可以在表中看到與MySQL數據庫相同的數據,其中有幾個列是由HoodieDeltaStreamer自動添加Hudi元數據。

現在在MySQL數據庫上運行一些DML語句,並將這些更改傳遞到Hudi數據集。

  1. 在MySQL數據庫上運行以下DML語句
insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
delete from dev.retail_transactions where tran_id=2;

幾分鍾后將看到在S3存儲桶中的dmsdata/dev/retail_transactions文件夾下創建了一個新的.parquet文件。

  1. 在EMR集群上運行以下命令,將增量數據獲取到Hudi數據集(將<s3 bucket name>替換為CloudFormation模板創建的s3 bucket的名稱):

    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync \
    --checkpoint 0
    

    此命令與上一個命令之間的關鍵區別在於屬性文件,該文件包含–-props和--checkpoint參數,對於先前執行全量加載的命令,我們使用dfs-source-retail-transactions-full.properties進行全量加載、dfs-source-retail-transactions-incremental.properties進行增量加載,這兩個屬性文件之間的區別是:

    • 源數據的位置在AmazonS3中的全量數據和增量數據之間發生變化。
    • SQL transformer查詢包含了一個全量任務的Op字段,因為AWS DMS首次全量加載不包括parquet數據集的Op字段,Op字段可有I、U和D值,表示插入、更新和刪除。

本文后面的"部署到生產環境時的注意事項"部分討論--checkpoint參數的詳細信息。

  1. 作業完成后,在spark shell中運行相同的查詢。

將會看到這些更新應用於Hudi數據集。

另外還可以使用Hudi Cli來管理Hudi數據集,以查看有關提交、文件系統、統計信息等的信息。

  1. 為此在Systems Manager會話中,運行以下命令

    /usr/lib/hudi/cli/bin/hudi-cli.sh
    
  2. 在Hudi Cli中,運行以下命令(將<s3 bucket name>替換為CloudFormation模板創建的s3 bucket的名稱):

    connect --path s3://<s3-bucket-name>/hudi/retail_transactions
    
  3. 要檢查Hudi數據集上的提交(commit),請運行以下命令

    commits show
    

還可以從Hudi數據集查詢增量數據,這對於希望將增量數據用於下游處理(如聚合)時非常有用,Hudi提供了多種增量提取數據的方法,Hudi快速入門指南中提供了如何使用此功能的示例。

6. 部署到生產環境時的注意事項

前面展示了一個如何從關系數據庫到基於Amazon S3的數據湖構建CDC管道的示例,但如果要將此解決方案用於生產,則應考慮以下事項:

  • 為了確保高可用性,可以在多AZ配置中設置AWS-DMS實例。
  • CloudFormation將deltastreamer實用程序所需的屬性文件部署到S3://<s3bucket name>/properties/處的S3 bucket中,你可以根據需求定制修改,其中有幾個參數需要注意
    • deltastreamer.transformer.sql – 此屬性是Deltastreamer實用程序的一個非常強大的特性:它使您能夠在數據被攝取並保存在Hudi數據集中時動態地轉換數據,在本文中,我們展示了一個基本的轉換,它將tran_date列強制轉換為字符串,但是您可以將任何轉換作為此查詢的一部分應用。
    • parquet.small.file.limit– 此字段以字節為單位,是一個關鍵存儲配置,指定Hudi如何處理Amazon S3上的小文件,由於每個分區的每個插入操作中要處理的記錄數,可能會出現小文件,設置此值允許Hudi繼續將特定分區中的插入視為對現有文件的更新,從而使文件的大小小於此值small.file.limit被重寫。
    • parquet.max.file.size – 這是Hudi數據集中單個parquet文件的最大文件大小,之后將創建一個新文件來存儲更多數據。對於Amazon S3的存儲和數據查詢需求,我們可以將其保持在256MB-1GB(256x1024x1024=268435456)。
    • [Insert|Upsert|bulkinsert].shuffle.parallelism。本篇文章中我們只處理了少量記錄的小數據集。然而,在實際情況下可能希望在第一次加載時引入數億條記錄,然后增量CDC數據達百萬,當希望對每個Hudi數據集分區中的文件數量進行非常可預測的控制時,需要設置一個非常重要的參數,這也需要確保在處理大量數據時,不會達到Apache Spark對數據shuffle的2GB限制。例如,如果計划在第一次加載時加載200 GB的數據,並希望文件大小保持在大約256 MB,則將此數據集的shuffle parallelism參數設置為800(200×1024/256)。有關詳細信息,請參閱調優指南
  • 在增量加載deltastreamer命令中,我們使用了一個附加參數:--checkpoint 0。當Deltastreamer寫Hudi數據集時,它將檢查點信息保存在.hoodie文件夾下的.commit文件中,它在隨后的運行中使用這些信息,並且只從Amazon S3讀取數據,后者是在這個檢查點時間之后創建的,在生產場景中,在啟動AWS-DMS任務之后,只要完成全量加載,該任務就會繼續向目標S3文件夾寫入增量數據。在接下來的步驟中,我們在EMR集群上運行了一個命令,將全量文件手動移動到另一個文件夾中,並從那里處理數據。當我們這樣做時,與S3對象相關聯的時間戳將更改為最新的時間戳,如果在沒有checkpoint參數的情況下運行增量加載,deltastreamer在手動移動滿載文件之前不會提取任何寫入Amazon S3的增量數據,要確保Deltastreamer第一次處理所有增量數據,請將檢查點設置為0,這將使它處理文件夾中的所有增量數據。但是,只對第一次增量加載使用此參數,並讓Deltastreamer從該點開始使用自己的檢查點方法。
  • 對於本文,我們手動運行Spark submit命令,但是在生產集群中可以運行這一步驟
  • 可以使用調度或編排工具安排增量數據加載命令以固定間隔運行,也可以通過向spark submit命令傳遞附加參數--min-sync-interval-seconds *XX* –continuous,以特定的頻率以連續方式運行它,其中XX是數據拉取每次運行之間的秒數。例如,如果要每5分鍾運行一次處理,請將XX替換為300。

7. 清理

當完成對解決方案的探索后,請完成以下步驟以清理CloudFormation部署的資源

  • 清空CloudFormation堆棧創建的S3 bucket
  • 刪除在s3://<EMR-Logs-s3-Bucket>/HudiBlogEMRLogs/下生成的任何Amazon EMR日志文件。
  • 停止AWS DMS任務Hudiblogload。
  • 刪除CloudFormation。
  • 刪除CloudFormation模板后保留的所有Amazon RDS for MySQL數據庫快照。

8. 結束

越來越多的數據湖構建在Amazon S3,當對數據湖的數據進行變更時,使用傳統方法處理數據刪除和更新涉及到許多繁重的工作,在這篇文章中,我們看到了如何在Amazon EMR上使用AWS DMS和HoodieDeltaStreamer輕松構建解決方案。我們還研究了在將數據集成到數據湖時如何執行輕量級的記錄級轉換,以及如何將這些數據用於聚合等下游流程。我們還討論了使用的重要設置和命令行選項,以及如何修改它們以滿足個性化的需求。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM