Flink sql 之 微批處理與MiniBatchIntervalInferRule (源碼分析)


本文源碼基於flink1.14

平台用戶在使用我們的flinkSql時經常會開啟minaBatch來優化狀態讀寫

所以從源碼的角度具體解讀一下miniBatch的原理

先看一下flinksql是如何觸發miniBatch的優化的

 主要就是這個Calcite的rule了,來具體看一下

在對應的match方法中

  會根據miniBatch的類型判斷,是否需要添加一個Assigner的節點

 這個assigner是干嘛的呢?這個Assinger是一個execNode和窗口的assigner是不一樣的,這里主要是為了發送水印的

沒錯,miniBatch攢一批的實現原理就是通過水印,來作為一批的標識

來具體看看

 分為處理時間和事件時間

先看看處理時間

 邏輯比較簡單,就是當前微批的開始時間大於當前水印,就發送一個當前的微批的開始時間的水印

然后,事件時間的沒什么意思,就是水印直接往下游轉發了

接着,攢微批已經將完了,來看下具體聚合算子怎么優化微批計算的吧

來看個StreamExecGroupAggregate這個聚合ExecNode的邏輯

既然是execNode來直接看它的translateToPlanInternal()方法

 原來是直接在execNode里面做了特殊處理,不過也是,每個算子的優化都不一樣也不太好抽象出來

這里還是 先看看不使用微批的時候是怎么處理的,然后來對比一下

沒用微批這里是封裝成了一個KeyedProcessOperator的算子,里面傳的aggFunction直接就是一個KeyedProcessFunction

看下具體處理groupAggFunction

 這里沒有開minibatch的邏輯比較簡單

每來一條數據,先讀狀態accState是一個valueState然后,調用聚合函數的accumlate來計算,然后用新得到的累加器更新狀態

可以看到這樣做的問題還是比較大的

第一,每一條數據都要讀寫狀態開銷很大

第二,每條數據都要調用計算,有很多虛函數的調用

因此,讓我們看看MIniBatch是如何做的吧

回到上面,我們看到MiniBatch是創建的一個KeyedMapBundleOperator,里面的參數是MiniBatchGroupAggFunction

看下KeyedMapBundleOperator

先從一個bundle獲取和數據同key的數據,來看下這個bundle是什么

 ok,就是一個本地map,然后走addInput()

來看下MiniBatchGroupAggFunction的addInput方法

其實就是把,來的數據加到map對應key的Value是一個list里面去了

最后來看當微批攢夠觸發onTrigger會走到finishBundle()方法

 先從buffer獲取每一個key對應的value是一個list

然后讀取狀態state數據

 直接for循環遍歷微批的數據

然后調用聚合函數的accumulate不停計算

最后將計算好的累加器accumulator存到狀態里面去

是不是很簡單

這樣微批處理就完成了,減少了狀態的頻繁訪問,是一個很不錯的優化

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM