一、簡介
- 一般業務訴求:在第一時間拿到經過加工后的數據,以便實時監控當前業務狀態並作出運營決策,引導業務往好的方向發展。
- 按照數據的延時情況,數據時效性一般分為三種(離線、准實時、實時):
- 離線:在今天(T)處理 N 天前(T - N ≥ 1)的數據,延遲時間粒度為天;
- 准實時:在當前小時(H)處理 N 小時前(H - N,N > 0,如 0.5 小時、1 小時等)的數據,延遲時間粒度為小時;
- 實時:在當前時刻處理當前的數據,延遲時間粒度為秒;
- 離線和准實時數據可以在批處理系統中實現(如 Hadoop、MaxCompute、Spark 等系統),只是調度周期不一樣而已;而實時數據則需要在流式處理系統中完成;
- 流式數據處理技術:業務系統每產生一條數據,就會立刻被采集並實時發送到流式任務中進行處理,不需要定時調度任務來處理數據;
-
流式數據處理特質:
- 時效性高:
- 數據實時采集、實時處理,延時粒度在秒級甚至毫秒級,業務方面能夠在第一時間拿到經過加工處理后的數據。
- 常駐任務:
- 區別於離線任務的周期調度,流式任務屬於常駐進程任務,一旦啟動后就會一直運行,直到人為的終止,因此即使成本會相對較高。
- 這一特點也預示着流式任務的數據源是無界的,離線任務的數據源是有界的。
- 是實時處理和離線處理最主要的差別。
- 性能要求高
- 實時計算對數據處理的性能要求非常嚴格,如果處理吞吐量跟不上采集吞吐量,計算出來的數據就失去了實時的特性,因此,實時處理的性能優化占了任務開發的很大一部分工作。
- 應用局限性
# 計算成本較大,對於業務復雜的場景(如雙流關聯或者需要數據回滾的情況),其局限性導致支持不足;
# 由於數據源是流式的,在數據具體有上下文關系的情況下,數據到達時間的不確定性導致實時處理跟離線處理得出的結構有一定的差異;
二、流式技術架構
- 在流式計算技術中,需要各個子系統之間相互依賴形成一條數據處理鏈路,才能產出結構對外提供實時數據服務。
- 在實際技術選型時,可選的開源技術方案非常多,但是各個方案的整體架構是類似的,只是各個子系統的實現原理不太一樣。
- 流式技術架構中的系統跟離線處理是交叉的,兩套技術方案並不是完全獨立的,並且在業界中有合並的趨勢;
-
各個子系統按功能划分:
- 數據采集:數據的源頭,一般來自於各個業務的日志服務器,這些數據被實時采集到數據中間件中,供下游實時訂閱使用;
- 數據處理:數據被采集到的中間件中后,需要下游實時訂閱數據,並拉取到流式計算系統的任務中進行加工處理。(這里需要提供流式計算引擎以支持流式任務的執行)
- 數據存儲:數據被實時加工處理(比如聚合、清洗等)后,會寫到某個在線服務的存儲系統中,供下游調用方使用。(這里的寫操作是增量操作,並且是源源不斷的)
- 數據服務:在存儲系統上會架設一層統一的數據服務層(比如提供 HSF 接口、HTTP 服務等),用於獲取實時計算結果。
- 從六十技術架構圖來看,在數據采集和數據服務部分實時和離線是公用的,因為在這兩層中都不需要關系數據的時效性。這樣才能做到數據源的統一,避免六十處理和離線處理的不一致。
1、數據采集
- 數據采集是整個數據處理鏈路的源頭,是所有數據處理的根節點,需要做到實時采集。
- 所采集的數據都來自於業務服務器,可以分為兩種采集數據類型:
- 數據庫變更日志。(比如 MySQL 的 binlog 日志、HBase 的 hlog 日志、OceanBase 的變更日志、Oracle 的變更日志等)
- 引擎訪問日志。(比如用戶訪問網站產生的 Apache 引擎日志、搜索引擎的接口查詢日志等)
- 不管是數據庫變更日志還是引擎訪問日志,都會在業務服務器上落地成文件,所以只要監控文件的內容發生變化,采集工具就可以把最新的數據采集下來。
- 處於吞吐量以及系統壓力上的考慮,並不是新增一條記錄就采集一條,而是根據兩個原則進行采集:
- 數據大小限制:當達到限制條件時,把目前采集到的新數據作為一批。(例如 512 KB 寫一批)
- 時間閾值限制:當時間達到一定條件時,把目前采集到的新數據作為一批,避免在數據量少的情況下采集(例如 30 秒一批)。
- 兩個條件的參數需要根據業務的需求來設定。(當批次采集頻繁時,可以降低延時,但必然會導致吞吐量下降。)
- 采集到的數據需要一個數據交換平台(數據中間件)分發給下游。(數據中間件系統有很多實現方式,比如開源的系統有 Kafka,阿里內部采用的是 Time Tunnel ,還有 MetaQ、Notify 等消息系統)
- 消息系統是數據庫變更節點的上游,延時比數據中間件低很多,但是其支持的吞吐量有限。因此,消息系統一般用作業務數據庫變更的消息中轉。(對於較大的業務數據,一般通過數據中間件系統來中轉,但是其支持的吞吐量高。)
- 一般有些業務並沒有通過消息系統對數據庫進行更新,因此,從消息系統中獲取的數據並不是最全的,而通過數據庫變更日志拿到的業務變更過程數據肯定是全的。因此,為了和離線數據源保持一致,一般都是通過數據中間件來采集數據庫變更數據這種方式來獲取實時數據的(這需要在數據處理層對業務主鍵進行 merge 處理,比如一筆訂單可能會被變更多次,會有多條變更記錄,這時需要進行 merge 拿到最新的數據)。
- 時效性和吞吐量是數據處理中的兩個矛盾體,需要從業務的角度來權衡使用什么樣的系統來做數據中轉。
2、數據處理
- 實時計算任務部署在流式計算系統上,通過數據中間件獲取到實時源數據后進行實時加工處理。
- 業界使用較廣泛的流計算引擎系統:Twitter 開源的 Storm 系統、虎牙開源的 S4 系統、Apache 的 Spark Streaming、Flink。
- 阿里使用的是阿里雲提供的 StreamCompute:涵蓋了從數據采集到數據生產各個環節,力保流計算開發嚴謹、可靠。(提供的 SQL 語義的流式數據分析能力(StreamSQL),讓流數據分析門檻不再存在。)
- 流數據處理原理(以 Strorm 為例):
- spout:拓撲的輸入。(從數據中間件中讀取數據,並根據自定義的分發規則發送給下游的 blot,可以由多個輸入源)
- bolt:業務處理單元。(根據處理邏輯分為多個步驟,其相互之間的數據分發規則是自定義的)
- 實時數據處理應用出於性能考慮,計算任務往往是多線程的,一般會根據業務主鍵進行分桶處理,並且大部分計算過程需要的數據都會在內存中,大大提高應用的吞吐量。(為了避免內存溢出,內存中過期的數據需要定時清理,可以按照 LRU(最近最少使用)算法或者業務時間集合歸類清理)
-
實時任務中的典型問題
-
去重指標
- 在 BI(商業智能)統計類實時任務中,去重指標在對資源消耗的評判指標總非常重要。
- 由於實時任務為了追求處理性能,計算邏輯一般都在內存中完成,中間結果數據也緩存在內存中,帶來了內存消耗過多的問題。
- 在計算去重時,勢必要把去重的明細數據保存下來,當去重的明細數據達到上億甚至幾十億時,內存中放不下,此時需要分兩種情況來看:
- 精確去重:明細數據必須保存下來,當遇到內存問題是,通過數據傾斜來進行處理,把一個節點的內存壓力分到多個節點上。
- 模糊去重:在明細數據量非常大,而業務的精度要求不高時,可以使用相關的去重算法,把內存的使用量降到千分之一甚至萬分之一,以提高內存的利用率。
-
去重算法:
- 布隆過濾器:
- 位數組算法的應用,不保存真時的明細數據,只保存明細數據對應哈希值的標記位。(采用此算法存儲 1 億條數據只需要 100 多 MB 內存)
- 會出現哈希值碰撞的情況,但是誤差率可以控制,計算出來的去重值比真實值小。
- 適用場景:統計精度要求不高,統計緯度值非常多的情況。
- 技術估計:
- 利用哈希的原理,按照數據的分散程度估算現有數集的邊界,從而得出大概的去重值總和。(采用此算法存儲 1 億條數據只需要幾 KB 的內存)
- 估算的去重值可能比真實值大,也可能比真實值小。
- 適用場景:統計精度要求不高,統計維度非常粗的情況。
-
數據傾斜
- 數據傾斜:對於集群系統,一般緩存是分布式的,即不同的節點負責一定范圍的數據(一個節點上完成相關的計算任務),在數據量非常大的時候,造成有些節點上數據量非常大,而有些節點上的數據量很少(大量的緩存集中到了一台或幾台服務節點上了),但是單個節點處理能力有限,必然會遇到性能瓶頸,這種現象稱為數據傾斜。
- 數據傾斜違背了並行計算的初衷,一個或部分節點要承受很大的計算壓力,而其他節點計算完畢后要等到忙碌的節點,拖累了整體的計算時間,效率十分低下。
- 不同的數據字段可能的數據傾斜一般有兩種:
- 唯一值非常少:極少數值有非常多的記錄值(唯一值少於幾千);
- 唯一值比較多:這個字段的某些值有遠遠多於其它值的記錄值,但是它的占比小於百分之一或千分之一;
- 正常的數據分布理論上都是傾斜的;
- 二八原理:80% 的財富集中在了 20% 的人手中,80% 的用戶使用 20% 的功能,20% 的用戶貢獻了 80% 的訪問量;
- 數據傾斜在 MapReduce 編程模型中十分常見,用最通俗易懂的話來說,數據傾斜無非就是大量的相同 key 被 partition 分配到一個分區里,造成了 “一個人累死,其他人閑死” 的情況。
- 解決方法:分桶處理(和離散處理的思路一樣)
- 去重指標分桶:通過對去充值進行分桶 Hash,相同的值一定會被放在同一個桶中去重,最后再把每個桶里的值進行加和就得到總值。(這里利用了每個桶的 CPU和內存資源)
- 非去重指標分桶:數據隨機分發到每個桶中,最后再把每個桶的值匯總。(主要利用的是各個桶的 CPU 能力)
-
事務處理
- 由於實時計算是分布處理的,系統的不穩定性必然會導致數據的處理有可能出現失敗的情況。(比如網絡的抖動導致數據發送不成功、機器重啟導致數據丟失等)
- 針對這些情況,流式計算系統提供了數據自動 ACK、失敗重發以及事務信息等機制(都是為了保證數據的冪等性):
- 超時時間:由於數據處理是按照批次來進行的,當一批數據處理超時時,會從拓撲的 spout 端重發數據。另外,批次處理的數據量不宜過大,應該增加一個限流的功能(限定一批數據的記錄數或者容量等),避免數據處理超時;
- 事務信息:每批數據都會附帶一個事務 ID 的信息,在重發的情況下,讓開發者自己根據事務信息去判斷數據第一次到達和重發時不同的處理邏輯;
- 備份機制:開發人員需要保證內存數據可以通過外部存儲恢復,因此在計算中用到的中間結果數據需要備份到外出存儲中;
3、數據存儲
- 實時任務在運行過程中,會計算很多維度和指標,這些數據需要放在一個存儲系統中作為恢復或者關聯使用。其中會涉及 3 中類型的數據:
- 中間計算結果——在實時應用處理過程中,會有一些狀態的保存(比如去重指標的明細數據),用於在發送故障時,使用數據庫中的數據恢復內存現場。
- 最終結果數據——指通過 ETL 處理后的實時結果數據。這些數據是實時更新的,寫的頻率非常高,可以被下游直接使用;
- 維表數據——在離線計算系統中,通過同步工具導入到在線存儲系統中,供實時任務來關聯實時流數據。
- 實時任務是多線程處理的,數據存儲系統必須能夠比較好的支持多並發讀寫,並且延時需要在毫秒級才能滿足實時的性能需求。
- 實踐中,一般使用 HBase、Tair、MongoDB 等列式存儲系統:
- 由於這些系統在寫數據時先寫內存再落磁盤,因此寫延時在毫秒級;讀請求也有緩存機制,多並發讀是也可以達到毫秒級延時。
- 這些系統的缺點(以 HBase 為例):一張表必須有 rowkey,而 rowkey 是按照 ASCII 碼來排序的,就像關系型數據庫的索引一樣,rowkey 的規則限制了讀取數據的方式,如果業務方需要使用另一種讀取數據的方式,就必須重新輸出 rowkey。
- HBase 的一張表能夠存儲幾 TB 甚至幾十 TB 的數據,而關系型數據庫必須要分庫分表才能實現這個量級的數據存儲,因此,對於海量數據的實時計算,一般會采用非關系型數據庫,以應對大量的多並發讀寫。
-
數據統計中表名設計和 rowkey 設計的實踐經驗:
1)表名設計
- 設計規則:匯總層標識 + 數據域 + 主維度 + 實踐維度
- 例:dws_trd_slr_dtr,表示匯總層交易數據,根據賣家(slr)主維度 + 0 點截止當日(dtr)進行統計匯總;
- 優點:所有主維度相同的數據都放在一張物理表中,避免表數量過多,難以維護。另外,可以從表名上直觀的看到存儲的是什么數據內容,方便排查問題;
2)rowkey 設計
- 設計規則:MD5 + 主維度 + 維度標識 + 子維度1 + 時間維度 + 子維度2
- 例:賣家 ID 的 MD5 前四位 + 賣家ID + app + 一級類目ID + ddd + 二級類目ID;(賣家 ID 屬於主維度,在查數據時是必傳的)
- 以 MD5 的前四位作為 rowkey 的第一部分,可以把數據散列,讓服務器整體負載是均衡的,避免熱點問題。
- 每個統計維度都會生成一個維度標識,以便在 rowkey 上做區分;
4、數據服務
- 實時數據落地到存儲系統中后,使用方就可以通過統一的數據服務(如 OneService)獲取到實時數據,其好處是:
- 不需要直接連數據庫,數據源等信息在數據服務層維護,這樣當存儲器遷移時,對下游是透明的;
- 調用方只需要使用服務層暴露的接口,不需要關心底層取數據邏輯的實現;
- 屏蔽存儲系統間的差異,統一的調用日志輸出,便於分析和監控下游使用情況;
三、流式數據模型
- 數據模型設計貫穿數據處理過程,流式數據處理也一樣,需要對數據流建模分層。
- 實時建模跟離線建模非常類似,數據模型整體分為五層:ODS、DWD、DWS、ADS、DIM。
- 由於實時計算的局限性,每一層中並沒有像離線做得那么寬,維度和指標也沒有那么多,特別是涉及回溯狀態的指標,在實時數據模型中幾乎沒有。
1、數據分層
1)ODS 層
- ODS:操作數據層,是直接從業務系統采集過來的原始數據。
- ODS 數據層包含了所有業務的變更過程,數據粒度是最細的。
- 在 ODS 層,實時和離線在源頭上是統一的,好處是用同一份數據加工出來的指標,口徑基本是統一的,可以更方便進行實時和離線數據比對。
2)DWD 層
- DWD:是在 ODS 層基礎上,根據業務過程建模出來的實時事實明細層。
- 對於訪問 DWD 層的數據(沒有上下文關系,並且不需要等待過程的記錄),會回溯到離線系統,供下游使用,最大程度的保證實時和離線數據在 ODS 層和 DWD 層是一致的。
3)DWS 層
- DWS 層:訂閱明細層的數據后,在實時任務中計算各個維度的匯總指標。
- 如果維度是各個垂直業務線通用的,則會放在實時通用匯總層,作為通用的數據模型使用。
4)ADS 層
- ADS:個性化維度匯總層。
- 對於不是特別通用的統計維度數據,會放在 ADS 層,在 ADS 層,計算只有自身業務才會關注的維度和指標,跟其他業務線一般沒有交集,通常用於垂直創新業務中。
5)DIM 層
- DIM:實時維表層。
- DIM 層的數據基本上都是從離線維表層導出來的,抽取到在線系統中供實時應用調用。
- 在 DIM 層,對實時應用來說是靜態的,所有的 ETL 處理工作會在離線系統中完成。
- 舉例說明流數據模型的每一層所存儲的數據:
- ODS 層:訂單粒度的變更過程,一筆訂單有多條記錄;
- DWD 層:訂單粒度的支付記錄,一筆訂單只有一條記錄;
- DWS 層:賣家的實時成交金額,一個賣家只有一條記錄,並且指標在實時刷新;
- ADS 層:外賣地區的實時成交金額,只有外賣業務使用;
- DIM 層:訂單商品類目和行業的對應關系維表。
- ODS 層到 DIM 層的 ETL 處理是在離線系統中進行的,處理完成后會同步到實時計算所使用的存儲系統,
- ODS 層和 DWD 層會放在數據中間件中,供下游訂閱使用。
- DWS 層和 ADS 層落地到在線存儲系統中,下游通過接口調用的形式使用。
- 在每一層中,按照重要性划分為 P0、P1、P2、P3等級,P0 屬於最高優先級保障。根據不同的優先級給實時任務分配不同的計算和存儲資源,力求重要的任務可以得到最好的保障。
- 字段命名、表命名、指標命名是按照 OneData 規范來定義的,以便更好的維護和管理。
2、多流關聯
- 多流關聯:在流式計算中,常常要把兩個實時流進行主鍵關聯,以得到對應的實施明細表。
- 在離線系統中,兩個表關聯非常簡單,因為離線計算在任務啟動時已經可以獲得兩張表的全量數據,只要根據關聯鍵進行分桶關聯即可。
- 在流式系統中,流式計算的過程中,數據的到達是一個增量過程,並且數據到達的時間是不確定的和無序的,因此在數據處理過程中會涉及中間狀態的保存和恢復機制等細節問題。
- 多流關聯的一個關鍵點就是需要相互等待,只有雙方都到達了,才能關聯:
- 比如:A 表和 B 表使用 ID 進行實時關聯,由於無法知道兩個表的到達順序,因此在兩個數據流的每條新數據到來時,都需要到另一張表中進行查找,如 A 表的某條數據到達,到 B 表的全量數據中查找,如果能查找到,說明可以關聯上,拼接成一條記錄直接輸出到下游;但是如果關聯不上,則需要放在內存或外出存儲中等待,直到 B 表的記錄也到達。
- 不管是否關聯成功,內存中的數據都需要備份到外部存儲系統中,在任務重啟時,可以從外部存儲系統中恢復內存數據,這樣才能保證數據不丟失。(在重啟時,任務是續跑的,不會重新跑之前的數據)
- 訂單記錄的變更有可能發生多次(比如訂單的多個字段多次更新),在這種情況下,需要根據訂單 ID 去重,避免 A 表和 B 表多次關聯成功;否則輸出到下游就會有多條記錄,得到的數據有重復的。
- 雙流關聯過程中,在實際處理時,考慮到查找數據的新能,實時關聯這個步驟一般會把數據按照關聯主鍵進行分桶處理,並且在故障恢復時也根據分桶來進行,以降低查找數據量和提高吞吐量。
3、維表使用
- 事實表:存放實實在在數據的表;
- 維表:解釋事實表中關鍵字維度的具體內容,不存放具體數據;
- 一般一個事實表對應一個或多個維表;
-
例:
- 事實表
- 維度表(此維表用來解釋不同的機構代碼代表的具體內容):
- 在離散系統中,一般是根據業務分區來關聯事實表和維表的,因為在關聯之前維表的數據已經就緒了。
- 在實時計算中,關聯維表一般會使用當前的實時數據(T)去關聯 T - 2 的維表數據,相當於在 T 的數據到達前把維表的數據准備好,並且一般都是一份靜態的數據。
- 在實時計算中,采用這種關聯方式的原因:
1)數據無法及時准備好
- 當到達零點時,實時流數據必須去關聯維表(因為不能等待,等待會失去實時的特性),而這個時候 T-1 的維表數據一般不能再零點馬上准備就緒(因為 T-1 的數據需要在 T 這一天加工生產),因此去關聯 T-2 維表,相當於在 T-1 的一天時間里加工好 T-2 的維表數據。
2)無法准確獲取全量的最新數據
- 維表一般是全量的數據,如果需要實時獲取到當天的最新維表數據,需要 T-1 的數據 + 當天變更才能獲取到完整的維表數據。也就是說,維表也作為一個實時流輸入,這就需要使用多流實時關聯來實現。但是由於實時數據是無序的並且到達時間不確定,因此在維表關聯上游歧義。
3)數據的無序性
- 在實時計算中,維表關聯一般都統一使用 T-2 的數據,這樣對於業務來說,起碼關聯到的維表數據是確定的(雖然維表數據有一定的延時,但是許多業務的維表在兩天之間是很少的)。
-
由於實時任務是常駐進程的,因此維表的使用分為兩種形式:
- 全量加載
- 使用情況:在維表數據較少的情況下,可以一次性加載到內存中,在內存中直接和實時流數據進行關聯,效率非常高。
- 缺點:一致占用內存,並且需要定時更新。
- 增量加載
- 使用情況:維表數據很多,沒辦法全部加載到內存中,可以使用增量查找和 LRU 過期的形式,讓最熱門的數據留在內存中。
- 優點:可以控制內存的使用量;
- 缺點:需要查找外部存儲系統,運行效率會降低。
四、大促挑戰 & 保障
1、大促特征
- 毫秒級延時
- 洪峰明顯;(大促前會進行多次全鏈路壓測和預案梳理,企鵝包系統能夠承載主洪峰的沖擊)
- 高保障性;(一般采用多鏈路冗余的方式對強保障性的數據進行保障)
2、大促保障
1)如何進行實時任務優化
- 實時任務優化中經常考慮的要素:
1/1)獨占資源和共享資源的策略
- 共享資源池可以被多個實時任務搶占,如果一個任務在運行時 80% 以上的時間都需要去搶資源,這時候就需要考慮給它分配更多的獨享資源,避免搶不到 CPU 資源導致吞吐量急劇下降。
1/2)合理選擇緩存機制,盡量降低讀寫庫次數
- 內存讀寫性能是最好的,根據業務特性選擇不同的緩存機制,讓最熱和最可能使用的數據留在內存中,讀寫庫次數降低,吞吐量自然提升。
1/3)計算單元合並,降低拓撲層級
- 拓撲結構層級越深,性能越差。(因為數據在每個節點間傳輸時,大部分需要經過系列化和反序列化,而這個過程非常消耗 CPU 和時間)
1/4)內存對象共享,避免字符拷貝
- 在海量數據處理中,大部分對象都是以字符串形式存在的,在不同線程間合理共享對象,可以大幅降低字符拷貝帶來的性能消耗。(需要注意不合理使用帶來的內存溢出問題)
1/5)在高吞吐量和低延時間取平衡
- 高吞吐量和低延時,這兩個特性是一對矛盾體,當把多個讀寫庫操作或者 ACK 操作合並成一個時,可以大幅降低因為網絡請求帶來的消耗。(不過會導致延時增高,需要根據業務進行取舍)
2)如何進行數據鏈路保障
- 實時數據的處理鏈路:數據同步——數據計算——數據存儲——數據服務,每一個環節出現問題,都會導致實時數據停止更新;
- 為了保障實時數據的可用性,需要對整條計算鏈路都進行鏈路搭建,做到多機房容災,甚至異地容災:
- 查找鏈路中問題的方法:
- 通過工具比對多條鏈路計算的結果數據,當某條鏈路出現問題時,它一定會比其它鏈路計算的值小,並且差異會越來越大。
- 問題解決:一鍵切換到備鏈路,並且通過推送配置的形式讓其秒級生效,所有的接口調用會立刻切換到備鏈路。
3)如何進行壓測
- 壓測方式:模擬大促的峰值情況,驗證系統是否能夠正常運行。
- 壓測都是在線上環境中進行的,分為:數據壓測、產品壓測。
-
數據壓測:
- 主要是蓄洪壓測,就是把幾個小時甚至幾天的數據積累下來,並在某個時刻全部開放,模擬大促洪峰流量的情況。(這里面的數據是真實的)
- 產品壓測:產品本身壓測、前端頁面穩定性壓測。
- 產品本身壓測(以 “雙 11” 直播大屏為例):收集大屏服務端的所有讀操作的 URL,通過壓測平台進行壓測流量回放。(按照 QPS :500 次 / 秒的目標進行壓測,在也壓測過程中不斷的迭代優化服務端的性能,提升大屏應用處理數據的性能;)
- 前端頁面穩定性測試:將大屏頁面在瀏覽器中打開,並進行 8~24 小時的前端頁面穩定性測試。(監控大屏前端 JS 對客戶端瀏覽器的內存、CPU 等的消耗,檢測出前端 JS 內存泄漏等問題並修復,提升前端頁面的穩定性。)