How to compute PV and UV over 3h, 7h, and 1d windows with Flink SQL


Someone in a group chat asked this question. I hadn't written window SQL in a long time, so I gave it a try.

I don't have an environment at hand, so everything is kept as simple as possible.

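The data comes from the datagen connector at 10,000 rows per second, with id generated randomly between a minimum of 1 and a maximum of 2,000,000. To get results within seconds, the demo uses 10 s, 20 s, and 30 s tumbling windows in place of the 3 h, 7 h, and 1 d in the question.

The output goes straight to the console through the print connector.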

```sql
-- flink window tvf calc pv&uv

-- source: datagen, 10,000 rows/s, id random in [1, 2000000]
create table if not exists datagen_source (
    id        int
    ,name     string
    ,sex      string
    ,age      int
    ,birthday string
    ,proc_time as proctime()   -- processing-time attribute used by the windows
) with (
    'connector' = 'datagen'
    ,'rows-per-second' = '10000'
    ,'fields.id.kind' = 'random'
    ,'fields.id.min' = '1'
    ,'fields.id.max' = '2000000'
);

-- sink: the print connector writes every row to stdout
create table if not exists print_sink(
    start_time string
    ,end_time string
    ,pv  bigint
    ,uv  bigint
) with (
    'connector' = 'print'
);

-- one TUMBLE window TVF per window size, merged with UNION ALL
-- pv = count(id), uv = count(distinct id)
insert into print_sink
-- 10-second windows
select
    date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end
union all
-- 20-second windows
select
    date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '20' SECOND))
GROUP BY window_start, window_end
union all
-- 30-second windows
select
    date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '30' SECOND))
GROUP BY window_start, window_end
;
```
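For the window sizes in the title, only the INTERVAL literals should need to change. The following is a sketch under the assumption that the same datagen_source and print_sink tables are used. It prints full dates because these windows can cross midnight. Tumbling windows are aligned to a fixed origin rather than to each day, so, since 24 is not a multiple of 7, the 7-hour windows will start at a different time of day from one day to the next.

```sql
-- Sketch: the demo query with the title's window sizes (3 h, 7 h, 1 d),
-- assuming the datagen_source and print_sink tables defined above.
insert into print_sink
select
    date_format(window_start, 'yyyy-MM-dd HH:mm:ss')
    , date_format(window_end, 'yyyy-MM-dd HH:mm:ss')
    , count(id)            -- pv
    , count(distinct id)   -- uv
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '3' HOUR))
GROUP BY window_start, window_end
union all
select
    date_format(window_start, 'yyyy-MM-dd HH:mm:ss')
    , date_format(window_end, 'yyyy-MM-dd HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '7' HOUR))
GROUP BY window_start, window_end
union all
select
    date_format(window_start, 'yyyy-MM-dd HH:mm:ss')
    , date_format(window_end, 'yyyy-MM-dd HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '1' DAY))
GROUP BY window_start, window_end;
```

count(distinct id) keeps the distinct ids of each open window in state until the window fires, so the 1-day branch holds up to a full day's worth of ids (at most 2,000,000 here).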

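Results (the # comments on the first rows mark the window size; the earliest windows contain only partial data because the job started mid-window):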

```
+I[10:45:00, 10:45:20, 20000, 19900]   #  20 s
+I[10:45:10, 10:45:20, 20000, 19913]   #  10 s
+I[10:45:00, 10:45:30, 120000, 116420] #  30 s
+I[10:45:20, 10:45:30, 100000, 97497]
+I[10:45:30, 10:45:40, 100000, 97558]
+I[10:45:20, 10:45:40, 200000, 190314]
```

[Figure: Flink job graph]

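The query works, but there is a catch: the SQL API currently has no trigger, so it cannot emit intermediate results before a window closes. For windows of a few minutes, getting output only at the end of each window is acceptable. For hourly or daily windows, however, a result appears only once, when the window ends, and at that point you might as well run an offline batch job.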
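The post doesn't use it, but Flink 1.13 and later also provide a CUMULATE window TVF aimed at this case. It takes a step and a maximum size. Every window in one cycle shares the same window_start, and each window_end is one step further along, so a 1-day PV/UV is refreshed at every step instead of once a day. The sketch below assumes Flink 1.13+ and the tables defined above. The 10-minute step is an arbitrary illustrative choice, and the maximum size must be an integral multiple of it (1 day = 144 × 10 minutes).

```sql
-- Sketch (assumes Flink 1.13+): daily PV/UV re-emitted every 10 minutes.
-- CUMULATE(TABLE t, DESCRIPTOR(time_col), step, max_size)
-- emits [start, start+10min), [start, start+20min), ... up to [start, start+1day)
insert into print_sink
select
    date_format(window_start, 'yyyy-MM-dd HH:mm:ss')
    , date_format(window_end, 'yyyy-MM-dd HH:mm:ss')
    , count(id)            -- pv so far in the day
    , count(distinct id)   -- uv so far in the day
FROM TABLE(
    CUMULATE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' MINUTE, INTERVAL '1' DAY))
GROUP BY window_start, window_end;
```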

Note: window aggregation over window TVFs supports GROUPING SETS, ROLLUP, and CUBE.
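The following is a sketch of GROUPING SETS on a window TVF. It computes per-sex and overall PV/UV for each 10-second window in a single query. It assumes a Flink version whose window aggregation accepts grouping sets (1.14 or later, to my understanding). The sink print_sink_by_sex is hypothetical. Because datagen fills sex with random strings, the per-sex rows are not meaningful in this demo. With real data, sex would take only a few values.

```sql
-- Hypothetical sink with an extra dimension column;
-- sex is NULL on the rows produced by the empty grouping set ().
create table if not exists print_sink_by_sex (
    start_time string
    ,end_time  string
    ,sex       string
    ,pv        bigint
    ,uv        bigint
) with (
    'connector' = 'print'
);

-- (sex) gives per-sex pv/uv, () gives the overall pv/uv of the same window
insert into print_sink_by_sex
select
    date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , sex
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end, GROUPING SETS ((sex), ());
```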

Follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.