歡迎關注微信公眾號:ApacheHudi
1. 引入
Drop是一個智能的獎勵平台,旨在通過獎勵會員在他們喜愛的品牌購物時獲得的Drop積分來提升會員的生活,同時幫助他們發現與他們生活方式產生共鳴的新品牌。實現這一體驗的核心是Drop致力於在整個公司內推廣以數據為基礎的文化,Drop的數據用於多種形式,包括但不限於商業智能、測量實驗和構建機器學習模型。
為了確保有效地利用數據,工程團隊一直在尋找可以改善基礎架構以適應當前和未來的需求的方法,與許多其他高成長型初創公司的經驗類似,我們對數據的需求規模超過了基礎架構的能力,因此需要將以商業智能為中心的數據基礎架構演變為可以釋放大數據需求和能力的基礎架構。
2. 動機
Drop在成立之初就着力構建數據基礎架構,以便利用自動報告和儀表板來觀察關鍵業務的指標。我們的第一代數據基礎架構使用以AWS Redshift數據倉庫為中心的架構,使用Apache Airflow調度自定義批處理ETL作業,使用Looker構建自動化儀表板和報告。
該架構實現了我們的早期目標,但是利用數據的需求已經超出了商業智能的范圍。為向會員提供更個性化的體驗,我們需要一個支持高級功能的數據平台,例如更深入的分析、全面的實驗測量以及機器學習模型的開發,我們也意識到原來架構的技術限制可能會阻止我們解鎖那些所需的功能,一些最突出的限制包括:
- 隨着數據規模的擴大,關鍵的ETL作業的運行時間猛增,一些作業無法在24小時內運行。
- AWS Redshift將計算與存儲混合在一起,限制了提取數據到倉庫的擴展能力,在Redshift中提供的數據選擇非常有限,從而偏離了其作為“中心”的位置。
- 在Redshift集群之上構建機器學習架構效果並不理想,因為Redshift僅能處理結構化數據,並且用於填充倉庫的批處理ETL作業延遲較高。
為解決這些不足並考慮到后續的擴展性,我們需要繼續構建數據基礎架構。數據湖是一個集中式存儲庫,能夠存儲任何規模的非結構化和結構化數據,構建數據湖將使我們能夠解決第一代數據基礎架構的局限性,同時允許我們保留原始架構的關鍵組件,這些組件仍然滿足后續的長期數據基礎架構計划。
3. 構建數據湖
當開始構建Drop的數據湖時,我們遵循以下指導原則:
- 技術堆棧保持簡單:作為Drop Engineering的核心理念之一,我們旨在利用現有的和經過驗證的AWS技術來簡化我們的技術棧,已有AWS經驗能夠讓我們快速制作新功能原型以及利用AWS生態系統中其他服務的集成優勢,因此繼續使用AWS技術意義重大。我們在整個數據團隊中繼續保留Python和SQL,因此無需學習用於數據提取或處理的新語言,而是使用PySpark來構建所有Apache Spark作業。
- 避免重復造輪子:在開源和技術社區中尋找成功的數據湖實施案例和經驗,並向Uber和Airbnb尋求有關頂層設計的思路和靈感,有關更多特定技術的知識,我們依賴於已發布的AWS re:Invent的演講,通過參加2018年的AWS re:Invent並聽取Robinhood關於他們如何實現數據湖的故事,一切都得以實現,Robinhood的故事與我們技術棧非常相似,對要解決的問題以及解決問題的理念產生了深深的共鳴,所有這些故事和資源組成了我們數據湖的最初藍圖。
- 確保可擴展性:隨着我們的組織從早期的初創企業過渡到高速增長的企業,數據的規模以及利用數據的復雜性迅速增長,這意味着需要在指數增長方案中有效執行的技術,同時最小化相關技術的復雜性開銷。
4. 總體架構
5. 數據湖
5.1 調度層
調度層負責管理和執行數據湖中所有工作流程,調度程序會協調數據湖中所有數據的大部分移動,利用Airflow使我們能夠通過Airflow有向無環圖(DAG)構建所有工作流和基礎架構,這也簡化了我們的工程開發和部署流程,同時為我們的數據基礎架構提供了版本控制,Apache Airflow項目還包含大量支持AWS的集成庫和示例DAG,使我們能夠快速集成和評估新技術。
5.2 攝入層
當從多個不同的數據源(例如服務器日志,增長營銷平台和業務運營服務)拉取數據時,數據主要是從RDS Postgres數據庫,以批處理和流處理兩種形式提取到數據湖。我們選擇AWS Database Migration Services(DMS)來攝取這些表單,因為它與RDS Postgres進行了原生集成,並且能夠以批和流形式將數據提取到S3中,DMS復制Postgres“預寫日志(WAL)”的數據,以“變更數據捕獲(CDC)”任務進入數據湖,並在S3上生成一分鍾分區的Apache Parquet文件,此為流處理方式。批處理使用DMS的“全量遷移”任務以將Postgres庫表的快照導入S3數據湖,批處理管道由調度層通過Airflow以自定義DAG的形式進行管理,這允許我們控制批快照到S3的調度,並在批運行結束時關閉未使用的DMS資源以降低服務成本。
5.3 處理層
處理層負責將數據從存儲層的“原始(Raw)”部分轉換為數據湖的“列式(Columnarized)”部分中的標准化列式和分區結構。我們使用Lambda架構來協調給定數據源的批和流數據,這種數據處理模型使我們能夠將高質量的批量快照與一分鍾延遲流文件相結合來生成給定數據集的最新列式版本。我們將AWS Glue及其Data Catalog(數據目錄)用作數據湖的中央metastore管理服務,metastore包含每個數據集的元數據,例如在S3中的位置、結構定義和整體大小,也可以使用AWS Glue Crawlers捕獲和更新該元數據,整個流程如下:
- 調度層啟動處理特定數據集Lambda架構的DAG。
- DAG將PySpark應用程序加載到S3,啟動AWS EMR集群,並在EMR中運行PySpark應用程序。
- 來自存儲層“原始”部分的批和流數據均作為EMR Spark應用程序的輸入,最終輸出是存儲層“列式”部分中使用Lambda架構進行協調的Parquet數據集。
- 完成EMR步驟后終止EMR集群以便降低EMR成本。
- 運行AWS Glue Crawler程序以更新AWS Glue目錄中表的元數據,該目錄元數據充當整個數據湖的中央metastore。
5.4 存儲層
S3作為存儲平台的原因有以下三個:易用性、高可靠性和相對低成本。數據存儲在存儲層中的“原始”和“列式”兩部分都基於S3,而“數倉(Warehouse)”中的數據位於Redshift集群中。數據湖中的所有數據首先通過攝入層以各種格式進入“Raw”部分,為了使數據保存到“列式”部分中,數據會通過處理層轉換為“列式”部分,並遵守分區標准。我們選擇Apache Parquet作為標准列式文件格式,因為Apache Parquet廣泛用於數據處理服務中,並且具有性能和存儲優勢。存儲層的“數倉”部分由Redshift數據倉庫組成,其數據來自“原始”和“列式”部分。還可以通過數據“熱度”對存儲層的各部分進行分類,類比於數據的訪問頻率,“原始”數據數據最冷,“列式”數據比“原始”數據更熱,而“數倉”數據是最熱的數據,因為它經常會被自動化儀表板和報告訪問。S3的另一個優勢是我們能夠根據數據規模控制成本,對於較冷的數據,我們可以更變S3存儲類型或者將數據遷移到AWS Glacier以降低成本。
5.5 從數據湖訪問數據
技術團隊和非技術團隊都會使用數據湖,以便更好地為決策提供依據並增強整體產品體驗,非技術團隊通常通過商業智能平台Looker來訪問數據,以自動化儀表板和報告的形式監控各項指標。
技術團隊應用范圍更廣,從探索性分析中的即席查詢到開發機器學習模型和管道。可以通過AWS Athena托管的Presto服務或通過Redshift數據倉庫直接查詢數據湖中的數據。AWS Athena還與AWS Glue的數據目錄集成,這使Athena可以完全訪問我們的中央metastore,這種原生集成使得Athena可以充當S3中存儲數據的主要查詢服務。我們還能夠利用Redshift Spectrum查詢S3數據,這在即席查詢場景下特別有用,我們也希望使用S3的數據來豐富Redshift集群的數據。為了對機器學習模型進行更正式的探索性分析和開發,需要結合使用Spark和Glue目錄來查詢或直接與S3中的對象進行交互。
5.6 檢查與監控
確保數據湖的消費者完全信任數據的准確性至關重要,我們建立了一套全面的檢查和監控程序以提醒我們任何意外情況。我們通過Airflow DAG構建的絕大多數數據湖流程,並且能夠使用Datadog監視Airflow作業失敗,並通過PagerDuty將關鍵問題轉發給工程師。為了建立數據質量檢查,我們還構建了一系列Airflow DAG,以生成特定數據集的自定義數據驗證指標,我們在給定數據集上運行的一組標准的驗證檢查和指標包括但不限於空檢查、行計數檢查和數據延遲指標,這些指標都會被推送到Datadog,這樣便可以在其中監視異常並在必要時向參與者發出告警。
6. 當前情況
Drop的數據湖現在每天從Postgres數據庫中提取十億條記錄,並處理TB級的作業,該數據湖已被眾多團隊采用,並為增強了數據分析能力以及開發和交付機器學習模型的能力。總體而言,該架構效果不錯,並且我們相信數據基礎架構的發展將使我們處於更好的位置,以適應當前和未來的需求。
7. 重點總結
我們的實施過程踩過不少坑,總結如下:
- 提取列式的數據的好處:盡管存儲層的“原始”部分與文件格式無關,但是以列式提取數據對下游更有利,當從Postgres表中攝取數據時,這些表包含冗長和復雜文本Blob的列(例如jsonb列),下游解析數據就成了噩夢,進一步依賴serDe文件來協助解析數據只會增加整體復雜性,由於Parquet的固有特性,將DMS配置為以Parquet格式輸出文件可保持列的架構完整性,並且我們的下游處理層作業也可提升速度和數據質量性能,這種折衷是以增加DMS生成parquet文件所需的內存資源為代價的。
- 技術棧使用Spark:我們的工程團隊以前在內部Hadoop方面的經驗非常有限,因此采用Spark帶來了很多麻煩。盡管有大量可用的Spark資源,但我們最大的痛點還是資源管理和錯誤排除,我們選擇EMR而不是AWS Gule構建ETL作業,這會提交已利用資源的控制級別,提升了性能和節約成本節約,但代價是復雜性的提升。為每個Spark作業配置最佳EMR資源非常復雜,我們很快了解到這些優化工作需要浪費高昂的時間。當EMR作業失敗時,我們也很難解決根源問題,在很多情況下,EMR群集生成的錯誤日志不夠細致,並且有時也掩蓋了重要細節。通過改進日志記錄過程並直接引用Spark執行程序日志,我們能夠發現更詳細的信息,從而更快發現根本原因。
- 關閉空閑資源:我們可以將節省的大部分費用歸功於我們在處理臨時工作負載方面的經驗,通過將幾乎所有的數據湖操作都構造為Airflow DAG,我們可以自動化何時刪除未使用的資源,我們只有在需要通過Airflow進行批量提取作業時才啟動DMS復制實例的能力,這使我們節省了始終保持實例可用的等效成本的90%以上。類似地,當我們的Airflow調度處理層作業時,我們僅使用競價型實例啟動EMR集群,並在完成時終止集群,這與我們始終擁有可用節點相比,平均節省了70%以上。
8. 下一步計划
我們也在尋找改善數據基礎架構的方法,並且已經開始制定下一步計划,包括:
- 通過諸如Apache Hudi或Delta Lake之類的技術改善數據可用性以及存儲層中的版本控制管理。
- 通過Apache Kafka和Debezium進行事件驅動開發來調整我們的攝入層功能。
- 使用AWS Lake Formation等工具改善數據訪問治理,這樣可以對團隊訪問哪些數據進行嚴格控制。