簡介: 一文了解快手基於 Flink 構建的實時數倉架構,以及一些難題的解決方案。
本文整理自快手數據技術專家李天朔在 5 月 22 日北京站 Flink Meetup 分享的議題《快手基於 Flink 構建實時數倉場景化實踐》,內容包括:
- 快手實時計算場景
- 快手實時數倉架構及保障措施
- 快手場景問題及解決方案
- 未來規划
一、快手實時計算場景
快手業務中的實時計算場景主要分為四塊:
- 公司級別的核心數據:包括公司經營大盤,實時核心日報,以及移動版數據。相當於團隊會有公司的大盤指標,以及各個業務線,比如視頻相關、直播相關,都會有一個核心的實時看板;
- 大型活動實時指標:其中最核心的內容是實時大屏。例如快手的春晚活動,我們會有一個總體的大屏去看總體活動現狀。一個大型的活動會分為 N 個不同的模塊,我們對每一個模塊不同的玩法會有不同的實時數據看板;
- 運營部分的數據:運營數據主要包括兩方面,一個是創作者,另一個是內容。對於創作者和內容,在運營側,比如上線一個大 V 的活動,我們想看到一些信息如直播間的實時現狀,以及直播間對於大盤的牽引情況。基於這個場景,我們會做各種實時大屏的多維數據,以及大盤的一些數據。
此外,這塊還包括運營策略的支撐,比如我們可能會實時發掘一些熱點內容和熱點創作者,以及目前的一些熱點情況。我們基於這些熱點情況輸出策略,這個也是我們需要提供的一些支撐能力;
最后還包括 C 端數據展示,比如現在快手里有創作者中心和主播中心,這里會有一些如主播關播的關播頁,關播頁的實時數據有一部分也是我們做的。
- 實時特征:包含搜索推薦特征和廣告實時特征。
二、快手實時數倉架構及保障措施
1. 目標及難點
1.1 目標
- 首先由於我們是做數倉的,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的數據差異在 1% 以內,這是最低標准。
- 其次是數據延遲,其 SLA 標准是活動期間所有核心報表場景的數據延遲不能超過 5 分鍾,這 5 分鍾包括作業掛掉之后和恢復時間,如果超過則意味着 SLA 不達標。
- 最后是穩定性,針對一些場景,比如作業重啟后,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的異常。
1.2 難點
- 第一個難點是數據量大。每天整體的入口流量數據量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。
- 第二個難點是組件依賴比較復雜。可能這條鏈路里有的依賴於 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲、RPC 接口、OLAP 引擎等,我們需要思考在這條鏈路里如何分布,才能讓這些組件都能正常工作。
- 第三個難點是鏈路復雜。目前我們有 200+ 核心業務作業,50+ 核心數據源,整體作業超過 1000。
2. 實時數倉 - 分層模型
基於上面三個難點,來看一下數倉架構:
如上所示:
- 最下層有三個不同的數據源,分別是客戶端日志、服務端日志以及 Binlog 日志;
-
在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細數據,另一個是 DWS 層,做公共聚合數據,DIM 是我們常說的維度。我們有一個基於離線數倉的主題預分層,這個主題預分層可能包括流量、用戶、設備、視頻的生產消費、風控、社交等。
- DWD 層的核心工作是標准化的清洗;
- DWS 層是把維度的數據和 DWD 層進行關聯,關聯之后生成一些通用粒度的聚合層次。
- 再往上是應用層,包括一些大盤的數據,多維分析的模型以及業務專題數據;
- 最上面是場景。
整體過程可以分為三步:
- 第一步是做業務數據化,相當於把業務的數據接進來;
- 第二步是數據資產化,意思是對數據做很多的清洗,然后形成一些規則有序的數據;
- 第三步是數據業務化,可以理解數據在實時數據層面可以反哺業務,為業務數據價值建設提供一些賦能。
3. 實時數倉 - 保障措施
基於上面的分層模型,來看一下整體的保障措施:
保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障。
-
我們先看藍色部分的質量保障。針對質量保障,可以看到在數據源階段,做了如數據源的亂序監控,這是我們基於自己的 SDK 的采集做的,以及數據源和離線的一致性校准。研發階段的計算過程有三個階段,分別是研發階段、上線階段和服務階段。
- 研發階段可能會提供一個標准化的模型,基於這個模型會有一些 Benchmark,並且做離線的比對驗證,保證質量是一致的;
- 上線階段更多的是服務監控和指標監控;
- 在服務階段,如果出現一些異常情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體數據修復。
-
第二個是時效性保障。針對數據源,我們把數據源的延遲情況也納入監控。在研發階段其實還有兩個事情:
- 首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;
- 通過壓測之后,會有一些任務上線和重啟性能評估,相當於按照 CP 恢復之后,重啟的性能是什么樣子。
-
最后一個是穩定保障,這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基於之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況。之后我們會有兩種不同的標准,一種是冷備雙機房,另外一種是熱備雙機房。
- 冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;
- 熱備雙機房:相當於同樣一份邏輯在兩個機房各部署一次。
以上就是我們整體的保障措施。
三、快手場景問題及解決方案
1. PV/UV 標准化
1.1 場景
第一個問題是 PV/UV 標准化,這里有三個截圖:
第一張圖是春晚活動的預熱場景,相當於是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖。
在活動進行過程中,我們發現 60~70% 的需求是計算頁面里的信息,如:
- 這個頁面來了多少人,或者有多少人點擊進入這個頁面;
- 活動一共來了多少人;
- 頁面里的某一個掛件,獲得了多少點擊、產生了多少曝光。
1.2 方案
抽象一下這個場景就是下面這種 SQL:
簡單來說,就是從一張表做篩選條件,然后按照維度層面做聚合,接着產生一些 Count 或者 Sum 操作。
基於這種場景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機制,從 Source 數據源取數據,之后做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之后會有一個叫做 Local Window Agg 的東西,相當於數據分完桶之后把相同類型的數據相加。Local Window Agg 之后再按照維度進行 Global Window Agg 的合桶,合桶的概念相當於按照維度計算出最終的結果。Early Fire 機制相當於在 Local Window Agg 開一個天級的窗口,然后每分鍾去對外輸出一次。
這個過程中我們遇到了一些問題,如上圖左下角所示。
在代碼正常運行的情況下是沒有問題的,但如果整體數據存在延遲或者追溯歷史數據的情況,比如一分鍾 Early Fire 一次,因為追溯歷史的時候數據量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的數據,而 14:01 的那個點就被丟掉了,丟掉了以后會發生什么?
在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史數據的結果。橫坐標是分鍾,縱坐標是截止到當前時刻的頁面 UV,我們發現有些點是橫着的,意味着沒有數據結果,然后一個陡增,然后又橫着的,接着又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。
為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。
數據開一個大的天級窗口,大窗口下又開了一個小的分鍾級窗口,數據按數據本身的 Row Time 落到分鍾級窗口。
- Watermark 推進過了窗口的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,數據本身落在真實的窗口, Watermark 推進,在窗口結束后觸發。
- 此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序數據本身是一個不丟棄的狀態,會記錄到最新的累計數據。
- 最后是語義一致性,它會基於事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的。
以上是 PV/UV 一個標准化的解決方案。
2. DAU 計算
2.1 背景介紹
下面介紹一下 DAU 計算:
我們對於整個大盤的活躍設備、新增設備和回流設備有比較多的監控。
- 活躍設備指的是當天來過的設備;
- 新增設備指的是當天來過且歷史沒有來過的設備;
- 回流設備指的是當天來過且 N 天內沒有來過的設備。
但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標。
我們看一下離線過程中,邏輯應該怎么算。
首先我們先算活躍設備,把這些合並到一起,然后做一個維度下的天級別去重,接着再去關聯維度表,這個維度表包括設備的首末次時間,就是截止到昨天設備首次訪問和末次訪問的時間。
得到這個信息之后,我們就可以進行邏輯計算,然后我們會發現新增和回流的設備其實是活躍設備里打的一個子標簽。新增設備就是做了一個邏輯處理,回流設備是做了 30 天的邏輯處理,基於這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?
其實我們最開始是這么做的,但遇到了一些問題:
- 第一個問題是:數據源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的過程之中都要改,單作業的穩定性會非常差;
- 第二個問題是:數據量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 存儲,任何一個這樣的 RPC 服務接口,都不可能在萬億級數據量的場景下保證服務穩定性;
- 第三個問題是:我們對於時延要求比較高,要求時延小於一分鍾。整個鏈路要避免批處理,如果出現了一些任務性能的單點問題,我們還要保證高性能和可擴容。
2.2 技術方案
針對以上問題,介紹一下我們是怎么做的:
如上圖的例子,第一步是對 A B C 這三個數據源,先按照維度和 DID 做分鍾級別去重,分別去重之后得到三個分鍾級別去重的數據源,接着把它們 Union 到一起,然后再進行同樣的邏輯操作。
這相當於我們數據源的入口從萬億變到了百億的級別,分鍾級別去重之后再進行一個天級別的去重,產生的數據源就可以從百億變成了幾十億的級別。
在幾十億級別數據量的情況下,我們再去關聯數據服務化,這就是一種比較可行的狀態,相當於去關聯用戶畫像的 RPC 接口,得到 RPC 接口之后,最終寫入到了目標 Topic。這個目標 Topic 會導入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等。
這個方案有三個方面的優勢,分別是穩定性、時效性和准確性。
- 首先是穩定性。松耦合可以簡單理解為當數據源 A 的邏輯和數據源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現了如流量問題,不會影響后面的部分,所以它擴容比較簡單,除此之外還有服務化后置和狀態可控。
- 其次是時效性,我們做到毫秒延遲,並且維度豐富,整體上有 20+ 的維度做多維聚合。
- 最后是准確性,我們支持數據驗證、實時監控、模型出口統一等。
此時我們遇到了另外一個問題 - 亂序。對於上方三個不同的作業,每一個作業重啟至少會有兩分鍾左右的延遲,延遲會導致下游的數據源 Union 到一起就會有亂序。
2.3 延遲計算方案
遇到上面這種有亂序的情況下,我們要怎么處理?
我們總共有三種處理方案:
- 第一種解決方案是用 “did + 維度 + 分鍾” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出。
這個解決方案存在一些問題,因為我們按分鍾存,存 20 分鍾的狀態大小是存 10 分鍾的兩倍,到后面這個狀態大小有點不太可控,因此我們又換了解決方案 2。
- 第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在數據源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。
04:01 來了一條數據,進行結果輸出。04:02 來了一條數據,如果是同一個 did,那么它會更新時間戳,然后仍然做結果輸出。04:04 也是同樣的邏輯,然后將時間戳更新到 04:04,如果后面來了一條 04:01 的數據,它發現時間戳已經更新到 04:04,它會丟棄這條數據。
這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由於我們不好解決這個問題,因此我們又想出了解決方案 3。
- 方案 3 是在方案 2 時間戳的基礎之上,加了一個類似於環形緩沖區,在緩沖區之內允許亂序。
比如 04:01 來了一條數據,進行結果輸出;04:02 來了一條數據,它會把時間戳更新到 04:02,並且會記錄同一個設備在 04:01 也來過。如果 04:04 再來了一條數據,就按照相應的時間差做一個位移,最后通過這樣的邏輯去保障它能夠容忍一定的亂序。
綜合來看這三個方案:
- 方案 1 在容忍 16 分鍾亂序的情況下,單作業的狀態大小在 480G 左右。這種情況雖然保證了准確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;
- 方案 2 是 30G 左右的狀態大小,對於亂序 0 容忍,但是數據不准確,由於我們對准確性的要求非常高,因此也放棄了這個方案;
- 方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鍾,我們正常更新一個作業的話,10 分鍾完全足夠重啟,因此最終選擇了方案 3。
3. 運營場景
3.1 背景介紹
運營場景可分為四個部分:
- 第一個是數據大屏支持,包括單直播間的分析數據和大盤的分析數據,需要做到分鍾級延遲,更新要求比較高;
- 第二個是直播看板支持,直播看板的數據會有特定維度的分析,特定人群支持,對維度豐富性要求比較高;
- 第三個是數據策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的數據,更新要求比較低;
- 第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定。
下面進行分析這 4 種不同的狀態產生的一些不同的場景。
前 3 種基本沒有什么差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景。
針對第 3 種和第 4 種,它對於更新的要求比較低,對於吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高。
3.2 技術方案
針對上方 4 種不同的場景,我們是如何去做的?
- 首先看一下基礎明細層 (圖中左方),數據源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費信息,還有觀看 / 點贊 / 評論。經過一輪基礎清洗,然后做維度管理。上游的這些維度信息來源於 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 存儲里邊,包括一些用戶的維度。
這些維度關聯了之后,最終寫入 Kafka 的 DWD 事實層,這里為了做性能的提升,我們做了二級緩存的操作。
- 如圖中上方,我們讀取 DWD 層的數據然后做基礎匯總,核心是窗口維度聚合生成 4 種不同粒度的數據,分別是大盤多維匯總 topic、直播間多維匯總 topic、作者多維匯總 topic、用戶多維匯總 topic,這些都是通用維度的數據。
- 如圖中下方,基於這些通用維度數據,我們再去加工個性化維度的數據,也就是 ADS 層。拿到了這些數據之后會有維度擴展,包括內容擴展和運營維度的拓展,然后再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。
分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發現任務經常出問題,並且分不清楚哪個任務的職責是什么,構建不出這樣的一個穩定層。
- 如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景。
四、未來規划
上文一共講了三個場景,第一個場景是標准化 PU/UV 的計算,第二個場景是 DAU 整體的解決方案,第三個場景是運營側如何解決。基於這些內容,我們有一些未來規划,分為 4 個部分。
-
第一部分是實時保障體系完善:
- 一方面做一些大型的活動,包括春晚活動以及后續常態化的活動。針對這些活動如何去保障,我們有一套規范去做平台化的建設;
- 第二個是分級保障標准制定,哪些作業是什么樣的保障級別 / 標准,會有一個標准化的說明;
- 第三個是引擎平台能力推動解決,包括 Flink 任務的一些引擎,在這上面我們會有一個平台,基於這個平台去做規范、標准化的推動。
-
第二部分是實時數倉內容構建:
- 一方面是場景化方案的輸出,比如針對活動會有一些通用化的方案,而不是每次活動都開發一套新的解決方案;
- 另一方面是內容數據層次沉淀,比如現在的數據內容建設,在厚度方面有一些場景的缺失,包括內容如何更好地服務於上游的場景。
- 第三部分是 Flink SQL 場景化構建,包括 SQL 持續推廣、SQL 任務穩定性和 SQL 任務資源利用率。我們在預估資源的過程中,會考慮比如在什么樣 QPS 的場景下, SQL 用什么樣的解決方案,能支撐到什么情況。Flink SQL 可以大幅減少人效,但是在這個過程中,我們想讓業務操作更加簡單。
- 第四部分是批流一體探索。實時數倉的場景其實就是做離線 ETL 計算加速,我們會有很多小時級別的任務,針對這些任務,每次批處理的時候有一些邏輯可以放到流處理去解決,這對於離線數倉 SLA 體系的提升十分巨大。
本文為阿里雲原創內容,未經允許不得轉載。