Flink Cumulate Window


Flink 累計窗口

接上篇: [如何用flink sql寫,3h,7h,1d內pv,uv] (https://www.cnblogs.com/Springmoon-venn/p/15179311.html)

如何用flink sql寫,3h,7h,1d內pv,uv

之前一直比較遺憾,Flink Sql 沒有 Trigger 功能,長時間的窗口不能在中途觸發計算,輸出中間結果。而很多實時指標是小時、天級的累集窗口,比如大屏中的當日 pv、uv,整體是一天中所有訪問的次數和用戶數,但是需要實時更新,比如每 10S 更新一次截止到當前的pv、uv。

這種場景使用 Streaming Api 很容易實現,就是個天的翻滾窗口加上 10S 的 Trigger 就可以了

.windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))

Flink 1.13 版本以前,Sql 中還不能實現這個功能,1.13 添加了 CUMULATE 窗口,可以支持這種場景。

(感謝 antigeneral 同學提醒有這個功能)

以下為官網介紹:

累積窗口在某些情況下非常有用,例如在固定窗口間隔內提前觸發的滾動窗口。例如,每日儀表板繪制從 00:00 到10:00 處每分鍾累積的 UV, UV 表示從 00:00 到 10:00 的 UV 總數。這可以通過 CUMULATE 窗口輕松地實現。

CUMULATE 函數將元素分配給覆蓋初始步長間隔內的窗口,並每一步擴展到一個步長(保持窗口開始固定),直到最大窗口大小。您可以將 CUMULATE 函數視為首先應用 TUMBLE 最大窗口大小的窗口,然后將每個滾動窗口拆分為具有相同窗口開始和窗口結束步長差異的多個窗口。所以累積窗口確實重疊並且沒有固定的大小。

例如,您可以有一個 1 小時步長和 1 天最大大小的累積窗口,並且您將獲得每天的窗口:[00:00, 01:00), [00:00, 02:00), [00:00, 03:00), ..., [00:00, 24:00)。

這些CUMULATE函數根據時間屬性分配窗口的返回值CUMULATE是一個新的關系,包括原始關系的所有列以及額外的 3 列名為“window_start”、“window_end”、“window_time”以指示分配的窗口。原始時間屬性“timecol”將是窗口 TVF 之后的常規時間戳列。

CUMULATE 需要三個必需的參數。

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size) 
  • data: 是一個表參數,可以與時間屬性列有任何關系。
  • timecol: 是一個列描述符,指示數據的哪些時間屬性列應該映射到滾動窗口。
  • step: 是指定順序累積窗口結束之間增加的窗口大小的持續時間。
  • size: 是指定累積窗口最大寬度的持續時間。size 必須是 step 的整數倍。

這是 Bid 表上的示例調用:

-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
--  window table-valued function should be used with aggregate operation,
--  this example is just used for explaining the syntax and the data produced by table-valued function.
> SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
 +------------------+-------+------+------------------+------------------+-------------------------+ | bidtime | price | item | window_start | window_end | window_time | +------------------+-------+------+------------------+------------------+-------------------------+ | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 | | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 | | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 | | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 | | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 | | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 | | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 | | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 | | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 | | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 | | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 | | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 | | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 | +------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the cumulating windowed table
> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +------------------+------------------+-------+

更多內容請查看官網: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/#cumulate

實現案例:

-- flink cumulate window tvf calc pv&uv
create table if not exists datagen_source (
    id        int
    ,name     string
    ,sex      string
    ,age      int
    ,birthday string
    ,proc_time as proctime()
) with (
    'connector' = 'datagen'
    ,'rows-per-second' = '10000'
    ,'fields.id.kind' = 'random'
    ,'fields.id.min' = '1'
    ,'fields.id.max' = '2000000'
);

create table if not exists print_sink(
    start_time string
    ,end_time string
    ,pv  bigint
    ,uv  bigint
) with (
    'connector' = 'print'
);

insert into print_sink
select
 date_format(window_start, 'HH:mm:ss')
 , date_format(window_end, 'HH:mm:ss')
 , count(id)
 , count(distinct id)
  FROM TABLE(
    CUMULATE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND, INTERVAL '1' DAY))
  GROUP BY window_start, window_end

輸出結果:

+I[00:00:00, 09:22:40, 8880000, 1976509]
+I[00:00:00, 09:22:50, 8980000, 1977652]
+I[00:00:00, 09:23:00, 9080000, 1978750]
+I[00:00:00, 09:23:10, 9180000, 1979766]
+I[00:00:00, 09:23:20, 9280000, 1980767]

完整案例參考 GitHub:  https://github.com/springMoon/sqlSubmit

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM