簡介: 本文翻譯自大數據技術公司 Databricks 針對數據湖 Delta Lake 的系列技術文章。眾所周知,Databricks 主導着開源大數據社區 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為數據湖核心存儲引擎方案給企業帶來諸多的優勢。本系列技術文章,將詳細展開介紹 Delta Lake。
前言
本文翻譯自大數據技術公司 Databricks 針對數據湖 Delta Lake 系列技術文章。眾所周知,Databricks 主導着開源大數據社區 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為數據湖核心存儲引擎方案給企業帶來諸多的優勢。
此外,阿里雲和 Apache Spark 及 Delta Lake 的原廠 Databricks 引擎團隊合作,推出了基於阿里雲的企業版全托管 Spark 產品——Databricks 數據洞察,該產品原生集成企業版 Delta Engine 引擎,無需額外配置,提供高性能計算能力。有興趣的同學可以搜索` Databricks 數據洞察`或`阿里雲 Databricks `進入官網,或者直接訪問 https://www.aliyun.com/product/bigdata/spark 了解詳情。
譯者:馮加亮(加亮),阿里雲計算平台事業部大數據工程師
Delta Lake 技術系列 - 流式計算
目錄
- Chapter-01 使用 Delta Lake 解決流式數據入湖的難題
- Chapter-02 使用 Delta Lake 簡化股票行情數據的分析
- Chapter-03 Tilting Point 游戲公司是如何使用 Delta Lake 處理流數據
- Chapter-04 使用 Delta Lake 構建流媒體視頻的解決方案
本文介紹內容
Delta Lake 系列電子書由 Databricks 出版,阿里雲計算平台事業部大數據生態企業團隊翻譯,旨在幫助領導者和實踐者了解 Delta Lake 的全部功能以及它所處的場景。在本文中,Delta Lake 系列-實時流處理場景(The Delta Lake Series Streaming),通過客戶最佳實踐案例,介紹使用 Delta Lake 做流式數據計算的場景。
Chapter-01 使用 Delta Lake 解決流式數據計算的難題
傳統流式數據和數倉數據可分成數據湖和數據倉庫兩部分。
數據湖的不足
- 很難合並來自不同系統的流數據。
- 在數據湖中想要更新數據幾乎是不可能的,尤其在涉及到財務對帳,數據調整,使得流式數據更新尤為重要。
- 數據湖的查詢速度通常很慢。
- 優化存儲非常困難,並且通常需要復雜的邏輯
數據倉庫的不足
- 局限於使用 SQL 分析
- 在場景需要的情況下同時訪問流數據和存儲數據是非常困難的
- 數據倉庫的可伸縮性不好
- 計算和存儲無法分離,使得數據倉庫使用起來較為昂貴
Delta Lake的優勢
(Databrick 官網 Delta Lake 指南: https://docs.databricks.com/delta/index.html)
以上內容介紹了數據湖和數據倉庫的局限性,那么 Delta Lake 是如何解決以上的問題呢:
- Delta Lake 中的表既可作為數據源進行大數據分析又可作為目的源表進行流式實時寫入,真正實現批流一體。
- Delta 表支持增刪操作
- Delta Lake 支持 ACID,使得創建兼容的數據解決方案很容易
- 具有專業的機器學習,ETL 處理,數據分析和查詢簡單高效
- 計算和存儲分離,支持更高效的解決方案
Chapter-02 Delta Lake 在股票行情分析的應用
實時分析股票數據是一項復雜的工作,這其中有大量的流數據需要實時維護同時還要處理歷史數據,事務一致性方面面臨很大挑戰,這些問題在 Delta Lake 架構中通過Apache Spark 的可伸縮性、流計算和靈活的數據分析的能力,以及 ACID 事務性可以輕松解決。
使用 Delta Lake 解決方案架構圖
- 通過此架構圖,我們清晰的看到將股票價格數據和金融數據實時數據寫入 Delta Lake 的兩張表里面。
- 然后我們通過讀取 Delta Lake 表中的數據做 ETL 數據清洗,並將清洗后的數據寫入第三個 Delta Lake 表,用於下游分析
實時的流數據有兩類:Fundamentals data 和 Price data,為了模擬這兩種數據,我們在Delta Lake 中創建了 Delta 表,使用 .format(‘delta’)並指向 OSS 數據存儲
%pyspark # Create Fundamental Data (Databricks Delta table) dfBaseFund = spark \ .read \ .format(‘delta’) \ .load(‘/delta/stocksFundamentals’)
%pyspark # Create Price Data (Databricks Delta table) dfBasePrice = spark \ .read \ .format(‘delta’) \ .load(‘/delta/stocksDailyPrices’)
接下來,我們通過開始和結束日期篩選出來有用的數據,然后將該日期范圍的價格和基本數據合並到 OSS
%pyspark # Determine start and end date of available data row = dfBasePrice.agg( func.max(dfBasePrice.price_date).alias(“maxDate”), func.min(dfBasePrice.price_date).alias(“minDate”) ).collect()[0] startDate = row[“minDate”] endDate = row[“maxDate”] # Define our date range function def daterange(start_date, end_date): for n in range(int ((end_date - start_date).days)): yield start_date + datetime.timedelta(n) # Define combinePriceAndFund information by date and def combinePriceAndFund(theDate): dfFund = dfBaseFund.where(dfBaseFund.price_date == theDate) dfPrice = dfBasePrice .where(dfBasePrice.price_date == theDate) .drop(‘price_date’) # Drop the updated column dfPriceWFund = dfPrice.join(dfFund, [‘ticker’]).drop(‘updated’) # Save data to OSS dfPriceWFund .write .format(‘delta’) .mode(‘append’) .save(‘/delta/stocksDailyPricesWFund’) # Loop through dates to complete fundamentals + price ETL process for single_date in daterange( startDate, (endDate + datetime.timedelta(days=1)) ): print ‘Starting ’ + single_date.strftime(‘%Y-%m-%d’) start = datetime.datetime.now() combinePriceAndFund(single_date) end = datetime.datetime.now() print (end - start)
現在我們有一系列價格數據寫入到 oss 中的 /delta/stocksDailyPricesWFund。我們通過讀取 OSS 指定路徑的數據.format(“Delta”)來創建 Delta Lake 表。
%pyspark dfPriceWithFundamentals = spark.readStream \ .format(“delta”) \ .load(“/delta/stocksDailyPricesWFund”) // Create temporary view of the data dfPriceWithFundamentals.createOrReplaceTempView(“priceWithFundamentals”)
創建一個視圖允許我們實時計算價格/收益比率進行分析。
%sql CREATE OR REPLACE TEMPORARY VIEW viewPE AS select ticker, price_date, first(close) as price, (close/eps_basic_net) as pe from priceWithFundamentals where eps_basic_net > 0 group by ticker, price_date, pe
實時分析股票流數據
%sql select * from viewPE where ticker == “AAPL” order by price_date
由於整合數據集源數據是 Delta Lake 表,所以這個視圖不僅僅顯示批處理數據,同時也要展示新的數據流。如下圖所示:
與此同時,使用 Structured Streaming 不僅僅只是實時的將數據寫入 Delta Lake,同時也要保持主鍵的唯一性。
最后,我們演示了如何使用 Delta Lake 簡化股票實時數據分析。通過 Spark Structured Streaming 和 Delta Lake,我們可以使用 Databricks 集成的 Workspace 來創建一個具有數據湖和數據倉庫優點的高性能、可擴展的解決方案。
Databricks 統一數據平台消除了通常與流和事務一致性相關的數據工程,使數據工程和數據科學團隊能夠專注於他們的股票數據。
Chapter-03 Tilting Point 游戲公司使用 Delta Lake 處理流數據
背景
Tilting Point 新一代游戲運營商,在為頂級開發工作室提供專家資源、服務和運營支持,優化高質量的實時游戲等方面,做的都很成功。Tilting Point 通過其用戶獲取基金和世界級的技術平台,通過績效營銷和游戲直播,幫助開發商實現盈利。通過使用Delta Lake 架構, Tilting Point 公司可以實時的利用高質量的數據進行數據分析。
業務需求分析:
Tilting Point 的團隊之前按小時進行游戲分析報告的批處理工作,由於業務需要,他們希望能夠在5-10分鍾內完成實時報告。
他們還希望根據玩家的實時行為做出游戲內的 LiveOps 決策,將實時數據提供給服務系統,提供關於 LiveOps 變化的實時告警,這些告警會對游戲體驗產生很大的影響,我們通過采集這些實時的變化參數將游戲體驗盡可能做到極致。
此外,他們必須單獨存儲加密的個人身份信息 (PII) 數據,以保持 GDPR 的合規。
實時處理數據流的挑戰
Tilting Point 有一個專有的軟件開發工具,開發人員可以與之集成,將數據從游戲服務器發送到 AWS 中托管的服務器。該服務會刪除所有 PII 數據,然后將原始數據發送到 Amazon Firehose 終端。然后 Firehose 將 JSON 格式的數據持續轉儲到 S3
為了清洗原始數據並使其快速用於分析,團隊考慮將連續數據從 Firehose 推送到消息隊列(例如 Kafka, Kinesis ),然后使用 Spark 的 Structured Streaming(Databricks Structured Streaming)來連續處理數據並寫入 Delta Lake 表。
雖然這種架構聽起來非常適合以秒為單位處理低延遲的數據,但實際上 Tilting Point 對它們的 pipeline 並沒有這么低的延遲需求。他們希望在分鍾級,就能得到可供分析的數據。因此,他們決定取消消息隊列來簡化架構,而是使用 S3 作為結構化流作業的數據源。使用 S3 作為連續流數據源的關鍵挑戰是如何實時識別最近更新的文件。
每隔幾分鍾列出所有文件有兩個主要問題
- 高延遲:列出包含大量文件的目錄中的所有文件會有很高的性能開銷,並增加處理時間。
- 高成本:每隔幾分鍾列出大量文件會迅速增加S3成本。
利用 Structured Streaming,使用 blob 作為 Delta Lake 數據存儲。
為了從 S3 雲 blob 存儲中獲取連續流數據,Tilting Point 使用了 Databricks 的“S3- sqs”數據源選項。S3- sqs 提供了從 S3 增量流數據的封裝,而不需要對最近處理的文件再編寫任何狀態管理的代碼。
以下是 Tilting Point pipeline:
- 配置 Amazon S3 事件通知,通過 SNS 向 SQS 發送新的數據。
- Tilting Point 使用 S3-SQS 方式源讀取到達S3中的新數據。方式如下:
%pyspark spark.readStream \ .format(“s3-sqs”) \ .option(“fileFormat”, “json”) \ .option(“queueUrl”, ...) \ .schema(...) \ .load()
- Tilting Point 使用 Structured Streaming 進行數據清洗和轉換,基於游戲實時流數據,使用 Spark Streaming foreachBatch API,寫入到30個不同的 Delta Lake 表中。
- 流作業過程中會生成大量小文件,這將影響下游消費者的性能,因此,每天都會運行一個優化作業來合並表中的小文件,以便 Delta Lake 表在讀取數據時具有良好的性能。
使用 Delta Lake 架構,帶來的便利:
- 增加選配項“s3-sqs”,可以增量加載 S3 中的新文件,有助於快速處理新文件,而不會在列出文件時產生太多開銷
- 不會顯示文件狀態管理:無需顯示的進行文件狀態管理。
- 更低的操作成本:由於我們使用 S3 作為 Firehose 和 Structured Streaming 作業之間的 checkpoint,停止和執行數據的操作負擔相對較低
- 數據讀寫可靠:Delta Lake 提供 ACID(optimistic concurrency control)事務保證。這有助於讀寫更加可靠。
- 文件壓縮:在流處理過程中,會產生很多臨時小文件,這會影響讀寫性能。在 Delta 之前我們必須建立一個不同的表來編寫壓縮數據。在 Delta Lake 中,由於 ACID 事務,我們可以壓縮文件並安全地將數據重寫回相同的表中。
- 快照隔離: Delta Lake 的快照允許我們在流作業執行時,修改和壓縮數據時 Delta 表也可以正常讀取。
- 回滾:寫入錯誤的情況下,Delta Lake 的 Time Travel(Time Travel) 可以幫助我們回滾到表的前一個版本。
Chapter-04 使用 Delta Lake 構建流媒體視頻的解決方案
隨着傳統的付費電視繼續停滯不前,內容所有者開始接受直接消費者( D2C )訂閱和廣告支持的流媒體服務,以從他們的內容庫中賺錢。對於那些整個商業模式都圍繞着生產優質內容,然后將其授權給分銷商的公司來說,向現在玻璃體驗的轉變需要新的創新力,比如為向消費者提供內容建立媒體供應鏈,支持各種設備和操作系統的應用程序,並執行帳單和客戶服務等客戶關系功能。
由於大多數服務都是按月更新的,訂閱服務運營商需要隨時向用戶證明其價值。流媒體視頻的一般質量問題(包括緩沖、延遲、像素化、抖動、丟包和空白屏幕)會對業務產生重大影響,無論是增加用戶流失率還是降低視頻參與度。
瀏覽頻道,點擊進入和退出應用程序,從不同的設備同時登錄等等。而且,由於電視的本質,最重要、最引人注目、吸引最多觀眾的活動往往會出問題。如果你開始在社交媒體上收到投訴,你如何判斷這些投訴是某個用戶獨有的,還是地區性或全國性的問題?如果是全國性的,它是跨所有設備還是只跨特定類型(例如,OEM 可能更新了舊設備類型上的操作系統,最終導致與客戶端的兼容性問題)?
當考慮到用戶的數量、他們正在采取的操作的數量以及體驗中的切換(服務器到CDN到ISP到家庭網絡到客戶端)的數量時,識別、糾正和防止查看者體驗質量問題成為一個大數據問題。服務質量( QoS )有助於分析這些數據流,以便您能夠理解哪里出了問題、在哪里出了問題以及為什么出了問題。最終,你可以進行預測分析,了解可能出現的問題以及如何提前作出補救措施。
服務質量解決方案概述
這個解決方案的目的是為了統一改善其 QoS 系統的流媒體視頻平台。它基於 AWS 實驗室提供的 AWS 流媒體分析解決方案,我們隨后在此基礎上添加了 Databricks 作為統一數據分析平台,用於實時分析和高級分析功能。
通過使用 Databricks(Databricks cumstomers),流媒體平台可以通過始終利用由健壯和可靠的數據管道提供的最完整和最新的數據集來獲得更快的見解。通過使用協作環境加速數據科學,這減少了新特性上市的時間。它為管理端到端機器學習生命周期提供支持,並通過為數據工程和數據科學提供統一的平台,降低軟件開發所有周期的運營成本
視頻 QoS 解決方案架構
由於低延遲的監控警報和視頻流量高峰時所需的高度可伸縮的基礎設施等復雜性,直接的架構選擇是Delta Architecture -像Lambda和Kappa架構這樣的標准大數據架構在維護多種類型管道(流和批處理)所需的操作方面都存在缺點,並且缺乏對統一的數據工程和數據科學方法的支持。
Delta 架構優勢:
- 數據工程師可以經濟有效的方式連續開發數據管道,而不必在批處理和流式之間進行選擇,真正的批流一體。
- 數據分析師可以獲得接近實時的數據分析結果,來幫助他們做 BI 查詢。
- 數據科學家可以開發更好的機器學習模型,使用更可靠的數據集,支持版本回退,便於計算和查詢。
以下是 Delta Lake 經典的三級數據表架構。我們針對每一層級的數據表分別做了如下定義:
- Bronze 表:存儲原生數據( Raw Data ),存放的表或攝入表通常是原生格式的原始數據集( JSON, CSV or txt )。
- Silver 表:該表是在對 Bronze 表的數據進行加工處理的基礎上生成的中間表,對Bronze 表做了清洗/轉換可以作為數據科學訓練的數據。
- Gold 表:基於業務數據表,表數據已經高度集成,可以用於 BI 報表展示的數據。
在完全使用流數據計算的場景里,在 Delta Lake 中間表 DataFrames 的選擇上是在延遲 /sla 成本之間做的權衡(例如實時監控報警和基於新內容的推薦系統更新)。
QoS 體系結構是集中在數據處理的解決方案,他不是一個完整的視頻點播( VoD )解決方案,通過與一些服務主件的結合例如結合亞馬遜網管服務,避免其他的運維工作為數據分析師專注於數據和分析提供保證。
數據寫入 Delta Lake
數據准備
在 QoS 解決方案中的兩個數據源(應用程序事件和 CDN 日志)都使用了 JSON 格式。
為了讓整個組織能夠直接查詢數據,Bronze to Silver Pipeline 將所有原始數據格式轉換為Delta格式。
視頻 APP 事件日志
基於該體系結構,視頻應用程序事件被直接推送到 Kinesis Stream,然后使用模式(append)寫入到 Delta Lake。
在流處理場景下會產生大量的小文件,大量小文件的存在會嚴重影響數據系統的讀性能。Delta Lake 提供了 OPTIMIZE(optimize 性能優化)命令,可以將小文件進行合並壓縮。
時間戳和消息類型都是從 JSON 事件中提取的,以便能夠對數據進行分區,以及選擇想要處理的事件類型。將事件的單個 Kinesis 流與 Delta Lake“events” 表結合在一起,降低操作難度。
CDN 日志
CDN 日志被傳送到 S3,所以處理它們的最簡單的方法是 Databricks Auto Loader,它在新數據文件到達 S3 時增量地、高效地處理它們,而不需要任何額外的設置。
%pyspark auto_loader_df = spark.readStream.format(“cloudFiles”) \ .option(“cloudFiles.format”, “json”) \ .option(“cloudFiles.region”, region) \ .load(input_location) anonymized_df = auto_loader_df.select(‘*’, ip_ anonymizer(‘requestip’).alias(‘ip’))\ .drop(‘requestip’)\ .withColumn(“origin”, map_ip_to_location(col(‘ip’))) anonymized_df.writeStream \ .option(‘checkpointLocation’, checkpoint_location)\ .format(‘delta’) \ .table(silver_database + ‘.cdn_logs’)
創建數字大屏/虛擬網絡操作中心
流媒體公司需要盡可能實時地監控網絡性能和用戶體驗,能過跟蹤到個體層面,進行分類和打標簽,如按照地理位置、設備、網絡和歷史觀看等行為進行划分。這就要求采用網絡運營中心 (NOC) 方式,監控流媒體體驗的健康狀況,並盡早對任何問題做出反應。這使得 NOC 應有一個 Dashboard(databricks dashboards),將用戶當前的體驗與性能基線進行比較,以便產品團隊能夠快速、輕松地識別和處理任何服務異常。
NOC 的聚合表基本上是我們的 Delta 體系結構的 Gold 層—— CDN 日志和應用程序事件的 join 后的寬表。支持 SQL 查詢的 Dashboard 展示;
視頻加載時間過長、糟糕的視頻質量體驗對用戶流失率有重大影響。最重要的是,廣告商也不願意在減少觀眾參與度的廣告上花錢,這對KPI改進策略有直接影響,在這種情況下,從應用程序端收集盡可能多的信息是至關重要的,這使得分析不僅可以在視頻層面上進行,還可以在瀏覽器甚至是應用程序的類型/版本上進行。
在內容方面,APP 應用程序事件可以提供關於用戶行為和整體體驗的數據。比如有多少人暫停了視頻,實際上已經看完那一集/視頻?是什么導致了中斷:內容質量或交付問題?當然,進一步的分析可以通過將所有資源(用戶行為、cdn / isp 的性能)聯系在一起來完成,這樣不僅可以創建用戶檔案,還可以預測用戶流失。
創建(近)實時告警
當處理百萬並發用戶視頻流中產生的數據的速度、數量和多樣性時,Dashboard 的復雜性可能會使 NOC 的人工操作員很難專注於當前最重要的數據並找到根源問題。通過預警機制,可以在性能超過某些閾值時設置自動警報,這些閾值可以幫助網絡的人工操作員,並通過 Lambda 函數設置自動補救協議。例如:
•如果一個 CDN 的延遲比基線高得多(例如,如果它比基線平均延遲超過10%),啟動自動CDN流量轉移。
•如果超過[某個閾值,例如5%]的客戶報告回放錯誤,提醒產品團隊可能存在特定設備的客戶問題。
•如果某個 ISP 的瀏覽者存在高於平均水平的緩沖和像素化問題,提醒前線的客戶代表應對和解決方法。
從技術角度來看,生成實時警報需要一個能夠實時處理數據的流媒體引擎和發布-訂閱服務來推送通知
QoS 解決方案通過使用 Amazon SNS 及其與 Amazon Lambda 的集成(參見下面的 web 應用更新)或為其他消費者提供 Amazon SQS 來實現 AWS 集成微服務的最佳實踐。
def send_error_notification(row): sns_client = boto3.client(‘sns’, region) error_message = ‘Number of errors for the App has exceeded the threshold {}’.format(row[‘percentage’]) response = sns_client.publish( TopicArn=, Message= error_message, Subject=, MessageStructure=‘string’) # Structured Streaming Job getKinesisStream(“player_events”)\ .selectExpr(“type”, “app_type”)\ .groupBy(“app_type”)\ .apply(calculate_error_percentage)\ .where(“percentage > {}”.format(threshold)) \ .writeStream\ .foreach(send_error_notification)\ .start()
在基本的電子郵件用例之上,演示播放器包括三個使用 AWS AppSync 實時更新的小部件:活躍用戶數量、最受歡迎的視頻和同時觀看視頻的用戶數量。
QoS 解決方案采用了類似的方法——結構化流媒體和亞馬遜 SNS ——來更新所有的值,允許使用 AWS SQS 插入額外的消費者。當需要對大量事件進行增強和分析時,這是一種常見的模式;一次性預聚合數據,允許每個服務(消費者)在下游做出自己的決定。
未來的展望
機器學習
如果我們想要在未來能夠自動做出決策,我們必須整合機器學習算法。作為一個統一的數據平台,Databricks 使數據科學家能夠使用內置支持 Hyperopt / Horvod / AutoML 的機器學習運行時或與 MLflow 集成等功能來構建更好的數據科學產品,我們已經在我們的客戶基礎上探索了一些重要的用例重點關注對 QoS 解決方案的可能擴展。
故障點預測和補救
隨着 D2C 流媒體的用戶越來越多,即使暫時沒有服務,帶來的影響也很大。ML 可以幫助運營商從報告轉向預防,通過預測可能出現的問題,並在出現問題之前進行補救(例如,並發觀看者的激增導致自動切換到一個容量更大的 cdn)。
增加用戶流量數
不斷增長的訂閱服務的關鍵是保持你擁有的訂閱者。通過理解個人級別的服務質量,您可以將 QoS 作為流失和客戶生命周期價值模型中的一個變量添加。此外,你還可以為那些存在視頻質量問題的用戶創建群組,以便測試主動發送信息和保存優惠。
快速入門Databricks在流視頻 QoS 解決方案
我們已經尋求為大多數流媒體視頻平台環境創建一個快速開始,以嵌入這種 QoS 實時流媒體分析解決方案的方式,有了這個方案,使客戶在流媒體視頻中有個好的體驗,以保持變化無常的觀眾在你的平台上有足夠的娛樂選擇。
- 可擴展到任何用戶規模。
- 快速預警質量性能問題。
- 足夠靈活和模塊化,可以輕松地針對您的受眾和您的需求進行定制,如創建新的自動警報或使數據科學預測分析和機器學習。
首先,下載 Databricks 的流視頻 QoS 解決方案的筆記本。 有關如何將批處理和流數據統一到單個系統中的更多細節指導,請查看 Delta Architecture webinar。
后續
您已經了解了 Delta Lake 及其特性,以及如何進行性能優化,本系列還包括其他內容:
- Delta Lake 技術系列-基礎和性能
- Delta Lake 技術系列-特性
- Delta Lake 技術系列-湖倉一體
- Delta Lake 技術系列-客戶用例(Use Case)
本文為阿里雲原創內容,未經允許不得轉載。