Flink sql 之 TopN 與 StreamPhysicalRankRule (源碼解析)


基於flink1.14的源碼做解析

公司內有很多業務方都在使用我們Flink sql平台做TopN的計算,今天同事突然問到我,Flink sql 是怎么實現topN的 ?

蒙圈了,這塊源碼沒看過啊 ,業務要問起來怎么辦,趕快打開源碼補一下

拿到這個問題先冷靜分析一下范圍

首先肯定屬於Flink sql模塊,源碼里面肯定是在flink-table-planner包里面,接着topN那不就是ROW_NUMBER嘛,是個函數呀

既然如此那就從flink源碼的系統函數作為線索開始找起來,來到 org.apache.calcite.sql.fun.SqlStdOperatorTable類

果然找到了,那calcite的某個rule肯定有個地方判斷了它,繼續查調用鏈

不出所料,FlinkLogicalRankRuleBase這個calcite的rule里面果然根據這個function的類型來確定rank的類型了

看下這個rule的匹配條件

 這里也好理解,overAgg的時候會判斷這個rank以及對應的類型

這是只是做了一下簡單的提取了rank的字段啊,提取謂語啊,提取表達式啊這一些拿信息的操作

然后直接生成新的relNode叫FlinkLogicalRank通過transformTo直接返回了這個等價節點

既然是relNode那肯定又會有calcite的rule去處理它,來找一找

批處理的就不管了,從名字就可以看出來我們要找的類了

看個不帶window的吧

 返回StreamPhysicalRank

這個類是一個FlinkPhysicalRel是可以轉換成execNode的

這里在多說一句,

這里將partitionKey傳入了,就是sql里面的partition by后面的,后面會用這個來創建transformation的keySelecter用來分流數據

返回的這個StreamExecRank就是可以轉換成具體的Flink的算子了,具體邏輯就在里面了

接下來看下row_number的具體邏輯,找到方法translateToPlanInternal

根據策略主要分為三種類型

AppendFastStrategy  (輸入僅包含插入時)

RetractStrategy   (輸入包含update和delete)

UpdateFastStrategy     (輸入不應包含刪除且輸入有給定的primaryKeys且按字段排序時)

來看個retractStrategy的吧

先通過sort的字段獲取一個用於排序RowData的比較器 ComparableRecordComparator

根據比較器創建 RetractableTopNFunction

這個類還有兩個主要的狀態數據結構

dataState這個map用來存放當key相同的所有數據會放在同一個list里面

treeMap這個可排序的map就是通過上面我們sql里面定義的sort by 來排序數據的,Long是指這個相同的key有多少個record

!!!!!!!!!!!  那就是用java的treeMap排序唄

繼續往下看

 主邏輯就是這個了

每進入一條數據,會根據這條數據的類型划分

當數據是Insert , UPDATE_AFTER類型是會走 emitRecordsWithRowNumber()方法

當數據是UPDATE_BEFORE,DELETE類型會走 retractRecordWithRowNumber ()方法

來看下具體邏輯先看INSERT的

 遍歷treeMap

解讀一下,當數據是insert數據的時候

INSERT數據會先放到treeMap里面去,delete則不會

按順序遍歷treeMap

當遍歷過程中發現遍歷的key與當前數據的key相同時,和當前數據key相同的所有數據數據(dataState中的LIST),全部撤回並且更新他們的rowNumber+1

繼續遍歷treeMap

之后的數據全部撤回UpdateBefore,並且向下游發送UpdateAfter使rowNumber+1,遍歷直到已經到第TopN個數據循環結束

 

當數據是DELETE類型的時候,會和Insert反過來,當前key之后的數據全部撤回,然后rowNumber-1

 

整個處理流程差不多就結束了,可以看到rowNumber當N較大且排序變化頻繁的時候,性能消耗還是非常大的,極端情況下游的數據會翻很多倍

這個還需要注意在其他兩個策略中還有一個參數,table.exec.topn.cache-size

 

 影響下面這個本地lruCache的大小

 

 調大可以減少狀態的訪問,可以按需要添加

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM