源碼分析基於flink1.14
Join是flink中最常用的操作之一,但是如果濫用的話會有很多的性能問題,了解一下Flink源碼的實現原理是非常有必要的
本文的join主要是指flink sql的Regular join 也就是平時我們的雙流join中普通的full join ,left join,right join
先找到calcite的relNode轉換rule
會將邏輯節點logiceJoin轉換成flink的FlinkLogicalJoin
接着看下哪里Rule會轉換這個FlinkLogicalJoin
這里會將這種普通join也就是regularJoin給匹配上
條件是
不是這三種join,並且
也不能join表達式包含時間屬性
匹配上次rule以后,接着
返回了StreamPhysicalJoin這個StreamPhysicalRel是個物理節點
他的translateToExecNode方法會返回StreamExecJoin,這個類就是我們具體的邏輯了
來看一下
首先會根據會join的類型,確定兩個流那個需要輸出,如果是fulljoin兩個流都會輸出,left join就是左流需要outer,right join就是right流需要outer
之后創建了具體的Operator,來看下這個StreamingJoinOperator
先看一下這個類里面兩個比較重要的狀態
可以看到,左右流都會保存一個狀態
看下狀態包裝類的描述
總共就三,方法,分別是加入數據,撤回數據,獲取這個數據關聯上的所有數據
在open方法里面會根據上面計算的左右流是否需要輸出來初始化這個兩個狀態
這里狀態包裝類的創建,將根據數據類型分為三種
1. 流帶主鍵,且join條件包含了主鍵
這樣數據唯一,就只用ValueState來存
2. 流帶主鍵,但join條件沒有包含主鍵
這里就用MapState來存了,每次根據主鍵更新
3. 流不帶主鍵
就用map,直接把record當key存了
接着看processElement方法
這里詳細的代碼就不列出來了太復雜了,想看的直接看這個類
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement()
梳理邏輯我們還是來看下偽代碼
主要分為兩段
1. 如果是 +Insert / +Update 類型的數據
判斷輸入數據的流需不需要輸出
如果需要輸出
看下和另外一個流關聯的上不
關聯的上輸出 +I[record+other]s
關聯不上輸出 +I[record+null]
將數據加入狀態中
如果不需要輸出
將數據加入狀態中
如果與另外一個流的數據關聯上了
如果另外一個流要outer, 輸出 +I[record+other]s
如果另外一個流不用輸出 ,輸出 +I/+U[record+other]s
1. 如果是 -Delete / -Update 類型的數據
狀態里面先撤回這條數據
如果與另外流沒有匹配上,如果輸入數據的流需要輸出,則輸出 -D[record+null]
如果與另外一條流匹配上了
當前流outer,發送 -D[record+other]s,如果是inner join發送-D/-U[record+other]s
最后的最后
用兩個流join的key作為狀態的selecter來完成transform的構建就完成了
總結一下:
Flink會根據join的key作為狀態分流的selecter,根據表是否有主鍵,join條件是否包含主鍵,來創建對應的state數據結構,來優化狀態的讀寫
兩條流會根據join類型,來設置此流需不需要輸出outer
當數據進入,查詢另一側的流是否有數據可以關聯上,以及兩條流的outer類型,來確定向下游發送的撤回和新增的數據