Flink基礎(十九):Table API 和 Flink SQL(四)窗口


1 常規窗口 

  時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看Table API和SQL中,怎么利用時間字段做窗口操作。

在Table API和SQL中,主要有兩種窗口:Group Windows和Over Windows

1.1 分組窗口

分組窗口(Group Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group)中,並對每個組的數據執行一次聚合函數。

Table API中的Group Windows都是使用.window(w:GroupWindow)子句定義的,並且必須由as子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在group by子句中,像常規的分組字段一樣引用。

val table = input
  .window([w: GroupWindow] as $"w") // 定義窗口,別名 w
  .groupBy($"w", $"a")  // 以屬性a和窗口w作為分組的key
  .select($"a", $"b".sum)  // 聚合字段b的值,求和

或者,還可以把窗口的相關信息,作為字段添加到結果表中:

val table = input
  .window([w: GroupWindow] as $"w")
  .groupBy($"w", $"a")
  .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count)

Table API提供了一組具有特定語義的預定義Window類,這些類會被轉換為底層DataStream或DataSet的窗口操作。

Table API支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding)和會話(Session)。

1.2 滾動窗口

滾動窗口(Tumbling windows)要用Tumble類來定義,另外還有三個方法:

  • over:定義窗口長度
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的groupBy中

代碼如下:

// Tumbling Event-time Window(事件時間字段rowtime
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing-time Window(處理時間字段proctime)
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row-count Window (類似於計數窗口,按處理時間排序,10行一組)
.window(Tumble over 10.rows on $"proctime" as $"w")

1.3 滑動窗口

滑動窗口(Sliding windows)要用Slide類來定義,另外還有四個方法:

  • over:定義窗口長度
  • every:定義滑動步長
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的groupBy中

代碼如下:

// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row-count window
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")

1.4 會話窗口

會話窗口(Session windows)要用Session類來定義,另外還有三個方法:

  • withGap:會話時間間隔
  • on:用來分組(按時間間隔)或者排序(按行數)的時間字段
  • as:別名,必須出現在后面的groupBy中

代碼如下:

// Session Event-time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing-time Window
.window(Session withGap 10.minutes on $"proctime" as $"w")

2 Over Windows

Over window聚合是標准SQL中已有的(Over子句),可以在查詢的SELECT子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows使用.window(w:overwindows*)子句定義,並在select()方法中通過別名來引用。

比如這樣:

val table = input
  .window([w: OverWindow] as $"w")
  .select($"a"", $"b".sum over $"w", $"c".min over $"w")

Table API提供了Over類,來配置Over窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義Over windows。

無界的over window是使用常量指定的。也就是說,時間間隔要指定UNBOUNDED_RANGE,或者行計數間隔要指定UNBOUNDED_ROW。而有界的over window是用間隔的大小指定的。

實際代碼應用如下:

  1. 無界的 over window
// 無界的事件時間over window (時間字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w")
//無界的處理時間over window (時間字段"proctime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as $"w")
// 無界的事件時間Row-count over window (時間字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as $"w")
//無界的處理時間Row-count over window (時間字段 "rowtime")
.window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as $"w")
  1. 有界的over window
// 有界的事件時間over window (時間字段 "rowtime",之前1分鍾)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as $"w")
// 有界的處理時間over window (時間字段 "rowtime",之前1分鍾)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as $"w")
// 有界的事件時間Row-count over window (時間字段 "rowtime",之前10行)
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as $"w")
// 有界的處理時間Row-count over window (時間字段 "rowtime",之前10行)
.window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as $"w")

3 SQL中窗口的定義

我們已經了解了在Table API里window的調用方式,同樣,我們也可以在SQL中直接加入窗口的定義和使用。

3.1 Group Windows

Group Windows在SQL查詢的Group BY子句中定義。與使用常規GROUP BY子句的查詢一樣,使用GROUP BY子句的查詢會計算每個組的單個結果行。

SQL支持以下Group窗口函數:

  • TUMBLE(time_attr, interval)

定義一個滾動窗口,第一個參數是時間字段,第二個參數是窗口長度。

  • HOP(time_attr, interval, interval)

定義一個滑動窗口,第一個參數是時間字段,第二個參數是窗口滑動步長,第三個是窗口長度。

  • SESSION(time_attr, interval)

定義一個會話窗口,第一個參數是時間字段,第二個參數是窗口間隔(Gap)。

另外還有一些輔助函數,可以用來選擇Group Window的開始和結束時間戳,以及時間屬性。

這里只寫TUMBLE_,滑動和會話窗口是類似的(HOP_,SESSION_*)。

  • TUMBLE_START(time_attr, interval)
  • TUMBLE_END(time_attr, interval)
  • TUMBLE_ROWTIME(time_attr, interval)
  • TUMBLE_PROCTIME(time_attr, interval)

3.2 Over Windows

由於Over本來就是SQL內置支持的語法,所以這在SQL中屬於基本的聚合操作。所有聚合必須在同一窗口上定義,也就是說,必須是相同的分區、排序和范圍。目前僅支持在當前行范圍之前的窗口(無邊界和有邊界)。

注意,ORDER BY必須在單一的時間屬性上指定。

代碼如下:

SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

// 也可以做多個聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

4 代碼練習(以分組滾動窗口為例)

我們可以綜合學習過的內容,用一段完整的代碼實現一個具體的需求。例如,可以開一個滾動窗口,統計10秒內出現的每個sensor的個數。

代碼如下:

scala version

object TumblingWindowTempCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)

    val stream = env.addSource(new SensorSource).filter(r => r.id.equals("sensor_1"))

    val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    // table api
    val tableResult = table
      .window(Tumble over 10.seconds() on $"pt" as $"w")
      .groupBy($"id", $"w") // .keyBy(r => r.id).timeWindow(Time.seconds(10))
      .select($"id", $"id".count())

    tableEnv.toRetractStream[Row](tableResult).print()

    // sql
    tableEnv.createTemporaryView("sensor", stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime())

    val sqlResult = tableEnv
      .sqlQuery("SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)")

    tableEnv.toRetractStream[Row](sqlResult).print()

    env.execute()
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM