Someone in a chat group asked this question. I hadn't written window SQL in ages, so I gave it a try.
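I don't have an environment at hand, so everything is kept minimal:
Data is generated with the datagen connector; id is random, with a minimum of 1 and a maximum of 2,000,000.
Output goes straight to the console through the print connector.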
```sql
-- flink window tvf calc pv&uv
create table if not exists datagen_source (
    id         int
    ,name      string
    ,sex       string
    ,age       int
    ,birthday  string
    ,proc_time as proctime()          -- processing-time attribute used by the windows
) with (
    'connector' = 'datagen'
    ,'rows-per-second' = '10000'      -- 10,000 rows per second
    ,'fields.id.kind' = 'random'
    ,'fields.id.min' = '1'
    ,'fields.id.max' = '2000000'      -- ids fall in [1, 2000000]
);

create table if not exists print_sink (
    start_time string
    ,end_time  string
    ,pv        bigint
    ,uv        bigint
) with (
    'connector' = 'print'             -- writes every row to stdout
);

insert into print_sink
-- branch 1: 10-second tumbling windows
select date_format(window_start, 'HH:mm:ss')
    ,date_format(window_end, 'HH:mm:ss')
    ,count(id)                        -- pv
    ,count(distinct id)               -- uv
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end
union all
-- branch 2: 20-second tumbling windows
select date_format(window_start, 'HH:mm:ss')
    ,date_format(window_end, 'HH:mm:ss')
    ,count(id)
    ,count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '20' SECOND))
GROUP BY window_start, window_end
union all
-- branch 3: 30-second tumbling windows
select date_format(window_start, 'HH:mm:ss')
    ,date_format(window_end, 'HH:mm:ss')
    ,count(id)
    ,count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '30' SECOND))
GROUP BY window_start, window_end
;
```
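To make the semantics of the three branches concrete, here is a small Python simulation. It is not Flink code, and the function names `tumble_pv_uv` and `multi_window` are hypothetical. It assumes integer timestamps in seconds, a zero window offset and non-null ids: each event lands in the tumbling window starting at `ts - ts % size`, PV is the row count, UV is the number of distinct ids, and `union all` just concatenates the rows produced for each window size.

```python
from collections import defaultdict


def tumble_pv_uv(events, size):
    """Simulate TUMBLE(..., INTERVAL size SECOND) + GROUP BY window_start, window_end.

    events: iterable of (ts_seconds, id); ids are assumed to be non-null.
    Returns {(window_start, window_end): (pv, uv)}.
    """
    ids_per_window = defaultdict(list)
    for ts, id_ in events:
        start = ts - ts % size              # align to the window size (offset 0)
        ids_per_window[(start, start + size)].append(id_)
    # pv = count(id), uv = count(distinct id)
    return {w: (len(ids), len(set(ids))) for w, ids in ids_per_window.items()}


def multi_window(events, sizes=(10, 20, 30)):
    """Simulate the UNION ALL of one tumbling-window branch per size."""
    events = list(events)                   # the source is "scanned" once per branch
    rows = []
    for size in sizes:
        for (start, end), (pv, uv) in sorted(tumble_pv_uv(events, size).items()):
            rows.append((start, end, pv, uv))
    return rows


if __name__ == "__main__":
    sample = [(0, 1), (5, 1), (12, 2), (25, 3)]
    for row in multi_window(sample):
        print(row)
```

Note that, unlike this simulation, Flink emits each window's row when that window closes, so in the console the rows of the different branches come out interleaved by window end time.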
Results:
```
+I[10:45:00, 10:45:20, 20000, 19900]     # 20 s
+I[10:45:10, 10:45:20, 20000, 19913]     # 10 s
+I[10:45:00, 10:45:30, 120000, 116420]   # 30 s
+I[10:45:20, 10:45:30, 100000, 97497]
+I[10:45:30, 10:45:40, 100000, 97558]
+I[10:45:20, 10:45:40, 200000, 190314]
```
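These numbers are easy to sanity-check. At 10,000 rows/s a full 10 s window holds 100,000 rows, which is its PV; the 20,000-row windows ending at 10:45:20 are partial, presumably because the job started about 2 seconds before 10:45:20 (the first 30 s window, with 120,000 rows, fits the same start). UV sits slightly below PV because ids repeat: assuming datagen draws ids uniformly and independently from the N = 2,000,000 values in [1, 2000000], the expected number of distinct ids among n rows is N(1 - (1 - 1/N)^n). The helper names below are hypothetical; the observed values are copied from the output above.

```python
RATE = 10_000          # 'rows-per-second' of the datagen table
N_IDS = 2_000_000      # ids are drawn from [1, 2000000]

# (pv, uv) pairs copied from the console output above
OBSERVED = [(20_000, 19_900), (20_000, 19_913), (120_000, 116_420),
            (100_000, 97_497), (100_000, 97_558), (200_000, 190_314)]


def expected_pv(seconds, rate=RATE):
    """Rows produced by the source during `seconds` seconds."""
    return seconds * rate


def expected_uv(n, n_ids=N_IDS):
    """Expected distinct count of n uniform, independent draws from n_ids values."""
    return n_ids * (1 - (1 - 1 / n_ids) ** n)


if __name__ == "__main__":
    for pv, uv in OBSERVED:
        print(f"pv={pv:>7} observed uv={uv:>7} expected uv={expected_uv(pv):10.1f}")
```

Every observed UV lands within about 0.1% of the expectation, so the query is counting what it should.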
Job graph:
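It works, but the annoying part is that the SQL API currently has no triggers, so intermediate results can't be emitted before a window closes. For windows of a few minutes, getting the output at window end is acceptable; for hourly or daily windows, though, you only get one result when the window ends, and at that point you might as well run an offline (batch) job.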
Note: Window TVF aggregation also supports GROUPING SETS, ROLLUP and CUBE.
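As an illustration of what GROUPING SETS adds, the Flink window aggregation docs show the form `GROUP BY window_start, window_end, GROUPING SETS ((col), ())`, which yields one row per value of `col` plus one grand-total row per window with `col` set to NULL. The Python simulation below mimics that for a hypothetical per-`sex` breakdown over tumbling windows. The function name is hypothetical, and it assumes `sex` itself is never `None`, since `None` stands in for the NULL of the grand-total row (real SQL can tell the two apart with `GROUPING()`).

```python
from collections import defaultdict


def window_grouping_sets(events, size):
    """Simulate TUMBLE + GROUP BY window_start, window_end, GROUPING SETS ((sex), ()).

    events: iterable of (ts_seconds, id, sex); sex is assumed never to be None.
    Returns {(window_start, window_end, sex_or_None): (pv, uv)}.
    """
    groups = defaultdict(list)
    for ts, id_, sex in events:
        start = ts - ts % size
        window = (start, start + size)
        groups[window + (sex,)].append(id_)   # grouping set (sex)
        groups[window + (None,)].append(id_)  # grouping set (): sex is NULL
    return {key: (len(ids), len(set(ids))) for key, ids in groups.items()}


if __name__ == "__main__":
    sample = [(1, 1, "m"), (2, 1, "m"), (3, 2, "f"), (11, 3, "f")]
    for key, (pv, uv) in window_grouping_sets(sample, 10).items():
        print(key, pv, uv)
```

ROLLUP and CUBE are shorthands for particular lists of grouping sets, so they expand into the same kind of per-window subtotal rows.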
Feel free to follow the Flink菜鳥 WeChat official account, which posts articles on Flink development from time to time.