Flink 實時統計歷史 pv、uv


Flink 實時統計 pv、uv 的博客,我已經寫了三篇,最近這段時間又做了個嘗試,用 sql 來計算全量數據的 pv、uv。

Stream Api 寫實時、離線的 pv、uv ,除了要寫代碼沒什么其他的障礙

SQL api 來寫就有很多障礙,比如窗口沒有 trigger,不能操作 狀態,udf 不如 process 算子好用等

問題

預設兩個場景的問題:
1. 按天統計 pv、uv
2. 在解決問題 1 的基礎上,再解決歷史 pv、uv 的統計

實現思路

有以下幾種思路,來實現實時統計 pv、uv

  1. 直接使用 CUMULATE WINDOW 計算當日的 pv、uv
  2. 直接使用 CUMULATE WINDOW 計算當日的 pv、uv,再獲取昨天的 pv,累加可以得到基於歷史的 pv
  3. pv 計算同解法 2 ,uv 的計算采用 udaf,使用 bloom filter 來粗略的計算 uv
  4. pv 計算同解法 2 ,uv 的計算采用 udaf,用 redis 記錄 user_id ,每次計算的時候獲取 user_id 的數量即 uv
  5. pv 計算同解法 2 ,uv 的計算采用 udaf,每次啟動的時候獲取歷史的 user_id 緩存在內存中,加上新來的 user_id 計算 uv
  6. 全局窗口,直接計算全量的 pv、uv (沒意義,未實現)

注: 由於需要實時輸出結果,SQL 都選用了 CUMULATE WINDOW

建表語句

建表語句只有 數據流表、輸出表、lookup join 輸出表


CREATE TABLE user_log
(
     user_id     VARCHAR
    ,item_id     VARCHAR
    ,category_id VARCHAR
    ,behavior    VARCHAR
    ,ts          TIMESTAMP(3)
    ,proc_time   as PROCTIME()
    ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka'
      ,'topic' = 'user_log'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
);

create table if not exists user_log_lookup_join(
    cal_day varchar
    ,behavior varchar
    ,pv  bigint
    ,uv  bigint
    ,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
    ) with (
          'connector' = 'jdbc'
          ,'url' = 'jdbc:mysql://localhost:3306/venn'
          ,'table-name' = 'pv_uv'
          ,'username' = 'root'
          ,'password' = '123456'
          ,'scan.partition.column' = 'cal_day'
          ,'scan.partition.num' = '1'
          ,'scan.partition.lower-bound' = '0'
          ,'scan.partition.upper-bound' = '9999'
          ,'lookup.cache.max-rows' = '1000'
        -- one day, once cache, the value will not update
          ,'lookup.cache.ttl' = '86400000' -- ttl time 超過這么長時間無數據才行
    );


create table if not exists user_log_sink(
    cal_day varchar
    ,behavior varchar
    ,start_time VARCHAR
    ,end_time VARCHAR
    ,pv  bigint
    ,uv  bigint
    ,last_pv  bigint
    ,last_uv  bigint
    ,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
--      'connector' = 'print'
      'connector' = 'jdbc'
      ,'url' = 'jdbc:mysql://venn:3306/venn'
      ,'table-name' = 'pv_uv'
      ,'username' = 'root'
      ,'password' = '123456'
);


思路 1

就是個簡單的 CUMULATE 的 一天的窗口,統計 count/count distinct ,窗口的觸發事件是 10 秒一次

sql 如下:


insert into user_log_sink
select
 date_format(window_start, 'yyyy-MM-dd') cal_day
 ,behavior
 ,date_format(window_start, 'HH:mm:ss') start_time
 , date_format(window_end, 'HH:mm:ss') end_time
 , count(user_id) pv
 , count(distinct user_id) uv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
  GROUP BY window_start, window_end, behavior
;

結論: 這個只能實時輸出當天的 pv、uv,不能計算歷史的 pv、uv

思路 2

在 思路 1 的基礎上,關聯昨天的結果

sql 如下:

insert into user_log_sink
select
     a.cal_day
    ,a.behavior
    ,'' start_time
    ,date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss')
    ,a.pv + COALESCE(c.pv,0) -- add last
    ,a.uv
    ,c.pv last_uv
    ,c.uv last_uv
from(
    select
     date_format(window_start, 'yyyy-MM-dd') cal_day
     ,behavior
     ,max(proc_time) proc_time
     ,count(user_id) pv
     ,count(distinct user_id) uv
    FROM TABLE(
        CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
      GROUP BY window_start, window_end, behavior
        )a
        left join user_log_lookup_join FOR SYSTEM_TIME AS OF a.proc_time AS c
                  ON  a.behavior = c.behavior
                      and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;


結論: CUMULATE 窗口計算當天的 pv、uv,加上昨天的 pv,即可拿到累加的 pv,uv 還是只有今天的(uv 的值累加沒有意義)

思路 3

在思路 2 的基礎上,使用 bloom filter 來計算 uv

sql 如下:

insert into user_log_sink
select
     a.cal_day
    ,a.behavior
    ,'' start_time
    ,date_format(a.ts, 'yyyy-MM-dd HH:mm:ss')
    ,a.pv + COALESCE(c.pv,0) -- add last
    ,a.uv + COALESCE(c.uv,0)
    ,c.pv last_uv
    ,c.uv last_uv
from(
    select
     date_format(window_start, 'yyyy-MM-dd') cal_day
     ,behavior
     ,max(ts) ts
     ,max(proc_time) proc_time
     ,count(user_id) pv
     ,udaf_uv_count(user_id) uv
    FROM TABLE(
        CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' minute, INTERVAL '1' day))
      GROUP BY window_start, window_end, behavior
        )a
        left join user_log_lookup_join FOR SYSTEM_TIME AS OF a.proc_time AS c
                  ON  a.behavior = c.behavior
                      and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;

bloom filter 在 udaf_uv_count 中實現的


public class BloomFilter extends AggregateFunction<Integer, CountAcc > {

    private final static Logger LOG = LoggerFactory.getLogger(BloomFilter.class);
    private com.google.common.hash.BloomFilter<byte[]> filter;
    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.info("bloom filter open...");
        // 創建布隆過濾器對象, 預期數據量,誤判率
        filter = com.google.common.hash.BloomFilter.create(
                Funnels.byteArrayFunnel(),
                1000 * 10000,
                0.01);
    }

    public void accumulate(CountAcc acc, String userId) {

        if (userId == null || userId.length() == 0) {
            return;
        }
        // parse userId to byte
        byte[] arr = userId.getBytes(StandardCharsets.UTF_8);
        // check userId exists bloom filter
        if(!filter.mightContain(arr)){
            // not exists
            filter.put(arr);
            // count ++
            acc.count += 1;
        }

    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public Integer getValue(CountAcc acc) {
        // get
        return acc.count;
    }

    @Override
    public CountAcc createAccumulator() {
        CountAcc acc = new CountAcc();
        return acc;
    }

    public void merge(CountAcc acc, Iterable<CountAcc> it) {
        int last = acc.count;
        StringBuilder builder = new StringBuilder();
        for (CountAcc a : it) {
            acc.count += a.count;
        }
    }

}

結論: pv 如思路2, uv 值只能拿到當前窗口的
原因:
1. bloom filter 不能返回 uv 的數據
2. 累加器里面只有當前窗口的數據
3. udaf 里面無法獲取窗口狀態(開始、結束)無法用全局變量記錄上一窗口數據

注: 大佬們可以自己嘗試

思路 4

在思路 2 的基礎上,每次將新的 user_id 放入 redis中,getValue 的時候去redis 獲取全量的 user_id

SQL 如下:

insert into user_log_sink
select
     a.cal_day
    ,a.behavior
    ,'' start_time
    ,date_format(a.ts, 'yyyy-MM-dd HH:mm:ss')
    ,a.pv + COALESCE(c.pv,0) -- add last
    ,a.uv + COALESCE(c.uv,0)
    ,c.pv last_uv
    ,c.uv last_uv
from(
    select
     date_format(window_start, 'yyyy-MM-dd') cal_day
     ,behavior
     ,max(ts) ts
     ,max(proc_time) proc_time
     ,count(user_id) pv
     ,udaf_redis_uv_count('user_log_uv', user_id) uv
    FROM TABLE(
        CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' minute, INTERVAL '1' day))
      GROUP BY window_start, window_end, behavior
        )a
        left join user_log_lookup_join FOR SYSTEM_TIME AS OF a.proc_time AS c
                  ON  a.behavior = c.behavior
                      and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;

udf 實現如下:


/**
 * accumulate add user_id to redis
 * getValue: get all redis user_id, count the uv
 */
public class RedisUv extends AggregateFunction<Integer, Integer> {

    private final static Logger LOG = LoggerFactory.getLogger(RedisUv.class);
    // "redis://localhost"
    private String url;
    private StatefulRedisConnection<String, String> connection;
    private RedisClient redisClient;
    private RedisCommands<String, String> sync;
    private String key;

    public RedisUv(String url, String key ) {
        this.url = url;
        this.key = key;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // connect redis
        reconnect();
    }

    public void reconnect() {
        redisClient = RedisClient.create(this.url);
        connection = redisClient.connect();
        sync = connection.sync();
    }

    public void accumulate(Integer acc, String key, String userId) {

//        if (this.key == null) {
//            this.key = key;
//        }
        int retry = 3;
        while (retry >= 1) {
            try {
                sync.hset(key, userId, "0");
                return;
            } catch (Exception e) {
                LOG.info("set redis error, retry");
                reconnect();
                retry -= 1;
            }
        }

    }

    @Override
    public Integer getValue(Integer accumulator) {
        long start = System.currentTimeMillis();
        int size = 0;
        if (this.key == null) {
            return size;
        }
        // get all userId, count size
        int retry = 3;
        while (retry >= 1) {
            try {
                size = sync.hgetall(this.key).size();
                break;
            } catch (Exception e) {
                LOG.info("set redis error, retry");
                reconnect();
                retry -= 1;
            }
        }
        long end = System.currentTimeMillis();
        LOG.info("count all cost : " + (end - start));
        return size;
    }

    @Override
    public Integer createAccumulator() {
        return 0;
    }

    public void merge(Integer acc, Iterable<Integer> it) {
        // do nothing
    }
}



結論: pv 計算如思路2,並且可以精確計算歷史的 uv,但是有個嚴重的性能問題(docker 單機 redis,百萬 user_id,計算一次耗時 500 ms 以上。隨着 用戶數據增多,耗時還會加長)

注: 有個問題,從 accumulate 傳入的 key,在 udaf 中不是全局可見的, accumulate 和 getValue 不在一個線程中執行(甚至不在一台服務器上)

思路 5

測試了一下 100 萬個數字,放在 map 中,gc 顯示,用了 300+ M 的內存,直接放棄


Heap
 PSYoungGen      total 547840K, used 295484K [0x0000000715580000, 0x0000000738180000, 0x00000007c0000000)
  eden space 526336K, 52% used [0x0000000715580000,0x000000072610f248,0x0000000735780000)
  from space 21504K, 100% used [0x0000000736c80000,0x0000000738180000,0x0000000738180000)
  to   space 21504K, 0% used [0x0000000735780000,0x0000000735780000,0x0000000736c80000)
 ParOldGen       total 349696K, used 158905K [0x00000005c0000000, 0x00000005d5580000, 0x0000000715580000)
  object space 349696K, 45% used [0x00000005c0000000,0x00000005c9b2e410,0x00000005d5580000)
 Metaspace       used 15991K, capacity 16444K, committed 16512K, reserved 1062912K
  class space    used 2022K, capacity 2173K, committed 2176K, reserved 1048576K

思路 6

直接全局窗口計算pv、uv 也不太顯示,首先沒有不能實時輸出結果,其次也沒有歷史值

結論

  1. 如果只要最近一段時間的,直接用 CUMULATE 窗口就可以了
  2. 統計歷史的 pv,可以用當日的pv,加上歷史值來計算
  3. 統計全量歷史的 uv,還是 stream api 比較好,不管是用狀態還是 bloom filter 這里的算法解決,都挺方便的

完整代碼參考:flink sqlSubmit

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM