引言
近些年,企業對數據服務實時化服務的需求日益增多。本文整理了常見實時數據組件的性能特點和適用場景,介紹了美團如何通過 Flink 引擎構建實時數據倉庫,從而提供高效、穩健的實時數據服務。此前我們美團技術博客發布過一篇文章《流計算框架 Flink 與 Storm 的性能對比》,對 Flink 和 Storm 倆個引擎的計算性能進行了比較。本文主要闡述使用 Flink 在實際數據生產上的經驗。
實時平台初期架構
在實時數據系統建設初期,由於對實時數據的需求較少,形成不了完整的數據體系。我們采用的是“一路到底”的開發模式:通過在實時計算平台上部署 Storm 作業處理實時數據隊列來提取數據指標,直接推送到實時應用服務中。

但是,隨着產品和業務人員對實時數據需求的不斷增多,新的挑戰也隨之發生。
- 數據指標越來越多,“煙囪式”的開發導致代碼耦合問題嚴重。
- 需求越來越多,有的需要明細數據,有的需要 OLAP 分析。單一的開發模式難以應付多種需求。
- 缺少完善的監控系統,無法在對業務產生影響之前發現並修復問題。
實時數據倉庫的構建
為解決以上問題,我們根據生產離線數據的經驗,選擇使用分層設計方案來建設實時數據倉庫,其分層架構如下圖所示:

該方案由以下四層構成:
- ODS 層:Binlog 和流量日志以及各業務實時隊列。
- 數據明細層:業務領域整合提取事實數據,離線全量和實時變化數據構建實時維度數據。
- 數據匯總層:使用寬表模型對明細數據補充維度數據,對共性指標進行匯總。
- App 層:為了具體需求而構建的應用層,通過 RPC 框架對外提供服務。
通過多層設計我們可以將處理數據的流程沉淀在各層完成。比如在數據明細層統一完成數據的過濾、清洗、規范、脫敏流程;在數據匯總層加工共性的多維指標匯總數據。提高了代碼的復用率和整體生產效率。同時各層級處理的任務類型相似,可以采用統一的技術方案優化性能,使數倉技術架構更簡潔。
技術選型
1.存儲引擎的調研
實時數倉在設計中不同於離線數倉在各層級使用同種儲存方案,比如都存儲在 Hive 、DB 中的策略。首先對中間過程的表,采用將結構化的數據通過消息隊列存儲和高速 KV 存儲混合的方案。實時計算引擎可以通過監聽消息消費消息隊列內的數據,進行實時計算。而在高速 KV 存儲上的數據則可以用於快速關聯計算,比如維度數據。 其次在應用層上,針對數據使用特點配置存儲方案直接寫入。避免了離線數倉應用層同步數據流程帶來的處理延遲。 為了解決不同類型的實時數據需求,合理的設計各層級存儲方案,我們調研了美團內部使用比較廣泛的幾種存儲方案。
方案 | 優勢 | 劣勢 |
---|---|---|
MySQL | 1. 具有完備的事務功能,可以對數據進行更新。2. 支持 SQL,開發成本低。 | 1. 橫向擴展成本大,存儲容易成為瓶頸; 2. 實時數據的更新和查詢頻率都很高,線上單個實時應用請求就有 1000+ QPS;使用 MySQL 成本太高。 |
Elasticsearch | 1. 吞吐量大,單個機器可以支持 2500+ QPS,並且集群可以快速橫向擴展。2. Term 查詢時響應速度很快,單個機器在 2000+ QPS時,查詢延遲在 20 ms以內。 | 1. 沒有原生的 SQL 支持,查詢 DSL 有一定的學習門檻;2. 進行聚合運算時性能下降明顯。 |
Druid | 1. 支持超大數據量,通過 Kafka 獲取實時數據時,單個作業可支持 6W+ QPS;2. 可以在數據導入時通過預計算對數據進行匯總,減少的數據存儲。提高了實際處理數據的效率;3. 有很多開源 OLAP 分析框架。實現如 Superset。 | 1. 預聚合導致無法支持明細的查詢;2. 無法支持 Join 操作;3. Append-only 不支持數據的修改。只能以 Segment 為單位進行替換。 |
Cellar | 1. 支持超大數據量,采用內存加分布式存儲的架構,存儲性價比很高;2. 吞吐性能好,經測試處理 3W+ QPS 讀寫請求時,平均延遲在 1ms左右;通過異步讀寫線上最高支持 10W+ QPS。 | 1. 接口僅支持 KV,Map,List 以及原子加減等;2. 單個 Key 值不得超過 1KB ,而 Value 的值超過 100KB 時則性能下降明顯。 |
根據不同業務場景,實時數倉各個模型層次使用的存儲方案大致如下:

- 數據明細層 對於維度數據部分場景下關聯的頻率可達 10w+ TPS,我們選擇 Cellar(美團內部存儲系統) 作為存儲,封裝維度服務為實時數倉提供維度數據。
- 數據匯總層 對於通用的匯總指標,需要進行歷史數據關聯的數據,采用和維度數據一樣的方案通過 Cellar 作為存儲,用服務的方式進行關聯操作。
- 數據應用層 應用層設計相對復雜,再對比了幾種不同存儲方案后。我們制定了以數據讀寫頻率 1000 QPS 為分界的判斷依據。對於讀寫平均頻率高於 1000 QPS 但查詢不太復雜的實時應用,比如商戶實時的經營數據。采用 Cellar 為存儲,提供實時數據服務。對於一些查詢復雜的和需要明細列表的應用,使用 Elasticsearch 作為存儲則更為合適。而一些查詢頻率低,比如一些內部運營的數據。 Druid 通過實時處理消息構建索引,並通過預聚合可以快速的提供實時數據 OLAP 分析功能。對於一些歷史版本的數據產品進行實時化改造時,也可以使用 MySQL 存儲便於產品迭代。
2.計算引擎的調研
在實時平台建設初期我們使用 Storm 引擎來進行實時數據處理。Storm 引擎雖然在靈活性和性能上都表現不錯。但是由於 API 過於底層,在數據開發過程中需要對一些常用的數據操作進行功能實現。比如表關聯、聚合等,產生了很多額外的開發工作,不僅引入了很多外部依賴比如緩存,而且實際使用時性能也不是很理想。同時 Storm 內的數據對象 Tuple 支持的功能也很簡單,通常需要將其轉換為 Java 對象來處理。對於這種基於代碼定義的數據模型,通常我們只能通過文檔來進行維護。不僅需要額外的維護工作,同時在增改字段時也很麻煩。綜合來看使用 Storm 引擎構建實時數倉難度較大。我們需要一個新的實時處理方案,要能夠實現:
- 提供高級 API,支持常見的數據操作比如關聯聚合,最好是能支持 SQL。
- 具有狀態管理和自動支持久化方案,減少對存儲的依賴。
- 便於接入元數據服務,避免通過代碼管理數據結構。
- 處理性能至少要和 Storm 一致。
我們對主要的實時計算引擎進行了技術調研。總結了各類引擎特性如下表所示:
項目/引擎 | Storm | Flink | spark-treaming |
---|---|---|---|
API | 靈活的底層 API 和具有事務保證的 Trident API | 流 API 和更加適合數據開發的 Table API 和 Flink SQL 支持 | 流 API 和 Structured-Streaming API 同時也可以使用更適合數據開發的 Spark SQL |
容錯機制 | ACK 機制 | State 分布式快照保存點 | RDD 保存點 |
狀態管理 | Trident State狀態管理 | Key State 和 Operator State兩種 State 可以使用,支持多種持久化方案 | 有 UpdateStateByKey 等 API 進行帶狀態的變更,支持多種持久化方案 |
處理模式 | 單條流式處理 | 單條流式處理 | Mic batch處理 |
延遲 | 毫秒級 | 毫秒級 | 秒級 |
語義保障 | At Least Once,Exactly Once | Exactly Once,At Least Once | At Least Once |
從調研結果來看,Flink 和 Spark Streaming 的 API 、容錯機制與狀態持久化機制都可以解決一部分我們目前使用 Storm 中遇到的問題。但 Flink 在數據延遲上和 Storm 更接近,對現有應用影響最小。而且在公司內部的測試中 Flink 的吞吐性能對比 Storm 有十倍左右提升。綜合考量我們選定 Flink 引擎作為實時數倉的開發引擎。
更加引起我們注意的是,Flink 的 Table 抽象和 SQL 支持。雖然使用 Strom 引擎也可以處理結構化數據。但畢竟依舊是基於消息的處理 API ,在代碼層層面上不能完全享受操作結構化數據的便利。而 Flink 不僅支持了大量常用的 SQL 語句,基本覆蓋了我們的開發場景。而且 Flink 的 Table 可以通過 TableSchema 進行管理,支持豐富的數據類型和數據結構以及數據源。可以很容易的和現有的元數據管理系統或配置管理系統結合。通過下圖我們可以清晰的看出 Storm 和 Flink 在開發統過程中的區別。

在使用 Storm 開發時處理邏輯與實現需要固化在 Bolt 的代碼。Flink 則可以通過 SQL 進行開發,代碼可讀性更高,邏輯的實現由開源框架來保證可靠高效,對特定場景的優化只要修改 Flink SQL 優化器功能實現即可,而不影響邏輯代碼。使我們可以把更多的精力放到到數據開發中,而不是邏輯的實現。當需要離線數據和實時數據口徑統一的場景時,我們只需對離線口徑的 SQL 腳本稍加改造即可,極大地提高了開發效率。同時對比圖中 Flink 和 Storm 使用的數據模型,Storm 需要通過一個 Java 的 Class 去定義數據結構,Flink Table 則可以通過元數據來定義。可以很好的和數據開發中的元數據,數據治理等系統結合,提高開發效率。
Flink使用心得
在利用 Flink-Table 構建實時數據倉庫過程中。我們針對一些構建數據倉庫的常用操作,比如數據指標的維度擴充,數據按主題關聯,以及數據的聚合運算通過 Flink 來實現總結了一些使用心得。
1.維度擴充
數據指標的維度擴充,我們采用的是通過維度服務獲取維度信息。雖然基於 Cellar 的維度服務通常的響應延遲可以在 1ms 以下。但是為了進一步優化 Flink 的吞吐,我們對維度數據的關聯全部采用了異步接口訪問的方式,避免了使用 RPC 調用影響數據吞吐。
對於一些數據量很大的流,比如流量日志數據量在 10W 條/秒這個量級。在關聯 UDF 的時候內置了緩存機制,可以根據命中率和時間對緩存進行淘汰,配合用關聯的 Key 值進行分區,顯著減少了對外部服務的請求次數,有效的減少了處理延遲和對外部系統的壓力。
2.數據關聯
數據主題合並,本質上就是多個數據源的關聯,簡單的來說就是 Join 操作。Flink 的 Table 是建立在無限流這個概念上的。在進行 Join 操作時並不能像離線數據一樣對兩個完整的表進行關聯。采用的是在窗口時間內對數據進行關聯的方案,相當於從兩個數據流中各自截取一段時間的數據進行 Join 操作。有點類似於離線數據通過限制分區來進行關聯。同時需要注意 Flink 關聯表時必須有至少一個“等於”關聯條件,因為等號兩邊的值會用來分組。
由於 Flink 會緩存窗口內的全部數據來進行關聯,緩存的數據量和關聯的窗口大小成正比。因此 Flink 的關聯查詢,更適合處理一些可以通過業務規則限制關聯數據時間范圍的場景。比如關聯下單用戶購買之前 30 分鍾內的瀏覽日志。過大的窗口不僅會消耗更多的內存,同時會產生更大的 Checkpoint ,導致吞吐下降或 Checkpoint 超時。在實際生產中可以使用 RocksDB 和啟用增量保存點模式,減少 Checkpoint 過程對吞吐產生影響。對於一些需要關聯窗口期很長的場景,比如關聯的數據可能是幾天以前的數據。對於這些歷史數據,我們可以將其理解為是一種已經固定不變的"維度"。可以將需要被關聯的歷史數據采用和維度數據一致的處理方法:"緩存 + 離線"數據方式存儲,用接口的方式進行關聯。另外需要注意 Flink 對多表關聯是直接順序鏈接的,因此需要注意先進行結果集小的關聯。
3.聚合運算
使用聚合運算時,Flink 對常見的聚合運算如求和、極值、均值等都有支持。美中不足的是對於 Distinct 的支持,Flink-1.6 之前的采用的方案是通過先對去重字段進行分組再聚合實現。對於需要對多個字段去重聚合的場景,只能分別計算再進行關聯處理效率很低。為此我們開發了自定義的 UDAF,實現了 MapView 精確去重、BloomFilter 非精確去重、 HyperLogLog 超低內存去重方案應對各種實時去重場景。但是在使用自定義的 UDAF 時,需要注意 RocksDBStateBackend 模式對於較大的 Key 進行更新操作時序列化和反序列化耗時很多。可以考慮使用 FsStateBackend 模式替代。另外要注意的一點 Flink 框架在計算比如 Rank 這樣的分析函數時,需要緩存每個分組窗口下的全部數據才能進行排序,會消耗大量內存。建議在這種場景下優先轉換為 TopN 的邏輯,看是否可以解決需求。
下圖展示一個完整的使用 Flink 引擎生產一張實時數據表的過程:

實時數倉成果
通過使用實時數倉代替原有流程,我們將數據生產中的各個流程抽象到實時數倉的各層當中。實現了全部實時數據應用的數據源統一,保證了應用數據指標、維度的口徑的一致。在幾次數據口徑發生修改的場景中,我們通過對倉庫明細和匯總進行改造,在完全不用修改應用代碼的情況下就完成全部應用的口徑切換。在開發過程中通過嚴格的把控數據分層、主題域划分、內容組織標准規范和命名規則。使數據開發的鏈路更為清晰,減少了代碼的耦合。再配合上使用 Flink SQL 進行開發,代碼加簡潔。單個作業的代碼量從平均 300+ 行的 JAVA 代碼 ,縮減到幾十行的 SQL 腳本。項目的開發時長也大幅減短,一人日開發多個實時數據指標情況也不少見。
除此以外我們通過針對數倉各層級工作內容的不同特點,可以進行針對性的性能優化和參數配置。比如 ODS 層主要進行數據的解析、過濾等操作,不需要 RPC 調用和聚合運算。 我們針對數據解析過程進行優化,減少不必要的 JSON 字段解析,並使用更高效的 JSON 包。在資源分配上,單個 CPU 只配置 1GB 的內存即可滿需求。而匯總層主要則主要進行聚合與關聯運算,可以通過優化聚合算法、內外存共同運算來提高性能、減少成本。資源配置上也會分配更多的內存,避免內存溢出。通過這些優化手段,雖然相比原有流程實時數倉的生產鏈路更長,但數據延遲並沒有明顯增加。同時實時數據應用所使用的計算資源也有明顯減少。
展望
我們的目標是將實時倉庫建設成可以和離線倉庫數據准確性,一致性媲美的數據系統。為商家,業務人員以及美團用戶提供及時可靠的數據服務。同時作為到餐實時數據的統一出口,為集團其他業務部門助力。未來我們將更加關注在數據可靠性和實時數據指標管理。建立完善的數據監控,數據血緣檢測,交叉檢查機制。及時對異常數據或數據延遲進行監控和預警。同時優化開發流程,降低開發實時數據學習成本。讓更多有實時數據需求的人,可以自己動手解決問題。
參考文獻
關於作者
偉倫,美團到店餐飲技術部實時數據負責人,2017年加入美團,長期從事數據平台、實時數據計算、數據架構方面的開發工作。在使用 Flink 進行實時數據生產和提高生產效率上,有一些心得和產出。同時也積極推廣 Flink 在實時數據處理中的實戰經驗。
招聘信息
對數據工程和將數據通過服務業務釋放價值感興趣的同學,可以發送簡歷到 huangweilun@meituan.com。我們在實時數據倉庫、實時數據治理、實時數據產品開發框架、面向銷售和商家側的數據型創新產品層面,都有很多未知但有意義的領域等你來開拓。
發現文章有錯誤、對內容有疑問,都可以關注美團技術團隊微信公眾號(meituantech),在后台給我們留言。我們每周會挑選出一位熱心小伙伴,送上一份精美的小禮品。快來掃碼關注我們吧!