Flink去重統計-基於自定義布隆過濾器


一、背景說明

在Flink中對流數據進行去重計算是常有操作,如流量域對獨立訪客之類的統計,去重思路一般有三個:

  • 基於Hashset來實現去重
    數據存在內存,容量小,服務重啟會丟失。
  • 使用狀態編程ValueState/MapState實現去重
    常用方式,可以使用內存/文件系統/RocksDB作為狀態后端存儲。
  • 結合Redis使用布隆過濾器實現去重
    適用對上億數據量進行去重實現,占用資源少效率高,有小概率誤判。

這里以自定義布隆過濾器的方式,實現Flink窗口計算中獨立訪客的統計,數據集樣例如下:

二、布隆過濾器部分說明

布隆過濾器簡單點說就是哈希算法+bitmap,如上圖,對字符串結合多種哈希算法,基於bitmap作為存儲,由於只用0/1存儲,所以可以大量節省存儲空間,也就特別適合在上百億數據里面做去重這種動作。在后續要進行字符串查找時,對要查找的字符串同樣計算這多個哈希算法,根據在bitmap上的位置,可以確認該字符串一定不在或者極大概率在(由於哈希沖突問題會有極小概率誤判)。

引申一下,如上所述,能對哈希沖突進行更好的優化,便能更好解決誤判問題,當然也不能無限的增加多種哈希算法的策略,會相應帶來計算效率的下降。

在本次開發中,使用自定義的布隆過濾器,其中對哈希算法部分做了幾點優化:

  • 結合Redis使用,Redis原生支持bitmap
  • 對bitmap容量擴容,一般為數據的3-10倍,這里使用2^30,使用2的整數冪,能讓后續查找輸出使用位與運算,實現比取模查找更高的效率。
myBloomFilter = new MyBloomFilter(1 << 30);
  • 優化哈希算法,這里對要查找的id轉為char類型,並行單個剔除后基於Unicode編碼乘以質數31再相加,來避免不同字符串計算出同樣哈希值的問題。
for (char c : value.toCharArray()){
                result += result * 31 + c;
            }

另外,谷歌提供的工具Guava也包含了布隆過濾器,加入相關依賴即可使用,主要參數如下源碼,輸入要建立的過濾器容器大小及誤判概率即可。

public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) {
        return create(funnel, (long)expectedInsertions, fpp);
    }

三、代碼部分

package com.test.UVbloomfilter;

import bean.UserBehavior;
import bean.UserVisitorCount;
import java.sql.Timestamp;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

public class UserVisitorTest {
    public static void main(String[] args) throws Exception {
        //建立環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        //指定時間語義
        WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
                .<UserBehavior>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
            @Override
            public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                return element.getTimestamp() * 1000L;
            }
        });
        //讀取數據、映射、過濾
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env
                .readTextFile("input/UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new UserBehavior(Long.parseLong(split[0])
                                , Long.parseLong(split[1])
                                , Integer.parseInt(split[2])
                                , split[3]
                                , Long.parseLong(split[4]));
                    }
                })
                //.filter(data -> "pv".equals(data.getBehavior()))  //lambda表達式寫法
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        if (value.getBehavior().equals("pv")) {
                            return true;
                        }return false; }})
                .assignTimestampsAndWatermarks(wms);

        //去重按全局去重,故使用行為分組,僅為后續開窗使用、開窗
        WindowedStream<UserBehavior, String, TimeWindow> windowDS = userBehaviorDS.keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)));

        SingleOutputStreamOperator<UserVisitorCount> processDS = windowDS
                .trigger(new MyTrigger()).process(new UserVisitorWindowFunc());

        processDS.print();
        env.execute();
    }

    //自定義觸發器:來一條計算一條(訪問Redis一次)
    private static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
        @Override
        public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; //觸發計算和清除窗口元素。
        }
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        }
    }

    private static class UserVisitorWindowFunc extends ProcessWindowFunction<UserBehavior,UserVisitorCount,String,TimeWindow>  {
        //聲明Redis連接
        private Jedis jedis;

        //聲明布隆過濾器
        private MyBloomFilter myBloomFilter;

        //聲明每個窗口總人數的key
        private String hourUVCountKey;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("hadoop102",6379);
            hourUVCountKey = "HourUV";
            myBloomFilter = new MyBloomFilter(1 << 30); //2^30
        }

        @Override
        public void process(String s, Context context, java.lang.Iterable<UserBehavior> elements, Collector<UserVisitorCount> out) throws Exception {
            //1.取出數據
            UserBehavior userBehavior = elements.iterator().next();
            //2.提取窗口信息
            String windowEnd = new Timestamp(context.window().getEnd()).toString();
            //3.定義當前窗口的BitMap Key
            String bitMapKey = "BitMap_" + windowEnd;
            //4.查詢當前的UID是否已經存在於當前的bitMap中
            long offset = myBloomFilter.getOffset(userBehavior.getUserId().toString());
            Boolean exists = jedis.getbit(bitMapKey, offset);

            //5.根據數據是否存在做下一步操作
            if (!exists){
                //將對應offset位置改為1
                jedis.setbit(bitMapKey,offset,true);
                //累加當前窗口的綜合
                jedis.hincrBy(hourUVCountKey,windowEnd,1);
            }
            //輸出數據
            String hget = jedis.hget(hourUVCountKey, windowEnd);
            out.collect(new UserVisitorCount("UV",windowEnd,Integer.parseInt(hget)));
        }
    }

    private static class MyBloomFilter {
        //減少哈希沖突優化1:增加過濾器容量為數據3-10倍
        //定義布隆過濾器容量,最好傳入2的整次冪數據
        private long cap;

        public MyBloomFilter(long cap) {
            this.cap = cap;
        }
        //傳入一個字符串,獲取在BitMap中的位置
       public long getOffset(String value){
            long result = 0L;

            //減少哈希沖突優化2:優化哈希算法
            //對字符串每個字符的Unicode編碼乘以一個質數31再相加
            for (char c : value.toCharArray()){
                result += result * 31 + c;
            }
            //取模,使用位與運算代替取模效率更高
           return  result & (cap - 1);
       }}}

輸出結果在Redis查看如下:


學習交流,有任何問題還請隨時評論指出交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM