簡介: 本文由 T3 出行大數據平台負責人楊華和資深大數據平台開發工程師王祥虎介紹 Flink、Kylin 和 Hudi 湖倉一體的大數據生態體系以及在 T3 的相關應用場景。
本文由 T3 出行大數據平台負責人楊華和資深大數據平台開發工程師王祥虎介紹 Flink、Kylin 和 Hudi 湖倉一體的大數據生態體系以及在 T3 的相關應用場景,內容包括:
- 湖倉一體的架構
- Flink/Hudi/Kylin 介紹與融合
- T3 出行結合湖倉一體的實踐
這個分享有三個部分,首先探討湖倉一體的架構,然后交流如何融合三個框架以及 T3 如何實踐湖倉一體這個架構。
一、湖倉一體的架構
數據湖和數據倉庫
既然聊湖倉一體,我們先了解一下什么是湖,什么是倉。數據湖是一個很老的概念,在近些年又被熱炒。業界對於數據湖到現在也沒有一個統一的定義。AWS 是最早在雲上推出數據湖解決方案的雲服務提供商,在這里我們便引用 AWS 對數據湖的定義:“數據湖是一個集中式的存儲庫,允許存儲任意結構的數據並且能將它應用於大數據處理,以及進行實時分析和機器學習等相關的應用場景。” 同樣我們也借助於 AWS 對數據倉庫做這樣的定義:“數據倉庫是信息的一個中央存儲庫。” 這里的信息是可對其進行分析,並且可做出更明智的決策。
這個定義還有詳細的展開。AWS 這張圖通過展示了從湖到倉的數據流向的關系,來演示數據湖與數據倉庫之間的區別和聯系。首先數據最初是存在於數據湖或是數據庫中,然后經過數據篩選和准備之后,就會流向數據倉庫來進行一些高價值的分析。這個對比表格很直觀的從數據、Schema、性價比、數據質量、用戶和分析這 6 個維度給出數據湖和倉的對比。
湖倉一體的先例
今年我們聽說阿里巴巴提及的“湖倉一體”的概念。不知道大家有沒有想過湖倉一體在業界是否有成功的先例?我個人認為是有的。今年 (2020年)9 月份,一家叫 Snowflake 的公司在紐交所上市。Snowflake 是一家做雲數倉的公司,基於雲廠商提供的基礎設施提供 SaaS 平台,面向中小企業提供數據的托管和分析服務。Snowflake 自稱自己是一家雲數倉公司,並且在 16 年的數據頂會上發表了一篇論文來介紹他們彈性數倉的架構以及一些技術的細節。
Snowflake 其實是基於雲上的對象存儲,一份存儲多份計算,並且計算與存儲分離的這樣一套架構。其實這就是 AWS 以及現在主流雲廠商所主推的這個數據湖的架構。Snowflake上市的首日,他的市值就飆升到了 700 億美元的規模。所以我個人認為 Snowflake 可以算是實行湖倉一體的一個最成功的先例。大家可以去了解一下剛談到的這篇論文。我摘出了這 5 個點來和大家做簡單的分享:
- 首先第一點,是沒有走現在傳統數倉所廣泛應用的 Shared-Nothing 這個架構,而是轉向 Shared-Data 這個架構。
- 其次,論文中重點提及的存儲和計算分離,是文中我覺得最有價值的一個觀點。他提出了統一存儲然后彈性計算的這樣一個觀念。
- 第三,數倉及服務是我認為他們商業化最成功的點。它將數倉提供了一個 SaaS 化的體驗,並且摒棄傳統上大家認為的數倉是大而重的偏見。
- 第四,高可用這一塊是提高用戶體驗和容錯的很關鍵的一個點。
- 最后,結構化延伸到半結構化這一塊已經體現當時他們能夠探索湖上通用數據的能力。
這雖然是 16 年的一篇論文,但里面的觀念並不算陳舊並且仍然值得我們去學習。后續我們會簡單介紹幾個被我們吸收並且將會去實踐的一些點,而且這些點也是 T3 出行在實現湖倉一體上很關鍵的地方。
Shared - Nothing 架構的優勢
首先,作為一個被很多傳統的數倉廣泛應用的一個架構,Shared-Nothing 還是有一些架構上的優勢:
- 第一點,Table 上的數據可以進行跨節點的水平分區,並且每個節點有自己的本地存儲。每個節點的計算資源,只關注處理每個節點自己存儲的數據。
- 所以它的另一個優點就是它的處理機制相對簡單,是數倉領域很典型的一個架構。
Shared - Nothing 架構的劣勢
這套架構其實也有一些不足的地方:
- 最大的一點就是他耦合了計算與存儲資源,
- 同時也帶來第二個問題,就是彈性不足。具體可以體現在 2 個方面。
a、集群在擴縮容的時候,數據需要被大量重分布
b、沒有辦法簡單地卸載不用的計算資源 - 第三個問題是,耦合計算和存儲資源同時也就造成了它的可用性是相當有限的。由於這些稱之為有狀態的計算,所以在失敗或者是升級的時候會顯著影響性能,並會導致服務整體不可用的狀態。
- 最后是同構的資源與異構的負載的問題。因為在數倉的場景中,我們有很多異構的負載,比如說批量的加載,查詢,報表的大規模計算分析等等。但 Shared-Nothing 架構的資源是同構的,所以這帶來兩者之間的碰撞。
Shared - Data 架構
基於這些問題,Snowflake 提出了一個叫做 Multi-Cluster Shared-Data 架構。這里我們對官方的圖做了一個簡單的微調。
- 這個架構的第一個優勢是它沒有數據孤島,是一個統一的存儲。這也就能夠將存儲從計算中進行解耦。
- 第二個優勢是基於現在的對象存儲去容納結構化和非結構化數據。
- 第三,它的集群規模是可以彈性作用的。
- 第四,上述特征同時也帶來了按需計算這個低成本優點。
接下來我們以分層的形式來 review 這個架構。從整體上來看,它的結構大致分為三個層次。
- 最底層是雲廠商提供的對象存儲,也就是用戶的存儲。
- 中間層是多用途多份的計算集群。
- 再往上是數據湖的管理服務,它存載的是一個大的 SaaS 化的平台,是對整個底層存儲以及計算集群的管理的角色。
Shared - Data 的持續高可用
接下來一個點是這個架構的高可用。這里可以簡單分解為 2 個方面。第一個是失敗容錯,第二個是在線升級。
-
首先,作為一個 SaaS 化的應用,它的容錯性是需要體現在整體架構上。這里我們同樣分層來回顧一下。
- 最底層的存儲層利用了雲廠商的對象存儲能力,他本身是一個跨中心復制以及接近無限擴容的一個機制,所以用戶基本無需關心。
- 再往上是多元的計算集群。每個計算集群是在同一個數據中心內,來保證它網絡傳輸的性能。這里就提到一個問題,有可能某一個計算集群會有節點失敗的問題。假如在一次查詢中有一個節點失敗,這些計算節點會將這個狀態返回上面的服務層。服務層在接受這個失敗后,會將這個計算再次傳遞到可用的節點中進行二次查詢。所以 Shared-Data 存儲和計算分離的這種架構上節點近乎是無狀態的計算。這種架構的一個節點失敗就不是一個非常大的問題。
- 再往上服務層對於元數據的存儲也是利用了對象存儲的這個能力。所以這個服務層基本上可以看做是無狀態的服務。
- 最上層是一個負載均衡器,可以進行服務的冗余和負載的均攤。
- 第二點在線升級這一塊主要利用兩個設計,其實這也並不是很新穎的做法。一個是在計算層和服務層的多方面的映射,然后灰度的切換。這里可以看到在計算層是分多版本的,並且這些版本之間會共享本地的 Cache。服務層的元數據管理也是在多方面共享。這其實也是架構內的子 Shared-Data,對於多版本之間的數據共享能做到再升級和平滑灰度的能力。
接下來我的同事(王祥虎)會跟大家介紹這 3 個框架以及它們是如何融合並最終支撐 T3 湖倉一體的實踐。在介紹第二個議題前他會先介紹我們的主框架,Hudi 和 Kylin 框架,然后再介紹他們三者之間是如何兩兩融合。最后再介紹T3是如何構建湖倉一體的。
二、Flink/Hudi/Kylin 介紹與融合
Hudi
首先來了解一下 Hudi 是什么。Hudi 最初是由 Uber 的工程師為了滿足他們的數據分析需求設計開發的一個數據湖框架。它於 2019 年 1 月份加入到 Apache 孵化器,並於 2020 年 5 月順利畢業,成為 Apache 的頂級項目。Hudi 的名字來源於 Hadoop Upserts Deletes and Incrementals 的縮寫。也就是說,Hudi 是一個支持插入、更新、刪除、以及增量處理的數據湖框架。除此之外,它還支持事務性 ACID 增量處理、存儲管理和時間管理。Hudi 能夠管理雲上超大規模上百 PB 的分析型數據集,對於所有的雲服務都開箱即用,非常的方便,而且已經在 Uber 內部穩定運行了接近 4 年。
下圖是 Hudi 的插件化架構。我們可以看到,Hudi 在存儲、數據處理引擎、表類型、索引類型、查詢視圖和查詢引擎方面都有比較寬松的支持。也就是說,他不與某一個組件綁定。
- 在存儲方面,Hudi 可以支持 HDFS,OSS 和 S3。
- 在數據處理引擎方面,Hudi 支持 Flink 和 Spark。Java 和 Python 客戶端已經在社區支持中。Hudi 支持兩種表,COW 和 MOR,這兩種表分別對應低延遲的查詢和快速攝入兩種場景。
- 在索引方面,Hudi 支持 Bloom 和 HBase 等 4 種索引類型。底層用了 Parquet 和 Avro 存儲數據,社區還正在做 ORC 格式的支持以及 SQL支持,相信不久的將來會跟大家見面。
Hudi 支持 3 種查詢,讀優化查詢,增量查詢和快照查詢。而在查詢引擎方面,有 Spark 、Presto、Hive 和 Impala,實際上一些其他的組件已經支持了。
下面詳細的介紹一下存儲模式和視圖。
- 第一個是 Copy On Write 模式,對應到 Hudi 的 COW 表。它是一種側重低延時的數據查詢場景的表,底層使用 Parquet 數據文件存儲數據,能夠支持快照查詢和增量查詢兩種查詢方式。在查詢引擎方面,大家可以看到上面有 5 個引擎,他們對快照查詢、增量查詢和讀優化 3 種視圖都有不同程度的支持。
- Merge On Read 表對 Copy On Write 有不同層面的互補,可以看到它側重於快速的數據攝入場景。使用 Parquet 文件來存儲具體的數據,使用行式 Avro 增量文件來存儲操作日志,類似於 HBase WAL。它支持 Hudi 所有 3 種視圖,可以看到 Hive,Spark SQL,Spark Datasource, Presto 和 Impala 對於讀優化查詢都是支持的。而 Hive, Spark SQL 只支持到了快照查詢。這種組件支持的信息大家以后可以到官網上查詢。
在出行業務中,訂單會有支付長尾的屬性。也就是說一個訂單開始之后,它的支付環節可能會拖的比較久。換言之,它可能會在這個用戶下一次出行前才進行支付(也或許會更久,甚至永遠不支付)。這種長尾屬性將會導致一個超長的業務閉環窗口,會導致我們無法准確預測數據的更新時機。如果存在多級更新的話,鏈路會比較長,更新成本也非常的高。
下圖是我們的長尾更新引發的冷數據頻繁更新示意圖。左側是業務庫,右側是有依賴關系的 3 張示意表。當業務庫有數據更新時,右側需要更新的數據可能已經歸檔到性能相對較差的設備上,增加數據更新成本。而且如果這次數據更新會引發長鏈路級聯更新的話,這種慢速的 I/O 還會被進一步放大。
數據的可靠性也是數據 ETL 中不可避免的問題。可能由於機器故障或者計算邏輯導致加工處理的數據失真或者完全不對,就會給運營的決策造成很大的影響。數字延遲性方面,在基於 Hive 構件的傳統架構中,由於 Hive 缺少索引機制,所以數據更新大都會導致數據分區重寫,且沒有辦法原地刪除。其次小文件問題會增加 NameNode 存儲和查詢的負擔,拖慢進程,在一定程度上增加數據延遲性。
Kylin 框架
我們再來介紹一下這個 Kylin 框架。相比較 Hudi,大家應該會對 Kylin 相對熟悉一些,它是一個開源的分布式分析型數據倉庫,能夠提供 Hadoop/Spark SQL 之上的數據查詢窗口。最初是由 eBay 開放並貢獻到開源社區,能夠在亞秒內查詢巨大的表。它的秘訣其實就是做預計算,針對一個星型拓撲結構數據立方體,預算多個維度組合的度量把結果寫出到輸出表,對外暴露查詢接口實現實時查詢,也就是用空間來換取存取時間。
Kylin 在今年的 9 月份發布了 4.0 alpha 版本,這是在 Kylin3 之后一個重大架構升級。使用 Parquet 代替 Hbase 存儲,從而提升了文件的掃描性能,也減輕甚至消除了 Hbase 的維護負擔。Kylin4 重新實現 Spark 構建引擎和查詢引擎,使得計算和存儲分離,也更加適用雲原生的技術趨勢。
Flink/Hudi/Kylin 框架之間的融合
伴隨 Kylin3.1 發布,Kylin 與 Flink 就融合已經完成。這個特性是在 2019 年完成的,Kylin 與 Flink 的集成開始於去年 1 月,通過 Flink Batch 實現。關於 Hudi 融合,可以說 Kylin 和 Hudi 天生就是兼容的,因為 Hudi 可以將自己暴露成一張 Hive 表,用戶可以像讀取 Hive 一樣使用 Hudi 的數據,這樣對Kylin會非常友好。因為 Kylin 可以把 Hudi 當成一張 Hive 表無縫使用數據。Hudi 和 Flink 融合這個特性是我今年對社區的主要貢獻。這個兩張截圖對應 Hudi 和 Flink 融合路上的2個里程碑式的PR。
- 第一個 Hudi client 支持多引擎,將 Hudi 與 Spark 解耦,讓 Hudi 支持多引擎成為可能。
- 第二個是 Flink 客戶端基本實現貢獻到社區,讓 Hudi 可以真正意義上寫入 Flink 數據表。這 2 個改動非常大,加在一起已經超過了 1 萬行的代碼,也可以說是今年 Hudi 社區比較亮眼的一個特性。
Hudi 和 Flink 的融合過程
下面來詳細介紹下 Hudi 和 Flink 融合過程。Hudi 原本只支持 Spark 引擎,所以第一步是將 Hudi 與 Spark 解耦之后再去集成我們想要的引擎。
解耦的難點在於 Hudi 最初沒有考慮多引擎的支持,所以從數據源讀取數據到最終將數據寫出到 Hudi 表,RDD 無處不在。連普通的工具類都會使用 RDD 作為基本的操作單元。與 Spark 解耦,我們評估到他的改動非常的大。其次是 Flink 與 Spark 核心抽象上的差異。Spark 認為數據是有限的數據集,而 Flink 認為數據是無界的,是一種數據流。這種抽象上的差異導致我們很難統一出一個通用的抽象。
這次改動對於 Hudi 來說是傷筋動骨的,因此我們決定要優先保證原版 Hudi 的功能和性能,當然也犧牲了部分 Flink Stream API。讓 Flink 來操作 list,而用Spark 操作 RDD。這樣就可以抽取一個泛型出來形成一個統一的抽象層。
抽象原則:
- 統一使用泛型 I、K、O 代替。
- 去 Spark 化,抽象層 API 都是引擎無關的,難以在抽象層實現的,我們會把它改為抽象方法下推到 Spark 子類實現。
- 不影響原版,抽象層盡量的減少改動,以保證固定的功能性。
- 引入 HoodieEngineContext 代替 JavaSparkContext, 提供運行時的上下文。
下面說 Flink Client DAG,這里主要分了 5 部分,
- 第一部分是 Kafka Streaming Source,主要用來接收Kafka數據並轉換成 List。
- 第二個是 InstantGeneratorOperator,一個 Flink 算子, 用來生成全局唯一的 instant。
- 第三是 KeyBy 分區操作,根據 partitionPath 分區避免多個子任務將數據寫入同一個分區造成沖突。
- 第四個是 WriteProcessOperator,這也是我們自定義的一個算子。這個算子是寫操作實際發生的地方。
- 第五個是 CommitSink,他會接受上游 WriteProcessOperator 發來的數據,根據上游數據判斷是否提交事務。
下面是 Flink 更新的代碼示例。左側是原版里面 HoodieWriteClient 簡化的版本,
可以看到 insert 函數的入參是 RDD,返回值也是 RDD。右側抽象之后的 abstract 可以看到它的入參變成了泛型I,返回值變成了 O,有興趣的話大家可以去了解一下。
下面是我們對 Flink 如何融合的另外一個想法,就是希望做出一個 streaming source,使用 Flink 構建一個完整的從 Hudi 表讀數據,再寫出到 Hudi 表的 ETL 管道。
然后是我們初步的設想。左側灰色的圖里面有 5 列的 Hudi 元數據。最左側是 hoodie_commit_time 事務列表。每一個 hoodie_commit_time 對應一個事務,每一個事務對應一批的數據。每一批數據中的每一條記錄都會有一個提交的序列號,就是第 2 列 hoodie_commit_seqno 序列號。hoodie_commit_time 和 hoodie_commit_seqno 這種映射關系跟 Kafka 中的分區和 offset 的這種映射關系非常類似。后期我們可能會基於這種特點實現一個 Hoodie Streaming Source。
基於這 3 個框架之間的融合關系,我們發現分別用於計算、分析、存儲的這 3 個引擎之間是相互兼容的。並且他們能夠支持湖倉一體,向雲原生體系靠攏。
三、T3 出行結構湖倉一體的實踐
最后我們來看一看 T3 出行是如何構建湖倉一體的。這是我們 T3 出行車聯網的架構,可以看到是從底向上,從基礎支持到上層不停的賦能,並與車企的信息系統、國家信息平台做交互。作為一家車聯網驅動的出行公司,我們收集到了人、車、路等相關的數據,每一種數據都有它自己的應用場景, 數據之間並不孤立,相互賦能,共同支持 T3 智慧出行。
這是我們的存儲和計算分離的數據庫架構,整個架構分為了兩層,一層是計算層,一層是存儲層。
- 計算層我們用到了 Flink、Spark、Kylin 和 Presto 並且搭配 ES 做任務調度。數據分析和展示方面用到了達芬奇和 Zeppelin。
- 在存儲層,我們使用了阿里雲 OSS 並搭配 HDFS 做數據存儲。數據格式方面使用 Hudi 作為主要的存儲格式,並配合 Parquet、ORC 和 Json 文件。在計算和存儲之前,我們加了一個 Alluxio 來加速提升數據處理性能。資源管理方面我用到了 Yarn,在后期時機成熟的時候也會轉向 K8s。
在當前存儲計算分離的趨勢下,我們也是以湖存儲為核心,在它周圍構建了湖加速湖計算、OLAP 分析、交互式查詢、可視化等等一整套的大數據生態體系。
T3對 Hudi 的應用場景
下面是我們 T3 內部對 Hudi 的幾個應用場景。
- 一個是近實時的流數據管道。我們可以從左側通過 Log、MySQL 或者直接讀取業務數據的 Kafka,把數據導入到數據管道中,再使用 Flink 或者原版的 DeltaStreamer 將流式數據輸入到列表中。
近實時的流式數據處理的 Flink UI 界面上可以看到之前介紹的 DAG 的幾個算子都在里面,比如 source、instant_generator 等。
- 另一個是近實時的數據分析場景。我們使用 Hive、Spark 或 Presto 查詢數據,並最終用達芬奇或者 Zeppelin 做最終的數據報表。
這是我們用 Hudi 構建的增量數據管道。最左側 CDC 數據捕獲之后要更新到后面的一系列的表。有了 Hudi 之后,因為 Hudi 支持索引和增量數據處理,我們只需要去更新需要更新的數據就可以了,不需要再像以前那樣去更新整個分區或者更新整個表。
- 最后的一個場景是將前面介紹的用 Flink 將線上或者業務數據訂閱 ETL 到 Hudi 表中供機器學習使用。但是機器學習是需要有數據基礎的,所以我們利用 Hudi 將線上的數據增量發布到線下環境,進行模型訓練或者調參。之后再將模型發布到線上為我們的業務提供服務。