在Flink去重第一彈:MapState去重中介紹了使用編碼方式完成去重,但是這種方式開發周期比較長,我們可能需要針對不同的業務邏輯實現不同的編碼,對於業務開發來說也需要熟悉Flink編碼,也會增加相應的成本,我們更多希望能夠以sql的方式提供給業務開發完成自己的去重邏輯。本篇介紹如何使用sql方式完成去重。
為了與離線分析保持一致的分析語義,Flink SQL 中提供了distinct去重方式,使用方式:
-
SELECT DISTINCT devId FROM pv
表示對設備ID進行去重,得到一個明細結果,那么我們在使用distinct來統計去重結果通常有兩種方式, 仍然以統計每日網站uv為例。
第一種方式
-
SELECT datatime,count(DISTINCT devId) FROM pv group by datatime
該語義表示計算網頁每日的uv數量,其內部核心實現主要依靠DistinctAccumulator與CountAccumulator,DistinctAccumulator 內部包含一個map結構,key 表示的是distinct的字段,value表示重復的計數,CountAccumulator就是一個計數器的作用,這兩部分都是作為動態生成聚合函數的中間結果accumulator,透過之前的聚合函數的分析可知中間結果是存儲在狀態里面的,也就是容錯並且具有一致性語義的
其處理流程是:
- 將devId 添加到對應的DistinctAccumulator對象中,首先會判斷map中是否存在該devId, 不存在則插入map中並且將對應value記1,並且返回True;存在則將對應的value+1更新到map中,並且返回False
- 只有當返回True時才會對CountAccumulator做累加1的操作,以此達到計數目的
第二種方式
-
select count(*),datatime from(
-
select distinct devId,datatime from pv ) a
-
group by datatime
內部是一個對devId,datatime 進行distinct的計算,在flink內部會轉換為以devId,datatime進行分組的流並且進行聚合操作,在內部會動態生成一個聚合函數,該聚合函數createAccumulators方法生成的是一個Row(0) 的accumulator 對象,其accumulate方法是一個空實現,也就是該聚合函數每次聚合之后返回的結果都是Row(0),通過之前對sql中聚合函數的分析(可查看GroupAggProcessFunction函數源碼), 如果聚合函數處理前后得到的值相同那么可能會不發送該條結果也可能發送一條撤回一條新增的結果,但是其最終的效果是不會影響下游計算的,在這里我們簡單理解為在處理相同的devId,datatime不會向下游發送數據即可,也就是每一對devId,datatime只會向下游發送一次數據;
外部就是一個簡單的按照時間維度的計數計算,由於內部每一組devId,datatime 只會發送一次數據到外部,那么外部對應datatime維度的每一個devId都是唯一的一次計數,得到的結果就是我們需要的去重計數結果。
兩種方式對比
- 這兩種方式最終都能得到相同的結果,但是經過分析其在內部實現上差異還是比較大,第一種在分組上選擇datatime ,內部使用的累加器DistinctAccumulator 每一個datatime都會與之對應一個對象,在該維度上所有的設備id, 都會存儲在該累加器對象的map中,而第二種選擇首先細化分組,使用datatime+devId分開存儲,然后外部使用時間維度進行計數,簡單歸納就是:
第一種: datatime->Value{devI1,devId2..}
第二種: datatime+devId->row(0)
聚合函數中accumulator 是存儲在ValueState中的,第二種方式的key會比第一種方式數量上多很多,但是其ValueState占用空間卻小很多,而在實際中我們通常會選擇Rocksdb方式作為狀態后端,rocksdb中value大小是有上限的,第一種方式很容易到達上限,那么使用第二種方式會更加合適; - 這兩種方式都是全量保存設備數據的,會消耗很大的存儲空間,但是我們的計算通常是帶有時間屬性的,那么可以通過配置StreamQueryConfig設置狀態ttl。