本文源碼基於flink1.14
平台用戶在使用我們的flinkSql時經常會開啟minaBatch來優化狀態讀寫
所以從源碼的角度具體解讀一下miniBatch的原理
先看一下flinksql是如何觸發miniBatch的優化的
主要就是這個Calcite的rule了,來具體看一下
在對應的match方法中
會根據miniBatch的類型判斷,是否需要添加一個Assigner的節點
這個assigner是干嘛的呢?這個Assinger是一個execNode和窗口的assigner是不一樣的,這里主要是為了發送水印的
沒錯,miniBatch攢一批的實現原理就是通過水印,來作為一批的標識
來具體看看
分為處理時間和事件時間
先看看處理時間
邏輯比較簡單,就是當前微批的開始時間大於當前水印,就發送一個當前的微批的開始時間的水印
然后,事件時間的沒什么意思,就是水印直接往下游轉發了
接着,攢微批已經將完了,來看下具體聚合算子怎么優化微批計算的吧
來看個StreamExecGroupAggregate這個聚合ExecNode的邏輯
既然是execNode來直接看它的translateToPlanInternal()方法
原來是直接在execNode里面做了特殊處理,不過也是,每個算子的優化都不一樣也不太好抽象出來
這里還是 先看看不使用微批的時候是怎么處理的,然后來對比一下
沒用微批這里是封裝成了一個KeyedProcessOperator的算子,里面傳的aggFunction直接就是一個KeyedProcessFunction
看下具體處理groupAggFunction
這里沒有開minibatch的邏輯比較簡單
每來一條數據,先讀狀態accState是一個valueState然后,調用聚合函數的accumlate來計算,然后用新得到的累加器更新狀態
可以看到這樣做的問題還是比較大的
第一,每一條數據都要讀寫狀態開銷很大
第二,每條數據都要調用計算,有很多虛函數的調用
因此,讓我們看看MIniBatch是如何做的吧
回到上面,我們看到MiniBatch是創建的一個KeyedMapBundleOperator,里面的參數是MiniBatchGroupAggFunction
看下KeyedMapBundleOperator
先從一個bundle獲取和數據同key的數據,來看下這個bundle是什么
ok,就是一個本地map,然后走addInput()
來看下MiniBatchGroupAggFunction的addInput方法
其實就是把,來的數據加到map對應key的Value是一個list里面去了
最后來看當微批攢夠觸發onTrigger會走到finishBundle()方法
先從buffer獲取每一個key對應的value是一個list
然后讀取狀態state數據
直接for循環遍歷微批的數據
然后調用聚合函數的accumulate不停計算
最后將計算好的累加器accumulator存到狀態里面去
是不是很簡單
這樣微批處理就完成了,減少了狀態的頻繁訪問,是一個很不錯的優化