去重計算應該是數據分析業務里面常見的指標計算,例如網站一天的訪問用戶數、廣告的點擊用戶數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如數據量的大小、計算結果精准度要求等可以使用不同的方案。此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點擊用戶數,廣告點擊日志包含:廣告位ID、用戶設備ID(idfa/imei/cookie)、點擊時間。
實現步驟分析:
-
為了當天的數據可重現,這里選擇事件時間也就是廣告點擊時間作為每小時的窗口期划分
-
數據分組使用廣告位ID+點擊事件所屬的小時
-
選擇processFunction來實現,一個狀態用來保存數據、另外一個狀態用來保存對應的數據量
-
計算完成之后的數據清理,按照時間進度注冊定時器清理
實現
廣告數據
-
case class AdData(id:Int,devId:String,time:Long)
分組數據
-
case class AdKey(id:Int,time:Long)
主流程
-
val env=StreamExecutionEnvironment.getExecutionEnvironment
-
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-
val kafkaConfig=new Properties()
-
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
-
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
-
val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)
-
val ds=env.addSource(consumer)
-
.map(x=>{
-
val s=x.split(",")
-
AdData(s(0).toInt,s(1),s(2).toLong)
-
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {
-
override def extractTimestamp(element: AdData): Long = element.time
-
})
-
.keyBy(x=>{
-
val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,
-
Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds
-
AdKey(x.id,endTime)
-
})
指定時間時間屬性,這里設置允許1min的延時,可根據實際情況調整;
時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個參數 表示數據時間,第二個參數offset偏移量,默認為0,正常窗口划分都是整點方式,例如從0開始划分,這個offset就是相對於0的偏移量,第三個參數表示窗口大小,得到的結果是數據時間所屬窗口的開始時間,這里加上了窗口大小,使用結束時間與廣告位ID作為分組的Key。
去重邏輯
自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出類型使用Void,這里直接使用打印控制台方式查看結果,在實際中可輸出到下游做一個批量的處理然后在輸出;
定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的設備數據,如果我們使用rocksdb作為statebackend, 那么會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;另外一個ValueState,存儲當前MapState的數據量,是由於mapstate只能通過迭代方式獲得數據量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。
-
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {
-
var devIdState: MapState[String, Int] = _
-
var devIdStateDesc: MapStateDescriptor[String, Int] = _
-
-
var countState: ValueState[Long] = _
-
var countStateDesc: ValueStateDescriptor[Long] = _
-
-
override def open(parameters: Configuration): Unit = {
-
-
devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
-
devIdState = getRuntimeContext.getMapState(devIdStateDesc)
-
-
countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
-
countState = getRuntimeContext.getState(countStateDesc)
-
}
-
-
override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {
-
-
val currW=ctx.timerService().currentWatermark()
-
if(ctx.getCurrentKey.time+1<=currW) {
-
println("late data:" + value)
-
return
-
}
-
-
val devId = value.devId
-
devIdState.get(devId) match {
-
case 1 => {
-
//表示已經存在
-
}
-
case _ => {
-
//表示不存在
-
devIdState.put(devId, 1)
-
val c = countState.value()
-
countState.update(c + 1)
-
//還需要注冊一個定時器
-
ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)
-
}
-
}
-
println(countState.value())
-
}
-
-
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {
-
println(timestamp + " exec clean~~~")
-
println(countState.value())
-
devIdState.clear()
-
countState.clear()
-
}
-
}
數據清理通過注冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大於該小時結束時間+1就會執行清理動作,調用onTimer方法。
在處理邏輯里面加了
-
val currW=ctx.timerService().currentWatermark()
-
if(ctx.getCurrentKey.time+1<=currW){
-
println("late data:" + value)
-
return
-
}
主要考慮可能會存在滯后的數據比較嚴重,會影響之前的計算結果,做了一個類似window機制里面的一個延時判斷,將延時的數據過濾掉,也可以使用OutputTag 單獨處理。