阿里雲實時計算負責人 - 王峰(莫問)/ FFA_2020-Flink as a Unified Engine - Now and Next-V4
2020年Flink
基於Flink 的流批一體數倉
基於Flink流批一體數據湖架構
PyFlink
Flink Native on K8S
雙鏈路數據實時化
Flag:
- 流批一體Source API、Sink API
- K8S Native HA
- Sql CDC 原生支持
- 開源實時數倉:ClickHouse
- 阿里:Hologres
美團
數據集成
組件對比
Flag:
- HIDI、Hudi、Iceberg
- Doris
有贊
Flink on K8S VS Yarn 優勢
Flink on K8S 痛點問題
Flag:
- Native on K8S
知乎
技術選型
Flag:
- ETL
- TiDB
B站
生態場景
Flag:
- ETL
- ClickHouse
- AIFlow
阿里雲開放平台
Flink on Zeppelin
作業調度
Flag:
- Flink + Zepplin + Airflow
阿里:Flink在線機器學習
架構
開源
Flag :
- Flink AI Flow 開源
湖倉一體 - 融合趨勢下基於 Flink Kylin Hudi 湖倉一體的大數據生態體系
Multi-Cluster, Shared-Data架構
Multi-Cluster, Shared-Data架構2
失敗容錯
貝殼
數倉架構
Flag:
- clickhouse
- doris
順豐
實時數倉建設思路
Hudi 關鍵特性
加速寬表
實時數倉寬表
Flag:
- Hudi 加速寬表
- clickhouse
騰訊實時數倉
痛點總結
實時數倉建設的需求
Apache Iceberg 是什么
Apache Iceberg 的能力
實時數倉-數據湖分析系統
實時數倉-數據湖分析系統
Flag:
- Iceberg
- clickhouse
騰訊看點基於Flink構建萬億數據量下的實時數倉及實時查詢系統
實時數倉
基於 Flink SQL 構建流批一體的 ETL 數據集成
傳統數倉
-
傳統的數據倉庫,實時和離線數倉是比較割裂的兩套鏈路,比如實時鏈路通過 Flume和 Canal 實時同步日志和數據庫數據到 Kafka 中,然后在 Kafka 中做數據清理和打寬。
-
離線鏈路通過 Flume 和 Sqoop 定期同步日志和數據庫數據到 HDFS 和 Hive。然后在 Hive 里做數據清理和打寬。
-
這里我們主要關注的是數倉的前半段的構建,也就是到 ODS、DWD 層,我們把這一塊看成是廣義的 ETL 數據集成的范圍。
-
那么在這一塊,傳統的架構主要存在的問題就是這種割裂的數倉搭建這會造成很多重復工作,重復的資源消耗,並且實時、離線底層數據模型不一致,會導致數據一致性和質量難以保障。
-
同時兩個鏈路的數據是孤立的,數據沒有實現打通和共享。
流批一體的 ETL 數據集成
基於 Flink SQL 我們現在可以方便地構建流批一體的 ETL 數據集成,與傳統數倉架構的核心區別主要是這幾點:
- Flink SQL 原生支持了 CDC 所以現在可以方便地同步數據庫數據,不管是直連數據庫,還是對接常見的 CDC工具。
- Flink SQL 在最近的版本中持續強化了維表 join 的能力,不僅可以實時關聯數據庫中的維表數據,現在還能關聯 Hive 和 Kafka 中的維表數據,能靈活滿足不同工作負載和時效性的需求。
- 基於 Flink 強大的流式 ETL 的能力,我們可以統一在實時層做數據接入和數據轉換,然后將明細層的數據回流到離線數倉中。
- 現在 Flink 流式寫入 Hive,已經支持了自動合並小文件的功能,解決了小文件的痛苦。
所以基於流批一體的架構,我們能獲得的收益:
- 統一了基礎公共數據
- 保障了流批結果的一致性
- 提升了離線數倉的時效性
- 減少了組件和鏈路的維護成本
CDC Connector
數據入 OLAP
流式數據入湖(Iceberg)
Flink 數據集成能力矩陣
Flag:
- 打寬
- clickhouse
參考總結及規划:
1、以實時數倉、K8S、PyFlink 等場景的分布;
2、Flink+ClickHouse
MySQL CDC connector 非常受用戶的歡迎,尤其是結合 OLAP 引擎,可以快速構建實時 OLAP 架構。實時 OLAP 架構的一個特點就是將數據庫數據同步到 OLAP 中做即席查詢,這樣就無需離線數倉了。
-
現在很多公司都在用 Flink+ClickHouse 來快速構建實時 OLAP 架構。
-
我們只需要在 Flink 中定義一個 mysql-cdc source,一個 ClickHouse sink,然后提交一個 insert into query 就完成了從 MySQL 到 ClickHouse 的實時同步工作,非常方便。
-
而且,ClickHouse 有一個痛點就是 join 比較慢,所以一般我們會把 MySQL 數據打成一張大的明細寬表數據,再寫入 ClickHouse。這個在 Flink 中一個 join 操作就完成了。而在 Flink 提供 MySQL CDC connector 之前,要在全量+增量的實時同步過程中做 join 是非常麻煩的。
- Flag :Sqlserver-cdc source?
3、實時數倉相關技術組件對比
4、流批一體的ETL打寬:數據打寬是數據集成中最為常見的業務加工場景,數據打寬最主要的手段就是 Join,Flink SQL 提供了豐富的 Join 支持,包括 Regular Join、Interval Join、Temporal Join。
Regular Join(雙流關聯) 就是大家熟知的雙流 Join,語法上就是普通的 JOIN 語法。如下圖:
-
圖中案例是通過廣告曝光流關聯廣告點擊流將廣告數據打寬,打寬后可以進一步計算廣告費用。
-
從圖中可以看出,曝光流和點擊流都會存入 join 節點的 state,join 算子通過關聯曝光流和點擊流的 state 實現數據打寬。
-
Regular Join 的特點是,任意一側流都會觸發結果的更新,比如案例中的曝光流和點擊流。同時 Regular Join 的語法與傳統批 SQL 一致,用戶學習門檻低。
-
但需要注意的是,Regular join 通過 state 來存儲雙流已經到達的數據,state 默認永久保留,所以 Regular join 的一個問題是默認情況下 state 會持續增長,一般我們會結合 state TTL 使用。
- Flag:
Interval Join(區間關聯) 是一條流上需要有時間區間的 join,比如剛剛的廣告計費案例中,它有一個非常典型的業務特點在里面,就是點擊一般發生在曝光之后的 10 分鍾內。
-
此相對於 Regular Join,我們其實只需要關聯這10分鍾內的曝光數據,所以 state 不用存儲全量的曝光數據,它是在 Regular Join 之上的一種優化。
-
要轉成一個 Interval Join,需要在兩個流上都定義時間屬性字段(如圖中的 click_time 和 show_time)。
-
並在 join 條件中定義左右流的時間區間,比如這里我們增加了一個條件:點擊時間需要大於等於曝光時間,同時小於等於曝光后 10 分鍾。
-
與 Regular Join 相同, Interval Join 任意一條流都會觸發結果更新,但相比 Regular Join,Interval Join 最大的優點是 state 可以自動清理,根據時間區間保留數據,state 占用大幅減少。
-
Interval Join 適用於業務有明確的時間區間,比如曝光流關聯點擊流,點擊流關聯下單流,下單流關聯成交流。
Temporal join (時態表關聯) 是最常用的數據打寬方式,它常用來做我們熟知的維表 Join。Flink 支持非常豐富的 Temporal join 功能,包括關聯 lookup DB,關聯 changelog,關聯 Hive 表。在以前,大家熟知的維表 join 一般都是關聯一個可以查詢的數據庫,因為維度數據在數據庫里面,但實際上維度數據可能有多種物理形態,比如 binlog 形式,或者定期同步到 Hive 中變成了 Hive 分區表的形式。在 Flink 1.12 中,現在已經支持關聯這兩種新的維表形態。
-
Temporal Join Lookup DB 是最常見的維表 Join 方式,比如在用戶點擊流關聯用戶畫像的案例中,用戶點擊流在 Kafka 中,用戶實時畫像存放在 HBase 數據庫中,每個點擊事件通過查詢並關聯 HBase 中的用戶實時畫像完成數據打寬。
-
Temporal Join Lookup DB 的特點是,維表的更新不會觸發結果的更新,維度數據存放在數據庫中,適用於實時性要求較高的場景,使用時我們一般會開啟 Async IO 和內存 cache 提升查詢效率。
再看一個 Lookup DB 的例子,這是一個直播互動數據關聯直播間維度的案例。
-
案例中直播互動數據(比如點贊、評論)存放在 Kafka 中,直播間實時的維度數據(比如主播、直播間標題)存放在 MySQL 中,直播互動的數據量是非常大的,為了加速訪問,常用的方案是加個高速緩存,比如把直播間的維度數據通過 CDC 同步,再存入 Redis 中,再做維表關聯。
- 這種方案的問題是,直播的業務數據比較特殊,直播間的創建和直播互動數據基本是同時產生的,因此互動數據可能早早地到達了 Kafka 被 Flink 消費,但是直播間的創建消息經過了 Canal, Kafka,Redis, 這個鏈路比較長,數據延遲比較大,可能導致互動數據查詢 Redis 時,直播間數據還未同步完成,導致關聯不上直播間數據,造成下游統計分析的偏差。
針對這類場景,Flink 1.12 支持了 Temporal Join Changelog,通過從 changelog在 Flink state 中物化出維表來實現維表關聯。
-
剛剛的場景有了更簡潔的解決方案,我們可以通過 Flink CDC connector 把直播間數據庫表的 changelog 同步到 Kafka 中,注意我們看下右邊這段 SQL,我們用了 upsert-kafka connector 來將 MySQL binlog 寫入了 Kafka,也就是 Kafka 中存放了直播間變更數據的 upsert 流。然后我們將互動數據 temporal join 這個直播間 upsert 流,便實現了直播數據打寬的功能。
- 注意我們這里 FOR SYSTEM_TIME AS OF 不是跟一個 processing time,而是左流的 event time,它的含義是去關聯這個 event time 時刻的直播間數據,同時我們在直播間 upsert 流上也定義了 watermark,所以 temporal join changelog 在執行上會做 watermark 等待和對齊,保證關聯上精確版本的結果,從而解決先前方案中關聯不上的問題。
我們詳細解釋下 temporal join changelog 的過程
-
左流是互動流數據,右流是直播間 changelog。
-
直播間 changelog 會物化到右流的維表 state 中,state 相當於一個多版本的數據庫鏡像, 主流互動數據會暫時緩存在左流的 state 中,等到 watermark 到達對齊后再去查維表 state 中的數據。
-
比如現在互動流和直播流的 watermark 都到了10:01分,互動流的這條 10:01 分評論數據就會去查詢維表 state,並關聯上 103 房間的信息。當 10:05 這條評論數據到來時,它不會馬上輸出,不然就會關聯上空的房間信息。它會一直等待,等到左右兩流的 watermark 都到 10:05 后,才會去關聯維表 state 中的數據並輸出。這個時候,它能關聯上准確的 104 房間信息。
- 總結下,Temporal Join Changelog 的特點是實時性高,因為是按照 event time 做的版本關聯,所以能關聯上精確版本的信息,且維表會做 watermark 對齊等待,使得用戶可以通過 watermark 控制遲到的維表數。Temporal Join Changelog 中的維表數據都是存放在 temporal join 節點的 state 中,讀取非常高效,就像是一個本地的 Redis 一樣,用戶不再需要維護額外的 Redis 組件。
在數倉場景中,Hive 的使用是非常廣泛的,Flink 與 Hive 的集成非常友好,現在已經支持 Temporal Join Hive 分區表和非分區表。
-
我們舉個典型的關聯 Hive 分區表的案例:訂單流關聯店鋪數據。店鋪數據一般是變化比較緩慢的,所以業務方一般會按天全量同步店鋪表到 Hive 分區中,每天會產生一個新分區,每個分區是當天全量的店鋪數據。
- 為了關聯這種 Hive 數據,只需我們在創建 Hive 分區表時指定右側這兩個紅圈中的參數,便能實現自動關聯 Hive 最新分區功能,partition.include = latest 表示只讀取 Hive 最新分區,partition-name 表示選擇最新分區時按分區名的字母序排序。
- 到 10 月 3 號的時候,Hive 中已經產生了 10 月 2 號的新分區, Flink 監控到新分區后,就會重新加載10月2號的數據到 cache 中並替換掉10月1號的數據作為最新的維表。之后的訂單流數據關聯上的都是 cache 10 月 2 號分區的數據。
- Temporal join Hive 的特點是可以自動關聯 Hive 最新分區,適用於維表緩慢更新,高吞吐的業務場景。
總結一下我們剛剛介紹的幾種在數據打寬中使用的 join:
- Regular Join(雙流Join) 的實效性非常高,吞吐一般,因為 state 會保留所有到達的數據,適用於雙流關聯場景;
- Interval Jon(區間Join) 的時效性非常好,吞吐較好,因為 state 只保留時間區間內的數據,適用於有業務時間區間的雙流關聯場景;
- Temporal Join Lookup DB 的時效性比較好,吞吐較差,因為每條數據都需要查詢外部系統,會有 IO 開銷,適用於維表在數據庫中的場景;
- Temporal Join Changelog 的時效性很好,吞吐也比較好,因為它沒有 IO 開銷,適用於需要維表等待,或者關聯准確版本的場景;
- Temporal Join Hive 的時效性一般,但吞吐非常好,因為維表的數據存放在cache 中,適用於維表緩慢更新的場景,高吞吐的場景。
5、Multi-Cluster, Shared-Data架構
6、Native K8S HA / vs Yarn
- 統一運維 公司統一化運維,有專門的部門運維 K8S
- CPU 隔離 K8S Pod 之間 CPU 隔離,實時任務不相互影響,更加穩定
-
存儲計算分離:Flink 計算資源和狀態存儲分離,計算資源能夠和其他組件資源進行混部,提升機器使用率
- 彈性擴縮容 大促期間能夠彈性擴縮容,更好的節省人力和物力成
7、Flink 交互
- NoteBook
- Zeppline
8、AI
- Flink AI Flow : https://github.com/alibaba/flink-ai-extended
- analytics-zoo: https://github.com/intel-analytics/analytics-zoo
Flag:
- 對照一下分鍾曲線的打寬過程?
- 對照業務 能梳理出一個 lookup DB (HBase、Redis)的場景?
- 對照現在有的系統,大部分場景適合於用 Join Hive?
參考資料