一、背景
事實表通常存儲在kafka中,維表通常存儲在外部設備中(比如MySQL,HBase)。對於每條流式數據,可以關聯一個外部維表數據源,為實時計算提供數據關聯查詢。維表可能是會不斷變化的,在維表JOIN時,需指明這條記錄關聯維表快照的時刻。需要注意是,目前Flink SQL的維表JOIN僅支持對當前時刻維表快照的關聯(處理時間語義),而不支持事實表rowtime所對應的的維表快照。
二、維表Join
預加載維表
將維表全量預加載到內存里去做關聯,具體的實現方式就是我們定義一個類,去實現 RichFlatMapFunction,然后在 open 函數中讀取維度數據庫,再將數據全量的加載到內存,然后在 probe 流上使用算子 ,運行時與內存維度數據做關聯。
這個方案的優點就是實現起來比較簡單,缺點也比較明顯,因為我們要把每個維度數據都加載到內存里面,所以它只支持少量的維度數據。同時如果我們要去更新維表的話,還需要重啟作業,所以它在維度數據的更新方面代價是有點高的,而且會造成一段時間的延遲。對於預加載維表來說,它適用的場景就是小維表,變更頻率訴求不是很高,且對於變更的及時性的要求也比較低的這種場景。
改進:open()新建一個線程定時加載維表,這樣就不需要人工的去重啟 Job 來讓維度數據做更新,可以實現一個周期性的維度數據的更新。
distributed cache
通過 Distributed cash 的機制去分發本地的維度文件到 Task Manager 后再加載到內存做關聯。實現方式可以分為三步:
- 通過 env.registerCached 注冊文件。
- 實現 RichFunction,在 open 函數里面通過 RuntimeContext 來獲取 Cache 文件。
- 解析和使用這部分文件數據。
因為數據要加載到內存中,所以支持的數據量比較小。而且如果維度數據需要更新,也是需要重啟作業的。
那么它適用的場景就是維度數據是文件形式的、數據量比較小、並且更新的頻率也比較低的一些場景,比如說我們讀一個靜態的碼表、配置文件等等。
熱存儲關聯
把維度數據導入到像 Redis、Tair、HBase 這樣的一些熱存儲中,然后通過異步 IO 去查詢,並且疊加使用 Cache 機制,還可以加一些淘汰的機制,最后將維度數據緩存在內存里,來減輕整體對熱存儲的訪問壓力。
如上圖展示的這樣的一個流程。在 Cache 這塊的話,比較推薦谷歌的 Guava Cache,它封裝了一些關於 Cache 的一些異步的交互,還有 Cache 淘汰的一些機制,用起來是比較方便的。
異步 IO 可以並行發出多個請求,整個吞吐是比較高的,延遲會相對低很多。如果使用異步 IO 的話,它對於外部存儲的吞吐量上升以后,會使得外部存儲有比較大的壓力,有時也會成為我們整個數據處理上延遲的瓶頸。所以引入 Cache 機制是希望通過 Cache 來去減少我們對外部存儲的訪問量。
這個方案的優點就是維度數據不用全量加載到內存中,不受限於內存大小。但是需要依賴熱存儲資源,再加上cache過期時間,所以最后結果會有一定的延遲。適用於維度數據量比較大,能接受維度更新有一定延遲的情況。
廣播維表
利用 Broadcast State 將維度數據流廣播到下游 Task 做 Join。
- 將維度數據發送到 Kafka 作為廣播原始流 S1
- 定義狀態描述符 MapStateDescriptor。調用 S1.broadcast(),獲得 broadCastStream S2
- 調用非廣播流 S3.connect(S2),得到 BroadcastConnectedStream S4
- 在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 實現關聯處理邏輯,並作為參數調用 S4.process()
廣播維表維度的變更可以及時的更新到結果,但是數據還是需要保存在內存中,因為它是存在 State 里的,所以支持維表數據量仍然不是很大。適用的場景就是我們需要時時的去感知維度的變更,且維度數據又可以轉化為實時流。
Temporal table function join
首先說明一下什么是 Temporal table?它其實是一個概念:就是能夠返回持續變化表的某一時刻數據內容的視圖,持續變化表也就是 Changingtable,可以是一個實時的 Changelog 的數據,也可以是放在外部存儲上的一個物化的維表。
它的實現是通過 UDTF 去做 probe 流和 Temporal table 的 join,稱之 Temporal table function join。這種 Join 的方式,它適用的場景是維度數據為 Changelog 流的形式,而且我們有需要按時間版本去關聯的訴求。
- 在 Changelog 流上面去定義 TemporalTableFunction,這里面有兩個關鍵的參數是必要的。第1個參數就是能夠幫我們去識別版本信息的一個 Time attribute,第 2 個參數是需要去做關聯的組件。
- 在 tableEnv 里面去注冊 TemporalTableFunction 的名字。
維表Join方案對比
三、雙流Join
批處理有兩種方式處理兩個表的Join,一種是基於排序的Sort-Merge Join,更一種是轉化為Hash Table 加載到內存里做Hash Join。
在雙流Join的場景中,Join的對象是兩個流,數據是不斷進入的,所以我們Join的結果也是需要持續更新的。基本思路是將一個無線的數據流,盡可能拆分成有限數據集去做Join。
- Regular Join
這種 Join 方式需要去保留兩個流的狀態,持續性地保留並且不會去做清除。兩邊的數據對於對方的流都是所有可見的,所以數據就需要持續性的存在 State 里面,那么 State 又不能存的過大,因此這個場景的只適合有界數據流。
- Interval Join
加入了一個時間窗口的限定,要求在兩個流做 Join 的時候,其中一個流必須落在另一個流的時間戳的一定時間范圍內,並且它們的 Join key 相同才能夠完成 Join。加入了時間窗口的限定,就使得我們可以對超出時間范圍的數據做一個清理,這樣的話就不需要去保留全量的 State。
Interval Join 是同時支持 processing time 和 even time去定義時間的。如果使用的是 processing time,Flink 內部會使用系統時間去划分窗口,並且去做相關的 state 清理。如果使用 even time 就會利用 Watermark 的機制去划分窗口,並且做 State 清理。
Window join
將兩個流中有相同 key 和處在相同 window 里的元素去做 Join。它的執行的邏輯比較像 Inner Join,必須同時滿足 Join key 相同,而且在同一個 Window 里元素才能夠在最終結果中輸出。