Hopsworks特征存儲庫統一了在線和批處理應用程序的特征訪問而屏蔽了雙數據庫系統的復雜性。我們構建了一個可靠且高性能的服務,以將特征物化到在線特征存儲庫,不僅僅保證低延遲訪問,而且還保證在服務時間可以訪問最新鮮的特征值。
企業機器學習模型為指導產品用戶交互提供了價值價值。通常這些 ML 模型應用於整個實體數據庫,例如由唯一主鍵標識用戶。離線應用程序的一個示例是預測客戶終身價值(Customer Lifetime Value),其中可以定期(每晚、每周)分批預測,然后用於選擇營銷活動的目標受眾。然而更先進的人工智能應用程序可以實時指導用戶交互,例如推薦系統。對於這些在線應用程序,模型輸入的某些部分(特征向量)將在應用程序本身中可用,例如最后點擊的按鈕,而特征向量的其他部分則依賴於歷史或上下文數據,必須檢索后端存儲,例如用戶在過去一小時內點擊按鈕的次數或按鈕是否為熱門按鈕。
在這篇博客中,我們將深入探討在線應用程序的需求細節,以及 Hopsworks Feature Store 如何抽象並規避雙存儲系統的復雜性。
1. 生產中的機器學習模型
雖然具有(分析)模型的批處理應用程序在很大程度上類似於模型本身的訓練,需要有效訪問將要參與評分的大量數據,但在線應用程序需要低延遲訪問給定主鍵的最新特征值,然后作為特征向量發送到模型服務實例進行推理。
據我們所知沒有單一的數據庫能夠高性能滿足這兩個要求,因此數據團隊傾向於將用於訓練和批量推理的數據保留在數據湖中,而 ML工程師更傾向於構建微服務以將微服務中的特征工程邏輯復制到在線應用程序中。
然而,這給數據科學家和機器學習工程師帶來了不必要的障礙,無法快速迭代並顯着增加機器學習模型的用於生產環境的時間
- 數據科學視角:數據和基礎設施通過微服務緊密耦合,導致數據科學家無法從開發轉向生產,也無法復用特征。
- ML 工程視角:大量工程工作以保證對生產中數據的一致訪問,正如 ML 模型在訓練過程中所看到的那樣。
2. Hopsworks特征存儲庫:透明的雙存儲系統
Hopsworks特征存儲庫是一個雙存儲系統,由高帶寬(低成本)離線存儲和低延遲在線存儲組成。離線存儲是我們 HopsFS 文件系統上的 Apache Hudi 表(由 S3 或 Azure Blob 存儲支持)和外部表(例如 Snowflake、Redshift 等),提供對大量特征數據的訪問以用於訓練或批量評分。相比在線存儲是一個低延遲的鍵值數據庫,它只存儲每個特征的最新值及其主鍵。 因此在線特征存儲充當這些特征值的低延遲緩存。
為了使該系統對數據科學家有價值並縮短生產時間,並為最終用戶提供良好的體驗,它需要滿足一些要求:
- 用於訓練和服務的一致特征:在 ML 中,為生產中的特征復制精確的特征工程邏輯非常重要,因為它用於生成模型訓練的特征。
- 特征新鮮度:低延遲、高吞吐量的在線特征存儲只有在存儲在其中的數據保持最新時才有益,特征新鮮度被定義為觸發特征重新計算的事件到達與重新計算的特征在在線特征庫中發布之間的端到端延遲。
- 延遲:在線特征庫必須提供近乎實時的低延遲和高吞吐量,以便應用程序能夠使用盡可能多的特征及其可用的SLA。
- 可訪問性:數據需要可通過直觀的 API 訪問,就像從離線特征存儲中提取數據進行訓練一樣容易。
Hopsworks在線特征庫圍繞四大支柱構建,以滿足需求,同時擴展以管理大量數據:
- HSFS API:Hopsworks 特征存儲庫是開發人員特征存儲的主要入口點,可用於 Python 和 Scala/Java。HSFS 將兩個存儲系統抽象出來,提供透明的 Dataframe API(Spark、Spark Structured Streaming、Pandas)用於在線和離線存儲的寫入和讀取。
- 元數據:Hopsworks 可以存儲大量自定義元數據,以便數據科學家發現、管理和重用特征,而且還能夠在將模型移至生產時依賴模式和數據質量。
- 引擎:在線特征存儲帶有可擴展的無狀態服務,可確保數據盡快寫入在線特征存儲,而不會從數據流(Spark 結構化流)或靜態 Spark 或 Pandas DataFrame中進行寫入放大,即不必在攝取特征之前先將特征物化到存儲中 - 可以直接寫入特征存儲。
- RonDB:在線存儲背后的數據庫是世界上最快的具有 SQL 功能的鍵值存儲。不僅為在線特征數據構建基礎,而且還處理 Hopsworks 中生成的所有元數據。
我們將在以下部分詳細介紹其中的每一部分,並提供一些用於定量比較的基准。
3. RonDB:在線特征存儲,文件系統和元數據的基礎
Hopsworks 是圍繞分布式橫向擴展元數據從頭開始構建的。這有助於確保 Hopsworks 內服務的一致性和可擴展性,以及數據和 ML 工件的注釋和可發現性。
自第一次發布以來,Hopsworks 一直使用 NDB Cluster(RonDB 的前身)作為在線特征存儲。2020 年我們創建了 RonDB 作為 NDB Cluster 的托管版本,並針對用作在線特征存儲進行了優化。
但是在 Hopsworks 中我們將 RonDB 用於不僅僅是在線特征存儲。RonDB 還存儲整個特征存儲庫的元數據,包括模式、統計信息和提交。 RonDB 還存儲了文件系統 HopsFS 的元數據,其中存儲了離線 Hudi 表。使用 RonDB 作為單個元數據數據庫,我們使用事務和外鍵來保持 Feature Store 和 Hudi 元數據與目標文件和目錄(inode)一致。Hopsworks 可通過 REST API 或直觀的 UI(包括特征目錄)訪問或通過 Hopsworks 特征存儲 API (HSFS) 以編程方式訪問。
4. OnlineFS:可擴展的在線特征物化引擎
有了底層的 RonDB 和所需的元數據,我們就能夠構建一個橫向擴展、高吞吐量的物化服務,以在在線特征存儲上執行更新、刪除和寫入——我們簡單地將其命名為 OnlineFS。
OnlineFS 是一種使用 ClusterJ 直接訪問 RonDB 數據節點的無狀態服務。 ClusterJ 被實現為原生 C++ NDB API 之上的高性能 JNI 層,提供低延遲和高吞吐量。由於 RonDB 中元數據的可用性,例如 avro 模式和特征類型,我們能夠使 OnlineFS 無狀態。 使服務無狀態允許我們通過簡單地添加或刪除服務的實例來向上和向下擴展對在線特征存儲的寫入,從而隨着實例的數量線性地增加或減少吞吐量。
讓我們完成將數據寫入在線特征存儲所需的步驟,這些步驟在下圖中編號。
- 特征作為 Pandas 或 Spark DataFrame寫入特征存儲
每個 Dataframe 更新一個稱為特征組的表(離線存儲中有一個類似的表)。一個特征組中的特征共享同一個主鍵,可以是復合主鍵。 主鍵與元數據的其余部分一起被跟蹤。 因此Hopsworks 特征存儲庫有一個 Dataframe API,這意味着特征工程的結果應該是將寫入到特征存儲的常規 Spark、Spark Structured Streaming 或 Pandas Dataframe。對於所有三種類型的DataFrame,用於寫入特征存儲的 API 幾乎相同。 通過對特征組對象的引用可以插入DataFrame。 特征組在創建時已配置為將 Dataframe 存儲到在線和離線庫或僅存儲到其中之一。
- 編碼和產生
Dataframe 的行使用 avro 進行編碼並寫入在 Hopsworks 上運行的 Kafka中。每個特性組都有自己的 Kafka 主題,具有可配置的分區數量,並按主鍵進行分區,這是保證寫入順序所必需的。
- 消費和解碼
我們使用 Kafka 來緩沖來自 Spark 特征工程作業的寫入,因為直接寫入 RonDB 的大型 Spark 集群可能會使 RonDB 過載,因為現有 Spark JDBC 驅動程序中缺乏背壓。OnlineFS 從 Kafka 讀取緩沖的消息並對其進行解碼。 重要的是OnlineFS 僅解碼原始特征類型,而嵌入等復雜特征以二進制格式存儲在在線特征存儲中。
- 基於主鍵的Upsert
OnlineFS 可以使用 ClusterJ API 將行實際更新插入到 RonDB。Upsert 分批執行(具有可配置的批量大小)以提高吞吐量。
由於管道步驟中的所有服務都可以訪問相同的元數據,因此我們能夠向用戶隱藏與編碼和模式相關的所有復雜性。 此外所有涉及的服務都是水平可擴展的(Spark、Kafka、OnlineFS),並且由於我們類似於流的設置,該過程不會創建不必要的數據副本,即沒有寫放大。 由於模式注冊表、X.509 證書管理器和 Hopsworks 中的 Kafka 等服務的可用性,這種高度可擴展的設置成為可能。 在任何時候X.509 證書都用於雙向身份驗證,而 TLS 用於加密網絡流量。
5. 可訪問性意味着透明的 API
在分布式系統中,我們經常談論透明度。 如果分布式系統對開發人員隱藏網絡訪問和實現特定知識,則它是透明的。 在 Hopsworks 特征存儲庫中,寫入是通過相同的 API 透明地完成的,如前所述(1)無論是常規的 Spark、Spark Streaming 還是 Pandas 以及(2)系統負責一致地更新在線和離線存儲
插入
HSFS 庫中的核心抽象是表示特征組、訓練數據集和特征存儲中的特征的元數據對象。 我們使用 HSFS 的目標是讓開發人員能夠使用他們喜歡的語言和框架來設計功能。 當我們在 Dataframe API 上對齊時,Dataframe 中包含的任何內容都可以寫入特征存儲。 如果您有現有的 ETL 或 ELT 管道,它們生成包含特征的數據幀,您可以通過簡單地獲取對其特征組對象的引用並使用您的數據幀作為參數調用 .insert()
來將該數據幀寫入特征存儲 . 這可以從定期安排的作業中調用(使用您選擇的任何編排器,或者,如果您想要開箱即用的編排器,則 Hopsworks 附帶 Airflow)。 但是也可以通過將批次寫入 Spark 結構化流應用程序中的數據幀來連續更新特征組對象。
# populate feature group metadata object
store_fg_meta = fs.create_feature_group(name="store_fg",
version=1,
primary_key=["store"],
description="Store related features",
online_enabled=True)
# create feature group for the first time in feature store
fg.save(Dataframe)
# replace .save with .insert for scheduled batch job
fg.insert(Dataframe)
# if required, stream data only to the online feature store in long running Spark
# Structured Streaming application
fg.insert_stream(streaming_Dataframe)
讀取
許多現有的特征存儲沒有模型的表示。 然而Hopsworks 引入了訓練數據集抽象來表示用於訓練模型的特征集和特征值。 也就是說,不可變的訓練數據集和模型之間存在一對一的映射關系,但可變特征組與不可變的訓練數據集之間是一對多的關系。 您可以通過從特征組中加入、選擇和過濾特征來創建訓練數據集。 訓練數據集包括特征的元數據,例如它們來自哪個特征組、該特征組的提交 ID 以及訓練數據集中特征的順序。 所有這些信息使 HSFS 能夠在稍后的時間點重新創建訓練數據集,並在服務時透明地構建特征向量。
# create a query
feature_join = rain_fg.select_all() \
.join(temperature_fg.select_all(), on=["location_id"]) \
.join(location_fg.select_all())
td = fs.create_training_dataset("rain_dataset",
version=1,
label=”weekly_rain”,
data_format=”tfrecords”)
# materialize query in the specified file format
td.save(feature_join)
# we can also use the training dataset for serving
# this serving code typically runs in a Python environment
td = fs.get_training_dataset(“rain_dataset”, version=1)
# get serving vector
td.get_serving_vector({“location_id”: “honolulu”})
在線特征庫的使用方要么是使用 ML 模型的應用程序,要么是模型服務基礎設施,這些基礎設施通過缺失的特征來豐富特征向量。 Hopsworks 為在線庫提供了一個基於 JDBC 的 API。 JDBC 具有提供高性能協議、網絡加密、客戶端身份驗證和訪問控制的優勢。 HSFS 為 Python 和 Scala/Java 提供語言級別的支持。 但是,如果您的服務應用程序在不同的編程語言或框架中運行,您總是可以直接使用 JDBC。
6. Benchmarks
Mikael Ronstrom(NDB 集群的發明者和邏輯時鍾的數據負責人,領導 RonDB 團隊)為 RonDB 提供了 sysbench 基准測試,並提供了針對 Redis 的比較性能評估。在本節中我們展示了 OnlineFS 服務的性能,能夠處理和維持寫入在線特征存儲的高吞吐量,以及對 Hopsworks 中典型托管 RonDB 設置的特征向量查找延遲和吞吐量的評估。
在此基准測試中,Hopsworks 設置了 3xAWS m5.2xlarge(8 個 vCPU,32 GB)實例(1 個頭,2 個工作器)。 Spark 使用 worker 將數據幀寫入在線庫。此外相同的工作人員被重新用作客戶端,在在線特征存儲上執行讀取操作以進行讀取基准測試。
RonDB 設置了 1x AWS t3.medium(2 vCPU,4 GB)實例作為管理節點,2x r5.2xlarge(8 vCPU,64 GB)實例作為數據節點,3x AWS c5.2xlarge(8 vCPU,16 GB) ) MySQL 服務器的實例。這種設置允許我們在具有 2 倍復制的在線特征存儲中存儲 64GB 的內存數據。 MySQL 服務器為在線特征存儲提供 SQL 接口,在此基准測試中,我們沒有使 RonDB 數據節點完全飽和,因此可以潛在地添加更多 MySQL 服務器和客戶端,以增加超出此處所示水平的吞吐量。
寫吞吐
我們對 OnlineFS 服務中寫入 RonDB 的吞吐量進行了基准測試。 此外,我們測量了從 Kafka 主題中獲取記錄到提交到 RonDB 之間處理記錄所需的時間。 對於這個基准測試,我們部署了兩個 OnlineFS 服務,一個在頭節點上,一個在 MySQL 服務器節點之一上。
我們通過將 20M 行從 Spark 應用程序寫入在線特征存儲來運行實驗。 經過短暫的預熱期后,兩個服務實例的吞吐量穩定在約 126K 行/秒(11 個特征)、約 90K 行/秒(51 個特征)和最大特征向量約 60K 行/秒。 由於其設計,這可以通過添加更多服務實例輕松擴展。
其次,我們輸出了在 OnlineFS 服務中處理特征向量所需的時間。 這個時間不包括一條記錄在 Kafka 中等待處理的時間,原因是等待時間在很大程度上取決於寫入 Kafka 的 Spark 執行程序的數量。 相反您應該依靠吞吐量數字將它們與您的要求進行比較。
處理時間是按行報告的,但 OnlineFS 中的部分管道是並行化的,例如,行以 1000 的批次提交給 RonDB。通過這種設置,我們實現了 11 個特征的 p99 約為 250 毫秒,行大小為 948 字節。
服務查找吞吐量和延遲
我們對與越來越多的並行執行請求的客戶端相關的不同特征向量大小的吞吐量和延遲進行了基准測試。 請注意,客戶端被分成兩個工作節點(每個 8vCPU)。
每個請求的單個向量
在這個基准測試中,每個請求都包含一個主鍵值查找(一個特征向量)。 吞吐量和延遲可線性擴展至 16 個客戶端,同時保持低延遲。 對於超過 16 個客戶端,我們觀察到運行客戶端的主機達到其最大 CPU 和網絡利用率。 此外,我們沒有看到 RonDB 數據節點或 MySQL 服務器的過度使用,這意味着我們可以通過從更大的工作實例運行客戶端或添加更多工作主機來運行客戶端來進一步線性擴展。
批處理,每個請求 100 個向量
為了證明 RonDB 每秒可擴展到更多的關鍵查找,我們運行了另一個基准測試,其中每個客戶端以 100 個批次請求特征向量。正如我們所看到的查找數量仍然線性擴展,查找吞吐量增加了 15 倍,而 每個請求的延遲僅適度增加。
7. 結論
Hopsworks 附帶托管 RonDB,為 Hopsworks 和在線特征提供統一的元數據存儲。 在這篇博客中,我們展示了一個高度可用的雙節點 RonDB 集群(r5.2xlarge VM)線性擴展到 >250k ops/sec,特征向量查找的 11 個特征的大小約為 1KB,p99 延遲為 7.5 毫秒。 因此Hopsworks 提供了當今市場上性能最高的在線特征庫。