貝殼基於 Flink 的實時計算演進之路


簡介: 貝殼找房在實時計算之路上的平台建設以及實時數倉應用。

摘要:貝殼找房大數據平台實時計算負責人劉力雲帶來的分享內容是貝殼找房的實時計算演進之路,內容如下:

  1. 發展歷程
  2. 平台建設
  3. 實時數倉及其應用場景
  4. 事件驅動場景
  5. 未來規划

 

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

 

一、發展歷程

首先是平台的發展歷程。最早是因為業務方在實時計算方面有比較多的業務場景,包括業務方自研的實時任務,需要自行開發、部署及維護,我們的大數據部門也會承接客戶大數據的實時開發需求。

這些看起來都是一些煙囪式的開發架構(即每個業務線之間由不同的開發團隊獨立建設,技術棧不同,互不聯系),缺乏統一的任務管控,也很難保留開發過程中積累的技術沉淀。因此,我們在 18 年時上線了基於 Spark Streaming 的實時計算平台,統一部署管理實時計算任務。之后我們又在此基礎上提供了任務開發功能 - 標准化的 SQL 語言(SQL 1.0),以提高數據開發效率。

image.png

隨着我們承接的任務越來越多,我們也發現了 Spark Streaming 的一些使用問題,主要是其 Checkpoint 是同步的,有時會造成比較大的延遲。此外,Kafka 消費的 Offset 數據存在 Checkpoint,很難做到任務細粒度的監控,比如消費狀態的獲取,於是我們開始轉向 Flink。

19 年,我們的平台開始支持 Flink 任務,並且很快提供了基於 Flink 1.8 的 SQL 2.0 功能,包括 DDL 定義和維表關聯。接下來,在 SQL 2.0 的基礎上,我們開始了實時數倉的建設。

今年初,在收集了業務方的需求場景后,我們認為在實時事件處理方面需求明確,而且目前的實現也存在較多的弊端,因此我們開始着手事件處理平台的開發。今年發布的 Flink 1.11 在 SQL 方面有很大的提升,我們在其基礎上正在開發一套統一的 SQL(3.0)。

image.png

目前平台支持的部門涵蓋了貝殼絕大部分的業務方,支持各種場景,包括人店相關的房源、客源、經紀人、風控以及運營等。

image.png

目前平台支持的項目有 30 多個。在 SQL2.0 后,平台上的任務數有明顯增長,達到 800 多個。由於貝殼所有的流量數據、用戶行為分析、以及數倉的建設都是通過平台來構建的,所以數據量很大,每天處理的消息達 2500 億條,單任務的消息吞吐量峰值達 3 百萬。

image.png

這是我們平台任務的增長情況,可以明顯看到 19 年 10 月 SQL 2.0 上線且支持實時數倉開發后,任務增長勢頭顯著。

二、平台建設

image.png

平台的功能概覽包括四個方面:

  • 支持任務托管的基本能力,包括任務的編輯發布、版本管理、監控報警等;
  • 支持多種語言的實時任務,包括對貝殼算法相關的 Python 實時任務的良好支持;
  • 根據業務場景不同,支持多種業務類型,如自定義任務、模板任務及場景任務(SQL 任務),內部通用配置化任務,如分流合並操作。目前 SQL 任務在平台占比較高,我們的目標是 80%;
  • 支持公共隊列(針對較數據量小的需求),對於數據量大的需求,要有穩定的資源保證,我們可以提供專有隊列,運行更為可靠。

image.png

平台的整體架構與其它公司的差不多。底層是計算和存儲層,計算支持 Flink 和 Spark,主要包括消息隊列和各種 OLAP 存儲,同時也支持 MySQL,Hive 也可以做到實時落地,維表支持 Redis,HBase 存儲。ClickHouse 是目前主要的實時 OLAP 存儲,由於 Doris 支持 update,同時對關聯查詢的支持也比較好,我們也在嘗試 Doris 存儲。

引擎層主要封裝的是 SQL 引擎、DataStream 的通用性操作。在事件處理方面,對 Flink 的 CEP,包括對其它普通規則也做了較好的封裝。

開發管理層提供了各種任務的開發、監控和資源管理。

平台之上,也是提供了對 ETL、BI、推薦、監控、風控等各種業務場景的支持。

image.png

這是平台任務生命周期的管理。可以看到,在啟動后會新建實例,從集群拿到運行狀態后會判斷是否正常運行。“是”則轉成運行中狀態。在運行過程中會對任務做延遲和心跳的監控;如果說任務發生了異常,並且在配置中設置了延遲或心跳時長的閾值,則會嘗試進行重啟。用戶可以在啟動任務時設置重啟次數,當超過該值時,則認為重啟失敗,將發送告警給任務負責人。

image.png

這是平台監控報警的架構。我們在 Spark 引入了 sdk 依賴,在用戶開發任務時用代碼顯示添加就可以監聽系統關心的指標。Flink 任務支持自定義 Reporter 的 metrics 的獲取。我們還支持 java agent 的依賴注入,通過依賴注入我們可以獲取實時任務的制定信息。在 Hermes 平台,我們可以拿到這些監控信息,來支持延時報警、心跳報警、及數據血緣基礎上的流量分析,后續的有狀態任務恢復也依賴這些監控指標。監控日志落入存儲(InfluxDB)之后可以進行可視化處理,方便的查看歷史運行狀態。

image.png

這是平台監控查看頁面,分別顯示了數據讀入、寫出、及延時的情況。

三、實時數倉

我們的實時數倉目前具備以下幾方面能力:首先是完善的元數據管理,包括連接管理和表管理;數倉開發人員共同構建了數據分層架構,包括 4 個分層:

  • 在實時側,分層越少越好,否則中間環節越多,出問題的概率越大;
  • 在 SQL 層面,支持標准的SQL語法,維表關聯,提供圖形化的SQL開發環境。另外還支持豐富的內置函數,並逐步完善支持用戶自定義函數(UDF)的開發;
  • 數據血緣方面,平台支持圖形化展示和完善的鏈路分析,而且能實時看到數據流的運行情況並對異常進行標示;
  • 最后是多源支持,對公司內部用到的各種存儲做到了較好的支持。

image.png

這是簡易的實時數倉架構圖,總體來說是屬於 Lambda 架構,包括實時流和離線流,以及離線流對實時流數據覆蓋的修復。從用戶行為日志、后端服務器日志及業務數據庫采集來的消息流,匯入並通過 ODS(Opertional Data Source)層再到 DW(Data Warehouse)層,我們支持 ODS 和 DW 層對維度進行擴充,關聯維表。

目前 DWD(Data Warehouse Detail)層的數據直接送入 ClickHouse,ClickHouse 現在是我們 OLAP 引擎的一個主力存儲。從 DWD 到 ClickHouse 的存儲只滿足了部分業務場景,還存在一些問題。比如我們需要做數據匯總,那么我們現在 DWS(Data Warehouse Service)層在這方面還稍微欠缺。目前明細數據進入了 ClickHouse,我們首先對那些應該匯總的數據存了明細,這樣會導致存儲量比較大,查詢效率較低。后續我們會考慮引入 Doris,因為它可以在實時計算側做實時聚合,依托 Doris 對 Update 的支持,就可以完善 DWS 功能。

image.png

這里展示的是我們的 SQL 編輯器。可以看到左邊是正在編輯的 SQL,我們支持 Flink 執行計划的查看、任務調試。右側一列可以定義源表、維表、輸出表。可以在自定義的數據源基礎上定義流表,並自動生產 DDL。同時,對於某些自動生成 DDL 難以支持的場景,用戶可以在左邊的編輯區域自行編寫 DDL。

image.png

任務調式分為手動和自動兩種方式。手動方式需准備樣例數據,拷貝到開發界面;自動方式則會從 SQL 任務的上游獲取樣例數據。元數據信息(kafka、HBase、ClickHouse 等)是動態獲得的,元信息和樣例共同生成的 DebugSQL 去調用 SQL 引擎的公共服務。SQL 引擎得到樣例數據后,比如,如果有關聯維表的操作,則會關聯線上維表,在 SQL 引擎中執行調試,將結果送給 UI 端進行展示。

image.png

這是一個完整的調試界面,可以看到左側是自動獲取的樣例數據,右側是下游的輸出。

image.png

根據元數據的定義及上報的指標等監控數據,我們可以生成一個實時數據血緣鏈路。圖中的箭頭展示了數據流轉的健康狀況,未來會對血緣鏈路上的數據監控做得更細致。數據血緣滿足了 4 個方面的需求:溯源分析、問題排查、數據差異分析、提升用戶體驗。在血緣鏈路上還可以進行比較復雜的異常預警,例如,數據源字段的變更對下游的影響。

image.png

這是我們 SQL2.0 引擎的大致架構,通過 Antlr4 擴展標准 SQL 的語法,從而支持 Flink 的各種源,維表和下游存儲表的定義。通過 SqljobParser 內置的 SqlStmtParser 生成 SqlContext,在邏輯計划(Logical Plan)中做解析。如果遇到維表,則經過一系列維表關聯的流程。上圖中下半部分是底層 API 架構。

image.png

這是平台 DDL 樣例。對於源表(Source),支持 Kafka,未來在新版本的 Flink 之上將可以支持更多種源。對於維表(Dim),支持 HBase、Redis、MySQL。數據存儲表(Sink)支持圖中所列五種。表格下面的是 DDL 定義的語法規則,右邊是一些表定義的樣例,分別是 Kafka 源表、維表和輸出表(輸出到控制台)。

image.png

再看我們的維表關聯,從 SQL 引擎結構可以看出,輸入的 SQL 進行解析,當有維表關聯時(包含 join 字段),我們會從語法層面做轉換。我們在表的層面定義了流和維關聯之后的表的形態,左下角是其生成過程。關聯維表、流維轉換、用異步 IO 獲取數據等過程不在這里細說。

image.png

隨着 Flink 社區新版本的發布,在 SQL 方面的支持越來越強,我們目前正在做基於 Flink1.11 的新版 SQL 引擎,也會將之前的 SQL 引擎統一。因為 Flink1.11 支持DDL,所以這部分我們不會再做,而是直接使用其新特性:

  • 解析模塊(Parse Model)將用戶原始的 SQL 解析成內部的執行計划,完全依賴於 Flink SQL。Connector Model 完成目前 Flink 尚未支持的 Connector 開發。
  • Format Model 實現數據源字段的序列化和反序列化。
  • 執行模塊(Execute Model)基於 Flink1.11 SQL API 執行解析后的執行計划。
  • UDF 模塊是專門處理 UDF 的解析,如參數調用的合法驗證、權限驗證、細致的數據權限限制。
  • SDK Model 是對外提供的標准化服務,如 SQL 文本開發的驗證,debug 功能等。

![image.png](https://ucc.alicdn.com/pic/developer-ecology/9542a09e686042e19564d3839194d526.png

這是實時數倉的一個落地場景:交易的實時大屏,也是我們第一個落地的典型業務場景。我們支持各種交易實時指標,用戶可以通過實時查詢 ClickHouse 得到交易數據的各種圖表展示。

image.png

客戶實時熱力圖是我們正在跟業務方溝通的一個需求場景,能實時獲取用戶線上的行為,使經紀人對客戶行為有一個比較全面的實時掌控,促進客戶維護的轉化率。另一方面,也使客戶更方便地了解房源熱度狀態,促使用戶做出購買決策。

四、事件驅動

image.png

先了解一下事件驅動型和數據分析型的區別:

  • 事件驅動是根據事件流中的事件實時觸發外部計算和外部狀態的更新,主要關注實時事件觸發的外部變化,重在單獨事件以及外部動作的觸發。
  • 數據分析型主要是從原始數據中提取有價值的信息,重在分析。

image.png

在我們跟業務方的溝通過程中,我們發現很多場景中他們希望實時獲取用戶的行為。比較典型的是風控場景,根據用戶線上的行為模式判斷其是否觸發風控規則。此外,我們的實時運營,根據用戶線上行為給用戶進行積分的增加及信息推送。搜索推薦也是我們非常關心的,即用戶在搜索之前的實時行為。綜合這些,我們提取出三方面問題:

  • 一是用戶行為事件缺乏統一的抽象和管理,開發效率低,周期長,各部門存在重復建設;
  • 二是規則邏輯與業務系統是耦合的,難以實現靈活的變化,對於復雜的規則或場景,業務方缺乏相關的技能和知識儲備,如對 CEP 的支持;
  • 第三是缺乏統一的下游動作觸發的配置。

基於以上三個痛點,我們構建了事件處理平台,抽象成三個模塊,事件管理,規則引擎和動作觸發。

image.png

這是事件處理平台所支持的業務場景。

image.png

這是事件處理平台的架構,總體來說就是管理模塊,引擎和動作觸發。在中間這里我們提供了一個適配層,可以跟第三方系統進行集成。

image.png

這是我們事件處理的操作流程,首先是創建數據源,與實時計算平台類似,主要支持 Kafka,在 Kafka 消息流上定義我們的數據格式。

image.png

在數據源基礎上創建事件流,事件流包含了同類事件,我們實現了一些算子,可以在數據源的基礎上做一些操作。從右側可以看到,在多個數據源上進行了一些過濾、加解密的操作,最終通過 union 算子匯總成一個統一格式的同類事件的事件流,方便后續使用。

image.png

在事件流的基礎上可以定義單個的事件,之后可以創建事件組,以對接我們的業務含義,即明確具體的業務是做什么的,如用戶的點擊、瀏覽、分享、關注等事件。創建事件組有兩種方式:

  • 一是本地方式,即可以根據事件的各個字段和維度設定條件;
  • 二是遠程方式,這與我們的埋點系統(用戶行為日志)直接連通,可以直接得到用戶事件的定義。

image.png

任務配置過程分幾個部分,這是 log 監控的任務樣例。上圖展示的是事件處理的規則設置部分。這是一個 CEP 事件,可以定義事件窗口,獲取具體事件,在此之上定義 CEP 的模式,還可以定義事件的輸出,例如需要輸出哪些字段。

image.png

這是觸發動作調用,支持消息發送,服務調用及落地 Kafka。截圖展示的是消息發送的樣例。

五、未來規划

image.png

這是我們實時計算的整體架構,下部是 Hermes 實時計算平台,主要包括任務管控、SQL 引擎、CEP 引擎等各種能力。Data Pipeline、實時數倉及事件處理平台的任務都是通過此平台進行管控。未來我們計划做的是用戶數據平台,如各業務方對用戶的線上行為的歷史查詢,以及在全平台用戶數據的綜合分析。

image.png

對未來的規划主要有以上幾個方向,包括狀態的管理及恢復、動態的資源分配(動態的配置、動態的資源調整)。為了保持任務的穩定性,我們在也計划在高可用性方面做一些調研。在流批一體方面,會借用數據湖的能力,提供對歷史和實時數據的混合查詢的支持。

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM