Flink 維表Join/雙流Join 方法總結


一、背景

事實表通常存儲在kafka中,維表通常存儲在外部設備中(比如MySQL,HBase)。對於每條流式數據,可以關聯一個外部維表數據源,為實時計算提供數據關聯查詢。維表可能是會不斷變化的,在維表JOIN時,需指明這條記錄關聯維表快照的時刻。需要注意是,目前Flink SQL的維表JOIN僅支持對當前時刻維表快照的關聯(處理時間語義),而不支持事實表rowtime所對應的的維表快照。

二、維表Join

預加載維表

將維表全量預加載到內存里去做關聯,具體的實現方式就是我們定義一個類,去實現 RichFlatMapFunction,然后在 open 函數中讀取維度數據庫,再將數據全量的加載到內存,然后在 probe 流上使用算子 ,運行時與內存維度數據做關聯。

這個方案的優點就是實現起來比較簡單,缺點也比較明顯,因為我們要把每個維度數據都加載到內存里面,所以它只支持少量的維度數據。同時如果我們要去更新維表的話,還需要重啟作業,所以它在維度數據的更新方面代價是有點高的,而且會造成一段時間的延遲。對於預加載維表來說,它適用的場景就是小維表,變更頻率訴求不是很高,且對於變更的及時性的要求也比較低的這種場景。

改進:open()新建一個線程定時加載維表,這樣就不需要人工的去重啟 Job 來讓維度數據做更新,可以實現一個周期性的維度數據的更新。

distributed cache

通過 Distributed cash 的機制去分發本地的維度文件到 Task Manager 后再加載到內存做關聯。實現方式可以分為三步:

  • 通過 env.registerCached 注冊文件。
  • 實現 RichFunction,在 open 函數里面通過 RuntimeContext 來獲取 Cache 文件。
  • 解析和使用這部分文件數據。

因為數據要加載到內存中,所以支持的數據量比較小。而且如果維度數據需要更新,也是需要重啟作業的。

那么它適用的場景就是維度數據是文件形式的、數據量比較小、並且更新的頻率也比較低的一些場景,比如說我們讀一個靜態的碼表、配置文件等等。

熱存儲關聯

把維度數據導入到像 Redis、Tair、HBase 這樣的一些熱存儲中,然后通過異步 IO 去查詢,並且疊加使用 Cache 機制,還可以加一些淘汰的機制,最后將維度數據緩存在內存里,來減輕整體對熱存儲的訪問壓力。

如上圖展示的這樣的一個流程。在 Cache 這塊的話,比較推薦谷歌的 Guava Cache,它封裝了一些關於 Cache 的一些異步的交互,還有 Cache 淘汰的一些機制,用起來是比較方便的。

異步 IO 可以並行發出多個請求,整個吞吐是比較高的,延遲會相對低很多。如果使用異步 IO 的話,它對於外部存儲的吞吐量上升以后,會使得外部存儲有比較大的壓力,有時也會成為我們整個數據處理上延遲的瓶頸。所以引入 Cache 機制是希望通過 Cache 來去減少我們對外部存儲的訪問量。

這個方案的優點就是維度數據不用全量加載到內存中,不受限於內存大小。但是需要依賴熱存儲資源,再加上cache過期時間,所以最后結果會有一定的延遲。適用於維度數據量比較大,能接受維度更新有一定延遲的情況。

廣播維表

利用 Broadcast State 將維度數據流廣播到下游 Task 做 Join。

  1. 將維度數據發送到 Kafka 作為廣播原始流 S1
  2. 定義狀態描述符 MapStateDescriptor。調用 S1.broadcast(),獲得 broadCastStream S2
  3. 調用非廣播流 S3.connect(S2),得到 BroadcastConnectedStream S4
  4. 在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 實現關聯處理邏輯,並作為參數調用 S4.process()

廣播維表維度的變更可以及時的更新到結果,但是數據還是需要保存在內存中,因為它是存在 State 里的,所以支持維表數據量仍然不是很大。適用的場景就是我們需要時時的去感知維度的變更,且維度數據又可以轉化為實時流。

Temporal table function join

首先說明一下什么是 Temporal table?它其實是一個概念:就是能夠返回持續變化表的某一時刻數據內容的視圖,持續變化表也就是 Changingtable,可以是一個實時的 Changelog 的數據,也可以是放在外部存儲上的一個物化的維表。

它的實現是通過 UDTF 去做 probe 流和 Temporal table 的 join,稱之 Temporal table function join。這種 Join 的方式,它適用的場景是維度數據為 Changelog 流的形式,而且我們有需要按時間版本去關聯的訴求。

  1. 在 Changelog 流上面去定義 TemporalTableFunction,這里面有兩個關鍵的參數是必要的。第1個參數就是能夠幫我們去識別版本信息的一個 Time attribute,第 2 個參數是需要去做關聯的組件。
  2. 在 tableEnv 里面去注冊 TemporalTableFunction 的名字。

維表Join方案對比

三、雙流Join

批處理有兩種方式處理兩個表的Join,一種是基於排序的Sort-Merge Join,更一種是轉化為Hash Table 加載到內存里做Hash Join。

在雙流Join的場景中,Join的對象是兩個流,數據是不斷進入的,所以我們Join的結果也是需要持續更新的。基本思路是將一個無線的數據流,盡可能拆分成有限數據集去做Join。

  • Regular Join

這種 Join 方式需要去保留兩個流的狀態,持續性地保留並且不會去做清除。兩邊的數據對於對方的流都是所有可見的,所以數據就需要持續性的存在 State 里面,那么 State 又不能存的過大,因此這個場景的只適合有界數據流。

  • Interval Join

加入了一個時間窗口的限定,要求在兩個流做 Join 的時候,其中一個流必須落在另一個流的時間戳的一定時間范圍內,並且它們的 Join key 相同才能夠完成 Join。加入了時間窗口的限定,就使得我們可以對超出時間范圍的數據做一個清理,這樣的話就不需要去保留全量的 State。

Interval Join 是同時支持 processing time 和 even time去定義時間的。如果使用的是 processing time,Flink 內部會使用系統時間去划分窗口,並且去做相關的 state 清理。如果使用 even time 就會利用 Watermark 的機制去划分窗口,並且做 State 清理。
Window join

將兩個流中有相同 key 和處在相同 window 里的元素去做 Join。它的執行的邏輯比較像 Inner Join,必須同時滿足 Join key 相同,而且在同一個 Window 里元素才能夠在最終結果中輸出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM