Flink去重第一彈:MapState去重


去重計算應該是數據分析業務里面常見的指標計算,例如網站一天的訪問用戶數、廣告的點擊用戶數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如數據量的大小、計算結果精准度要求等可以使用不同的方案。此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點擊用戶數,廣告點擊日志包含:廣告位ID、用戶設備ID(idfa/imei/cookie)、點擊時間。

實現步驟分析:

  1. 為了當天的數據可重現,這里選擇事件時間也就是廣告點擊時間作為每小時的窗口期划分

  2. 數據分組使用廣告位ID+點擊事件所屬的小時

  3. 選擇processFunction來實現,一個狀態用來保存數據、另外一個狀態用來保存對應的數據量

  4. 計算完成之后的數據清理,按照時間進度注冊定時器清理

實現

廣告數據

  1. case class AdData(id:Int,devId:String,time:Long)

分組數據

  1. case class AdKey(id:Int,time:Long)

主流程

  1. val env=StreamExecutionEnvironment.getExecutionEnvironment

  2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  3.  

  4. val kafkaConfig=new Properties()

  5. kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

  6. kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")

  7. val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)

  8. val ds=env.addSource(consumer)

  9. .map(x=>{

  10. val s=x.split(",")

  11. AdData(s(0).toInt,s(1),s(2).toLong)

  12. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {

  13. override def extractTimestamp(element: AdData): Long = element.time

  14. })

  15. .keyBy(x=>{

  16. val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,

  17. Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds

  18. AdKey(x.id,endTime)

  19. })

指定時間時間屬性,這里設置允許1min的延時,可根據實際情況調整;
時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個參數 表示數據時間,第二個參數offset偏移量,默認為0,正常窗口划分都是整點方式,例如從0開始划分,這個offset就是相對於0的偏移量,第三個參數表示窗口大小,得到的結果是數據時間所屬窗口的開始時間,這里加上了窗口大小,使用結束時間與廣告位ID作為分組的Key。

去重邏輯
自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出類型使用Void,這里直接使用打印控制台方式查看結果,在實際中可輸出到下游做一個批量的處理然后在輸出;
定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的設備數據,如果我們使用rocksdb作為statebackend, 那么會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;另外一個ValueState,存儲當前MapState的數據量,是由於mapstate只能通過迭代方式獲得數據量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。

  1. class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {

  2. var devIdState: MapState[String, Int] = _

  3. var devIdStateDesc: MapStateDescriptor[String, Int] = _

  4.  

  5. var countState: ValueState[Long] = _

  6. var countStateDesc: ValueStateDescriptor[Long] = _

  7.  

  8. override def open(parameters: Configuration): Unit = {

  9.  

  10. devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))

  11. devIdState = getRuntimeContext.getMapState(devIdStateDesc)

  12.  

  13. countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))

  14. countState = getRuntimeContext.getState(countStateDesc)

  15. }

  16.  

  17. override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {

  18.  

  19. val currW=ctx.timerService().currentWatermark()

  20. if(ctx.getCurrentKey.time+1<=currW) {

  21. println("late data:" + value)

  22. return

  23. }

  24.  

  25. val devId = value.devId

  26. devIdState.get(devId) match {

  27. case 1 => {

  28. //表示已經存在

  29. }

  30. case _ => {

  31. //表示不存在

  32. devIdState.put(devId, 1)

  33. val c = countState.value()

  34. countState.update(c + 1)

  35. //還需要注冊一個定時器

  36. ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)

  37. }

  38. }

  39. println(countState.value())

  40. }

  41.  

  42. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {

  43. println(timestamp + " exec clean~~~")

  44. println(countState.value())

  45. devIdState.clear()

  46. countState.clear()

  47. }

  48. }

數據清理通過注冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大於該小時結束時間+1就會執行清理動作,調用onTimer方法。

在處理邏輯里面加了

  1. val currW=ctx.timerService().currentWatermark()

  2. if(ctx.getCurrentKey.time+1<=currW){

  3. println("late data:" + value)

  4. return

  5. }

主要考慮可能會存在滯后的數據比較嚴重,會影響之前的計算結果,做了一個類似window機制里面的一個延時判斷,將延時的數據過濾掉,也可以使用OutputTag 單獨處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM