摘要:今天和大家聊聊Flink雙流Join問題。這是一個高頻面試點,也是工作中常遇到的一種真實場景。
本文分享自華為雲社區《萬字直通面試:Flink雙流JOIN》,作者:大數據兵工廠 。
如何保證Flink雙流Join准確性和及時性、除了窗口join還存在哪些實現方式、究竟如何回答才能完全打動面試官呢。。你將在本文中找到答案。
1 引子
1.1 數據庫SQL中的JOIN
我們先來看看數據庫SQL中的JOIN操作。如下所示的訂單查詢SQL,通過將訂單表的id和訂單詳情表order_id關聯,獲取所有訂單下的商品信息。
select a.id as '訂單id', a.order_date as '下單時間', a.order_amount as '訂單金額', b.order_detail_id as '訂單詳情id', b.goods_name as '商品名稱', b.goods_price as '商品價格', b.order_id as '訂單id' from dwd_order_info_pfd a right join dwd_order_detail_pfd b on a.id = b.order_id
這是一段很簡單的SQL代碼,就不詳細展開敘述了。此處主要引出SQL中的JOIN類型,這里用到的是 right join , 即右連接。
- left join: 保留左表全部數據和右表關聯數據,右表非關聯數據置NULL
- right join: 保留右表全部數據和左表關聯數據,左表非關聯數據置NULL
- inner join: 保留左表關聯數據和右邊關聯數據
- cross join: 保留左表和右表數據笛卡爾積
基於關聯鍵值逐行關聯匹配,過濾表數據並生成最終結果,提供給下游數據分析使用。
就此打住,關於數據庫SQL中的JOIN原理不再多贅述,感興趣的話大家可自行研究,下面我們將目光轉移到大數據領域看看吧。
1.2 離線場景下的JOIN
假設存在這樣一個場景:
已知Mysql數據庫中訂單表和訂單明細表,且滿足一對多的關系,統計T-1天所有訂單的商品分布詳情。
聰明的大家肯定已經給出了答案,沒錯~就是上面的SQL:
select a.*, b.* from dwd_order_info_pfd a right join dwd_order_detail_pfd b on a.id = b.order_id
現在修改下條件:已知訂單表和訂單明細表均為億級別數據,求相同場景下的分析結果。
咋辦?此時關系型數據庫貌似不大合適了~開始放大招:使用大數據計算引擎來解決。
考慮到T-1統計場景對時效性要求很低,可以使用Hive SQL來處理,底層跑Mapreduce任務。如果想提高運行速度,換成Flink或Spark計算引擎,使用內存計算。
至於查詢SQL和上面一樣,並將其封裝成一個定時調度任務, 等系統調度運行。如果結果不正確的話,由於數據源和數據靜態不變,大不了重跑,看起來感覺皆大歡喜~
可是好景不長,產品冤家此時又給了你一個無法拒絕的需求:我要實時統計!!
2 實時場景下的JOIN
還是上面的場景,此時數據源換成了實時訂單流和實時訂單明細流,比如Kafka的兩個topic,要求實時統計每分鍾內所有訂單下的商品分布詳情。
現在情況貌似變得復雜了起來,簡單分析下:
- 數據源。實時數據流,和靜態流不同,數據是實時流入的且動態變化,需要計算程序支持實時處理機制。
- 關聯性。前面提到靜態數據執行多次join操作,左表和右表能關聯的數據是很恆定的;而實時數據流(左右表)如果進入時機不一致,原本可以關聯的數據會關聯不上或者發生錯誤。
- 延遲性。實時統計,提供分鍾甚至秒級別響應結果。
由於流數據join的特殊性,在滿足實時處理機制、低延遲、強關聯性的前提下,看來需要制定完善的數據方案,才能實現真正的流數據JOIN。
2.1 方案思路
我們知道訂單數據和訂單明細數據是一對多的關系,即一條訂單數據對應着多條商品明細數據,畢竟買一件商品也是那么多郵費,不如打包團購。。而一條明細數據僅對應一條訂單數據。
這樣,雙流join策略可以考慮如下思路:
- 當數據流為訂單數據時。無條件保留,無論當前是否關聯到明細數據,均留作后續join使用。
- 當數據流為明細數據時。在關聯到其訂單數據后,就可以say goodbye了,否則暫時保留等待下一次與訂單數據的邂逅。
- 完成所有處於同一時段內的訂單數據和訂單明細數據join, 清空存儲狀態
實際生產場景中,需要考慮更多的復雜情況,包括JOIN過程的數據丟失等異常情況的處理,此處僅示意。
好了,看起來我們已經有了一個馬馬虎虎的實時流JOIN方案雛形。
貌似可以准備動手大干一場了~ 別着急,有人已經幫我們偷偷的實現了:Apache Flink
3 Flink的雙流JOIN
Apache Flink 是一個框架和分布式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
——來自Flink官網定義
這里我們只需要知道Flink是一個實時計算引擎就行了,主要關注其如何實現雙流JOIN。
3.1 內部運行機制
- 內存計算:Flink任務優先在內存中計算,內存不夠時保存到訪問高效的磁盤,提供秒級延遲響應。
- 狀態強一致性:Flink使用一致性快照保存狀態,並定期檢查本地狀態、持久存儲來保證狀態一致性。
- 分布式執行: Flink應用程序可以划分為無數個並行任務在集群中執行,幾乎無限量使用CPU、主內存、磁盤和網絡IO。
- 內置高級編程模型:Flink編程模型抽象為SQL、Table、DataStream|DataSet API、Process四層,並封裝成豐富功能的算子,其中就包含JOIN類型的算子。
仔細看看,我們前面章節討論的實時流JOIN方案的前提是否都滿足了呢?
- 實時處理機制: Flink天生即實時計算引擎
- 低延遲: Flink內存計算秒級延遲
- 強關聯性: Flink狀態一致性和join類算子
不由感嘆, 這個Flink果然強啊~
保持好奇心,我們去瞅瞅Flink雙流join的真正奧義!!
3.2 JOIN實現機制
Flink雙流JOIN主要分為兩大類。一類是基於原生State的Connect算子操作,另一類是基於窗口的JOIN操作。其中基於窗口的JOIN可細分為window join和interval join兩種。
- 實現原理:底層原理依賴Flink的State狀態存儲,通過將數據存儲到State中進行關聯join, 最終輸出結果。
恍然大悟, Flink原來是通過State狀態來緩存等待join的實時流。
這里給大家拋出一個問題:
用redis存儲可不可以,state存儲相比redis存儲的區別?
更多細節歡迎大家一起探討,添加個人微信: youlong525 拉您進群,還有免費Flink PDF領取~
回到正題,這幾種方式到底是如何實現雙流JOIN的?我們接着往下看。
注意: 后面內容將多以文字 + 代碼的形式呈現,避免枯燥,我放了一堆原創示意圖~
4 基於Window Join的雙流JOIN實現機制
顧名思義,此類方式利用Flink的窗口機制實現雙流join。通俗理解,將兩條實時流中元素分配到同一個時間窗口中完成Join。
- 底層原理: 兩條實時流數據緩存在Window State中,當窗口觸發計算時,執行join操作。
4.1 join算子
先看看Window join實現方式之一的join算子。這里涉及到Flink中的窗口(window)概念,因此Window Joinan按照窗口類型區分的話某種程度來說可以細分出3種:
- Tumbling Window Join (滾動窗口)
- Sliding Window Join (滑動窗口)
- Session Widnow Join(會話窗口)
兩條流數據按照關聯主鍵在(滾動、滑動、會話)窗口內進行inner join, 底層基於State存儲,並支持處理時間和事件時間兩種時間特征,看下源碼:
源碼核心總結:windows窗口 + state存儲 + 雙層for循環執行join()
現在讓我們把時間軸往回拉一點點,在實時場景JOIN那里我們收到了這樣的需求:統計每分鍾內所有訂單下的商品明細分布。
OK, 使用join算子小試牛刀一下。我們定義60秒的滾動窗口,將訂單流和訂單明細流通過order_id關聯,得到如下的程序:
val env = ... // kafka 訂單流 val orderStream = ... // kafka 訂單明細流 val orderDetailStream = ... orderStream.join(orderDetailStream) .where(r => r._1) //訂單id .equalTo(r => r._2) //訂單id .window(TumblingProcessTimeWindows.of( Time.seconds(60))) .apply {(r1, r2) => r1 + " : " + r2} .print()
整個代碼其實很簡單,概要總結下:
- 定義兩條輸入實時流A、B
- A流調用join(b流)算子
- 關聯關系定義: where為A流關聯鍵,equalTo為B流關聯鍵,都是訂單id
- 定義window窗口(60s間隔)
- apply方法定義邏輯輸出
這樣只要程序穩定運行,就能夠持續不斷的計算每分鍾內訂單分布詳情,貌似解決問題了奧~
還是別高興太早,別忘了此時的join類型是inner join。復習一下知識: inner join指的是僅保留兩條流關聯上的數據。
這樣雙流中沒關聯上的數據豈不是都丟掉了?別擔心,Flink還提供了另一個window join操作: coGroup算子。
4.2 coGroup算子
coGroup算子也是基於window窗口機制,不過coGroup算子比Join算子更加靈活,可以按照用戶指定的邏輯匹配左流或右流數據並輸出。
換句話說,我們通過自己指定雙流的輸出來達到left join和right join的目的。
現在來看看在相同場景下coGroup算子是如何實現left join:
#這里看看java算子的寫法 orderDetailStream .coGroup(orderStream) .where(r -> r.getOrderId()) .equalTo(r -> r.getOrderId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() { @Override public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) { for (OrderDetail orderDetaill : orderDetailRecords) { boolean flag = false; for (Order orderRecord : orderRecords) { // 右流中有對應的記錄 collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price())); flag = true; } if (!flag) { // 右流中沒有對應的記錄 collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null)); } } } }) .print();
這里需要說明幾點:
- join算子替換為coGroup算子
- 兩條流依然需要在一個window中且定義好關聯條件
- apply方法中自定義判斷,此處對右值進行判斷:如果有值則進行連接輸出,否則右邊置為NULL。
可以這么說,現在我們已經徹底搞定了窗口雙流JOIN。
只要你給我提供具體的窗口大小,我就能通過join或coGroup算子鼓搗出各種花樣join,而且使用起來特別簡單。
但是假如此時我們親愛的產品又提出了一個小小問題:
大促高峰期,商品數據某時段會寫入不及時,時間可能比訂單早也可能比訂單晚,還是要計算每分鍾內的訂單商品分布詳情,沒問題吧~
當然有問題:兩條流如果步調不一致,還用窗口來控制能join的上才怪了~ 很容易等不到join流窗口就自動關閉了。
還好,我知道Flink提供了Interval join機制。
5 基於Interval Join的雙流JOIN實現機制
Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯窗口,在偏移區間窗口中完成join操作。
有點不好理解,我畫個圖看下:
stream2.time ∈ (stream1.time +low, stream1.time +high)
滿足數據流stream2在數據流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的數據就越多,超出interval的數據不再關聯。
- 實現原理:interval join也是利用Flink的state存儲數據,不過此時存在state失效機制ttl,觸發數據清理操作。
這里再引出一個問題:
state的ttl機制需要怎么設置?不合理的ttl設置會不會撐爆內存?
我會在后面的文章中深入講解下State的ttl機制,歡迎大家一起探討~
下面簡單看下interval join的代碼實現過程:
val env = ... // kafka 訂單流 val orderStream = ... // kafka 訂單明細流 val orderDetailStream = ... orderStream.keyBy(_.1) // 調用intervalJoin關聯 .intervalJoin(orderDetailStream._2) // 設定時間上限和下限 .between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction()) class ProcessWindowFunction extends ProcessJoinFunction...{ override def processElement(...) { collector.collect((r1, r2) => r1 + " : " + r2) } }
訂單流在流入程序后,等候(low,high)時間間隔內的訂單明細流數據進行join, 否則繼續處理下一個流。
從代碼中我們發現,interval join需要在兩個KeyedStream之上操作,即keyBy(),並在between()方法中指定偏移區間的上下界。
需要注意的是interval join實現的也是inner join,且目前只支持事件時間。
6 基於Connect的雙流JOIN實現機制
前面在使用Window join或者Interval Join來實現雙流join的時候,我發現了其中的共性:
無論哪種實現方式,Flink內部都將join過程透明化,在算子中封裝了所有的實現細節。
這是什么?是編程語言中的抽象概念~ 隱藏底層細節,對外暴露統一API, 大幅簡化程序編碼。
可是這樣會引來一個問題:如果程序報錯或者數據異常,如何快速進行調優排查,直接看源碼嗎?不大現實。。
這里介紹基於Connect算子實現的雙流JOIN方法,我們可自己控制雙流JOIN處理邏輯,同時保持過程時效性和准確性。
6.1 Connect算子原理
對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以調用不同方法在兩個實時流上執行,且雙流之間可以共享狀態。
圖上我們可以看到,兩個數據流被connect之后,只是被放在了同一個流中,內部依然保持各自的數據和形式,兩個流相互獨立。
[DataStream1, DataStream2] -> ConnectedStreams[1,2]
這樣,我們可以在Connect算子底層的ConnectedStreams中編寫代碼,自行實現雙流JOIN的邏輯處理。
6.2 技術實現
1.調用connect算子,根據orderid進行分組,並使用process算子分別對兩條流進行處理。
orderStream.connect(orderDetailStream) .keyBy("orderId", "orderId") .process(new orderProcessFunc());
2.process方法內部進行狀態編程, 初始化訂單、訂單明細和定時器的ValueState狀態。
private ValueState<OrderEvent> orderState; private ValueState<TxEvent> orderDetailState; private ValueState<Long> timeState; // 初始化狀態Value orderState = getRuntimeContext().getState( new ValueStateDescriptor<Order> ("order-state",Order.class)); ····
3.為每個進入的數據流保存state狀態並創建定時器。在時間窗口內另一個流達到時進行join並輸出,完成后刪除定時器。
@Override public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){ if (orderDetailState.value() == null){ //明細數據未到,先把訂單數據放入狀態 orderState.update(value); //建立定時器,60秒后觸發 Long ts = (value.getEventTime()+10)*1000L; ctx.timerService().registerEventTimeTimer( ts); timeState.update(ts); }else{ //明細數據已到,直接輸出到主流 out.collect(new Tuple2<>(value,orderDetailS tate.value())); //刪除定時器 ctx.timerService().deleteEventTimeTimer (timeState.value()); //清空狀態,注意清空的是支付狀態 orderDetailState.clear(); timeState.clear(); } } ... @Override public void processElement2(){ ... }
4.未及時達到的數據流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) { // 實現左連接 if (orderState.value() != null){ ctx.output(new OutputTag<String>("left-jo in") {}, orderState.value().getTxId()); // 實現右連接 }else{ ctx.output(new OutputTag<String>("left-jo in") {}, orderDetailState.value().getTxId()); } orderState.clear(); orderDetailState.clear(); timeState.clear(); }
總體思想: 基於數據時間實現訂單數據及訂單明細數據的關聯,超時或者缺失則由側輸出流輸出。
在connect中針對訂單流和訂單明細流,先創建定時器並保存state狀態,處於窗口內就進行join, 否則進入側輸出流。
7 雙流JOIN的優化與總結
- 為什么我的雙流join時間到了卻不觸發,一直沒有輸出
檢查一下watermark的設置是否合理,數據時間是否遠遠大於watermark和窗口時間,導致窗口數據經常為空
- state數據保存多久,會內存爆炸嗎
state自帶有ttl機制,可以設置ttl過期策略,觸發Flink清理過期state數據。建議程序中的state數據結構用完后手動clear掉。
- 我的雙流join傾斜怎么辦
join傾斜三板斧: 過濾異常key、拆分表減少數據、打散key分布。當然可以的話我建議加內存!加內存!加內存!!
- 想實現多流join怎么辦
目前無法一次實現,可以考慮先union然后再二次處理;或者先進行connnect操作再進行join操作,僅建議~
- join過程延遲、沒關聯上的數據會丟失嗎
這個一般來說不會,join過程可以使用側輸出流存儲延遲流;如果出現節點網絡等異常,Flink checkpoint也可以保證數據不丟失。
某日
面試官: Flink雙流join了解嗎? 簡單說說其實現原理。
某君: Flink雙流JOIN是。。。