簡介: 《實時數倉入門訓練營》由阿里雲研究員王峰、阿里雲資深技術專家金曉軍、阿里雲高級產品專家劉一鳴等實時計算 Flink 版和 Hologres 的多名技術/產品一線專家齊上陣,合力搭建此次訓練營的課程體系,精心打磨課程內容,直擊當下同學們所遇到的痛點問題。由淺入深全方位解析實時數倉的架構、場景、以及實操應用,7 門精品課程幫助你 5 天時間從小白成長為大牛!
本文整理自直播《實時計算 Flink 版 SQL 實踐-李麟(海豹)》
視頻鏈接:https://c.tb.cn/F3.0dBssY
內容簡要:
一、實時計算Flink版SQL簡介
二、實時計算Flink版SQL上手示例
三、開發常見問題和解法
實時計算Flink版SQL簡介
(一)關於實時計算Flink版SQL
實時計算Flink版選擇了SQL這種聲明式語言作為頂層API,比較穩定,也方便用戶使用。Flink SQL具備流批統一的特性,給用戶統一的開發體驗,並且語義一致。另外,Flink SQL能夠自動優化,包括屏蔽流計算里面State的復雜性,也提供了自動優化的Plan,並且還集成了AutoPilot自動調優的功能。Flink SQL的應用場景也比較廣泛,包括數據集成、實時報表、實時風控,還有在線機器學習等場景。
(二)基本操作
在基本操作上,可以看到SQL的語法和標准SQL非常類似。示例中包括了基本的SELECT、FILTER操作。,可以使用內置函數,如日期的格式化,也可以使用自定義函數,比如示例中的匯率轉換就是一個用戶自定義函數,在平台上注冊后就可以直接使用。
(三)維表 Lookup Join
在實際的數據處理過程中,維表的Lookup Join也是一個比較常見的例子。
這里展示的是一個維表INNER JOIN示例。
例子中顯示的SOURCE表是一個實時變化的訂單信息表,它通過INNER JOIN去關聯維表信息,這里標黃高亮的就是維表JOIN的語法,可以看到它和傳統的批處理有一個寫法上的差異,多了FOR SYSTEM_TIME AS OF這個子句來標明它是一個維表JOIN的操作。SOURCE表每來一條訂單消息,它都會觸發維表算子,去做一次對維表信息的查詢,所以把它叫做一個Lookup Join。
(四)Window Aggregation
Window Aggregation(窗口聚合)操作也是常見的操作,Flink SQL中內置支持了幾種常用的Window類型,比如Tumble Window,Session Window,Hop Window,還有新引入的Cumulate Window。
Tumble
Tumble Window可以理解成固定大小的時間窗口,也叫滾窗,比如說5分鍾、10分鍾或者1個小時的固定間隔的窗口,窗口之間沒有重疊。
Session
Session Window(會話窗口) 定義了一個連續事件的范圍,窗口定義中的一個參數叫做Session Gap,表示兩條數據的間隔如果超過定義的時長,那么前一個Window就結束了,同時生成了一個新的窗口。
Hop
Hop Window不同於滾動窗口的窗口不重疊,滑動窗口的窗口之間可以重疊。滑動窗口有兩個參數:size 和 slide。size 為窗口的大小,slide 為每次滑動的步長。如果slide < size,則窗口會重疊,同一條數據可能會被分配到多個窗口;如果 slide = size,則等同於 Tumble Window。如果 slide > size,窗口之間沒有重疊且有間隙。
Cumulate
Cumulate Window(累積窗口),是Flink社區1.13版本里新引入的,可以對比 Hop Window來理解,區別是從Window Start開始不斷去累積。示例中Window 1、Window 2、Window 3是在不斷地增長的。它有一個最大的窗口長度,比如我們定義Window Size是一天,然后Step步長是1個小時,那么它會在一天中的每個小時產生累積到當前小時的聚合結果。
看一個具體的Window聚合處理示例。
如上圖所示,比如說需要進行每5分鍾單個用戶的點擊數統計。
源數據是用戶的點擊日志,我們期望算出每5分鍾單個用戶的點擊總數, SQL 中使用的是社區最新的 WindowTVF語法,先對源表開窗,再 GROUP BY 窗口對應的屬性 window_start和window_end, COUNT(*)就是點擊數統計。
可以看到,當處理12:00到12:04的數據,有2個用戶產生了4次點擊,分別能統計出來用戶Mary是3次,Bob是1次。在接下來一批數據里面,又來了3條數據,對應地更新到下一個窗口中,分別是1次和2次。
(五)Group Aggregation
相對於Window Aggregation來說,Group Aggregation直接觸發計算,並不需要等到窗口結束,適用的一個場景是計算累積值。
上圖的例子是單個用戶累積到當前的點擊數統計。從Query上看,寫法相對簡單一點,直接 GROUP BY user 去計算COUNT(*),就是累積計數。
可以看到,在結果上和Window的輸出是有差異的,在與Window相同的前4條輸入數據,Group Aggregation輸出的結果是Mary的點擊數已更新到3次,具體的計算過程可能是從1變成2再變成3,Bob是1次,隨着后面3條數據的輸入,Bob對應的點擊數又會更新成2次,對結果是持續更新的過程,這和Window的計算場景是有一些區別的。
之前Window窗口里面輸出的數據,在窗口結束后結果就不會再改變,而在Group Aggregation里,同一個Group Key的結果是會產生持續更新的。
(六)Window Aggregation Vs Group Aggregation
更全面地對比一下Window和Group Aggregation的一些區別。
Window Aggregation在輸出模式上是按時輸出,是在定義的數據到期之后它才會輸出。比如定義5分鍾的窗口,結果是延遲輸出的,比如00:00~00:05這個時間段,它會等整個窗口數據都到齊之后,才完整輸出出來,並且結果只輸出一次,不會再改變。
Group Aggregation是數據觸發,比如第一條數據來它就會輸出結果,同一個Key 的第二條數據來結果會更新,所以在輸出流的性質上兩者也是不一樣的。Window Aggregation一般情況下輸出的是Append Stream,而在Group Aggregation輸出的是Update Stream。
在狀態State處理上兩者的差異也比較大。Window Aggregation會自動清理過期數據,用戶就不需要額外再去關注 State的膨脹情況。Group Aggregation是基於無限的狀態去做累積,所以需要用戶根據自己的計算場景來定義State的TTL,就是State保存多久。
比如統計一天內累計的PV和UV,不考慮數據延遲的情況,也至少要保證State的TTL要大於等於一天,這樣才能保證計算的精確性。如果State的TTL定義成半天,統計值就可能不准確了。
對輸出的存儲要求也是由輸出流的性質來決定的。在Window的輸出上,因為它是Append流,所有的類型都是可以對接輸出的。而Group Aggregatio輸出了更新流,所以要求目標存儲支持更新,可以用Hologres、MySQL或者HBase這些支持更新的存儲。
實時計算 Flink 版SQL上手示例
下面通過具體的例子來看每一種SQL操作在真實的業務場景中會怎么使用,比如SQL基本的語法操作,包括一些常見的Aggregation的使用。
(一)示例場景說明:電商交易數據 - 實時數倉場景
這里的例子是電商交易數據場景,模擬了實時數倉里分層數據處理的情況。
在數據接入層,我們模擬了電商的交易訂單數據,它包括了訂單ID,商品ID,用戶ID,交易金額,商品的葉子類目,交易時間等基本信息,這是一個簡化的表。
示例1會從接入層到數據明細層,完成一個數據清洗工作,此外還會做類目信息的關聯,然后數據的匯總層我們會演示怎么完成分鍾級的成交統計、小時級口徑怎么做實時成交統計,最后會介紹下在天級累積的成交場景上,怎么去做准實時統計。
- 示例環境:內測版
演示環境是目前內測版的實時計算Flink產品,在這個平台可以直接做一站式的作業開發,包括調試,還有線上的運維工作。
- 接入層數據
使用 SQL DataGen Connector 生成模擬電商交易數據。
接入層數據:為了方便演示,簡化了鏈路,用內置的SQL DataGen Connector來模擬電商數據的產生。
這里面order_id是設計了一個自增序列,Connector的參數沒有完整貼出來。 DataGen Connector支持幾種生成模式,比如可以用Sequence產生自增序列,Random模式可以模擬隨機值,這里根據不同的字段業務含義,選擇了不同的生成策略。
比如order_id是自增的,商品ID是隨機選取了1~10萬,用戶ID是1~1000萬,交易金額用分做單位, cate_id是葉子類目ID,這里共模擬100個葉子類目,直接通過計算列對商品ID取余來生成,訂單創建時間使用當前時間模擬,這樣就可以在開發平台上調試,而不需要去創建Kafka或者DataHub做接入層的模擬。
(二)示例1-1 數據清洗
- 電商交易數據-訂單過濾
這是一個數據清洗的場景,比如需要完成業務上的訂單過濾,業務方可能會對交易金額有最大最小的異常過濾,比如要大於1元,小於1萬才保留為有效數據。
交易的創建時間是選取某個時刻之后的,通過WHERE條件組合過濾,就可以完成這個邏輯。
真實的業務場景可能會復雜很多,下面來看下SQL如何運行。
這是使用調試模式,在平台上點擊運行按鈕進行本地調試,可以看到金額這一列被過濾,訂單創建時間也都是大於要求的時間值。
從這個簡單的清洗場景可以看到,實時和傳統的批處理相比,在寫法上包括輸出結果差異並不大,流作業主要的差異是運行起來之后是長周期保持運行的,而不像傳統批處理,處理完數據之后就結束了。
(三)示例1-2 類目信息關聯
接下來看一下怎么做維表關聯。
根據剛才接入層的訂單數據,因為原始數據里面是葉子類目信息,在業務上需要關聯類目的維度表,維度表里面記錄了葉子類目到一級類目的關聯關系,ID和名稱,清洗過程需要完成的目標是用原始表里面葉子類目ID去關聯維表,補齊一級類目的ID和Name。這里通過INNER JOIN維表的寫法,關聯之后把維表對應的字段選出來。
和批處理的寫法差異僅僅在於維表的特殊語法FOR SYSTEM_TIME AS OF。
如上所示,平台上可以上傳自己的數據用於調試,比如這里使用了1個CSV的測試數據,把100個葉子類目映射到10個一級類目上。
對應葉子類目ID的個位數就是它一級類目的ID,會關聯到對應的一級類目信息,返回它的名稱。本地調試運行優點是速度比較快,可以即時看到結果。在本地調試模式中,終端收到1000條數據之后,會自動暫停,防止結果過大而影響使用。
(四)示例2-1 分鍾級成交統計
接下來我們來看一下基於Window的統計。
第一個場景是分鍾級成交統計,這是在匯總層比較常用的計算邏輯。
分鍾級統計很容易想到Tumble Window,每一分鍾都是各算各的,需要計算幾個指標,包括總訂單數、總金額、成交商品數、成交用戶數等。成交的商品數和用戶數要做去重,所以在寫法上做了一個Distinct處理。
窗口是剛剛介紹過的Tumble Window,按照訂單創建時間去划一分鍾的窗口,然后按一級類目的維度統計每一分鍾的成交情況。
- 運行模式
上圖和剛才的調試模式有點區別,上線之后就真正提交到集群里去運行一個作業,它的輸出采用了調試輸出,直接Print到Log里。展開作業拓撲,可以看到自動開啟了Local-Global的兩階段優化。
- 運行日志 - 查看調試輸出結果
在運行一段時間之后,通過Task里面的日志可以看到最終的輸出結果。
用的是Print Sink,會直接打到Log里面。在真實場景的輸出上,比如寫到Hologres/MySQL,那就需要去對應存儲的數據庫上查看。
可以看到,輸出的數據相對於數據的原始時間是存在一定滯后的。
在19:46:05的時候,輸出了19:45:00這一個窗口的數據,延遲了5秒鍾左右輸出前1分鍾的聚合結果。
這5秒鍾實際上和定義源表時WATERMARK的設定是有關系的,在聲明WATERMARK時是相對gmt_create字段加了5秒的offset。這樣起到的效果是,當到達的最早數據是 19:46:00 時,我們認為水位線是到了19:45:55,這就是5秒的延遲效果,來實現對亂序數據的寬容處理。
(五)示例2-2 小時級實時成交統計
第二個例子是做小時級實時成交統計。
如上圖所示,當要求實時統計,直接把Tumble Window開成1小時Size的Tumble Window,這樣能滿足實時性嗎?按照剛才展示的輸出結果,具有一定的延遲效果。因此開一個小時的窗口,必須等到這一個小時的數據都收到之后,在下一個小時的開始,才能輸出上一個小時的結果,延遲在小時級別的,滿足不了實時性的要求。回顧之前介紹的 Group Aggregation 是可以滿足實時要求的。
具體來看,比如需要完成小時+類目以及只算小時的兩個口徑統計,兩個統計一起做,在傳統批處理中常用的GROUPING SETS功能,在實時Flink上也是支持的。
我們可以直接GROUP BY GROUPING SETS,第一個是小時全口徑,第二個是類目+小時的統計口徑,然后計算它的訂單數,包括總金額,去重的商品數和用戶數。
這種寫法對結果加了空值轉換處理便於查看數據,就是對小時全口徑的統計,輸出的一級類目是空的,需要對它做一個空值轉換處理。
上方為調試模式的運行過程,可以看到Datagen生成的數據實時更新到一級類目和它對應的小時上。
這里可以看到,兩個不同GROUP BY的結果在一起輸出,中間有一列ALL是通過空值轉換來的,這就是全口徑的統計值。本地調試相對來說比較直觀和方便,有興趣的話也可以到阿里雲官網申請或購買進行體驗。
(六)示例2-3 天級累積成交准實時統計
第三個示例是天級累計成交統計,業務要求是准實時,比如說能夠接受分鍾級的更新延遲。
按照剛才Group Aggregation小時的實時統計,容易聯想到直接把Query改成天維度,就可以實現這個需求,而且實時性比較高,數據觸發之后可以達到秒級的更新。
回顧下之前提到的Window和Group Aggregation對於內置狀態處理上的區別,Window Aggregation可以實現State的自動清理,Group Aggregation需要用戶自己去調整 TTL。由於業務上是准實時的要求,在這里可以有一個替代的方案,比如用新引入的Cumulate Window做累積的Window計算,天級的累積然后使用分鍾級的步長,可以實現每分鍾更新的准實時要求。
回顧一下Cumulate Window,如上所示。天級累積的話,Window的最大Size是到天,它的Window Step就是一分鍾,這樣就可以表達天級的累積統計。
具體的Query如上,這里使用新的TVF語法,通過一個TABLE關鍵字把Windows的定義包含在中間,然后 Cumulate Window引用輸入表,接着定義它的時間屬性,步長和size 參數。GROUP BY就是普通寫法,因為它有提前輸出,所以我們把窗口的開始時間和結束時間一起打印出來。
這個例子也通過線上運行的方式去看Log輸出。
- 運行模式
可以看到,它和之前Tumble Window運行的結構類似,也是預聚合加上全局聚合,它和Tumble Window的區別就是並不需要等到這一天數據都到齊了才輸出結果。
- 運行日志 – 觀察調試結果
從上方示例可以看到,在20:47:00的時候,已經有00:00:00到20:47:00的結果累積,還有對應的4列統計值。下一個輸出就是接下來的累計窗口,可以看到20:47:00到20:48:00就是一個累計的步長,這樣既滿足了天級別的累計統計需求,也能夠滿足准實時的要求。
(七)示例小結:電商交易數據-實時數倉場景
然后我們來整體總結一下以上的示例。
在接入層到明細層的清洗處理特點是相對簡單,也比較明確,比如業務邏輯上需要做固定的過濾條件,包括維度的擴展,這都是非常明確和直接的。
從明細層到匯總層,例子中的分鍾級統計,我們是用了Tumble Window,而小時級因為實時性的要求,換成了Group Aggregation,然后到天級累積分別展示Group Aggregation和新引入的Cumulate Window。
從匯總層的計算特點來說,我們需要去關注業務上的實時性要求和數據准確性要求,然后根據實際情況選擇Window聚合或者Group 聚合。
這里為什么要提到數據准確性?
在一開始比較Window Aggregation和Group Aggregation的時候,提到Group Aggregation的實時性非常好,但是它的數據准確性是依賴於State的TTL,當統計的周期大於TTL,那么TTL的數據可能會失真。
相反,在Window Aggregation上,對亂序的容忍度有一個上限,比如最多接受等一分鍾,但在實際的業務數據中,可能99%的數據能滿足這樣的要求,還有1%的數據可能需要一個小時后才來。基於WATERMARK的處理,默認它就是一個丟棄策略,超過了最大的offset的這些數據就會被丟棄,不納入統計,此時數據也會失去它的准確性,所以這是一個相對的指標,需要根據具體的業務場景做選擇。
開發常見問題和解法
(一)開發中的常見問題
上方是實時計算真實業務接觸過程中比較高頻的問題。
首先是實時計算不知道該如何下手,怎么開始做實時計算,比如有些同學有批處理的背景,然后剛開始接觸Flink SQL,不知道從哪開始。
另外一類問題是SQL寫完了,也清楚輸入處理的數據量大概是什么級別,但是不知道實時作業運行起來之后需要設定多大的資源
還有一類是SQL寫得比較復雜,這個時候要去做調試,比如要查為什么計算出的數據不符合預期等類似問題,許多同學反映無從下手。
作業跑起來之后如何調優,這也是一個非常高頻的問題。
(二)開發常見問題解法
1.實時計算如何下手?
對於上手的問題,社區有很多官方的文檔,也提供了一些示例,大家可以從簡單的例子上手,慢慢了解SQL里面不同的算子,在流式計算的時候會有一些什么樣的特性。
此外,還可以關注開發者社區實時計算 Flink 版、 ververica.cn網站、 B 站的Apache Flink 公眾號等分享內容。
逐漸熟悉了SQL之后,如果想應用到生產環境中去解決真實的業務問題,阿里雲的行業解決方案里也提供了一些典型的架構設計,可以作為參考。
2.復雜作業如何調試?
如果遇到千行級別的復雜SQL,即使對於Flink的開發同學來也不能一目了然地把問題定位出來,其實還是需要遵循由簡到繁的過程,可能需要借助一些調試的工具,比如前面演示的平台調試功能,然后做分段的驗證,把小段SQL局部的結果正確性調試完之后,再一步一步組裝起來,最終讓這個復雜作業能達到正確性的要求。
另外,可以利用SQL語法上的特性,把SQL組織得更加清晰一點。實時計算Flink產品上有一個代碼結構功能,可以比較方便地定位長SQL里具體的語句,這都是一些輔助工具。
3.作業初始資源設置,如何調優?
我們有一個經驗是根據輸入的數據,初始做小並發測試一下,看它的性能如何,然后再去估算。在大並發壓測的時候,按照需求的吞吐量,逐步逼近,然后拿到預期的性能配置,這個是比較直接但也比較可靠的方式。
調優這一塊主要是借助於作業的運行是情況,我們會去關注一些重點指標,比如說有沒有產生數據的傾斜,維表的Lookup Join需要訪問外部存儲,有沒有產生IO的瓶頸,這都是影響作業性能的常見瓶頸點,需要加以關注。
在實時計算Flink產品上集成了一個叫AutoPilot的功能,可以理解為類似於自動駕駛,在這種功能下,初始資源設多少就不是一個麻煩問題了。
在產品上,設定作業最大的資源限制后,根據實際的數據處理量,該用多少資源可以由引擎自動幫我們去調到最優狀態,根據負載情況來做伸縮。