【翻譯】Flink Table Api & SQL —— Table API


本文翻譯自官網:Table API  https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html

Flink Table Api & SQL 翻譯目錄

Table API是用於流和批處理的統一的關系API。 Table API查詢可以在批處理或流輸入上運行而無需修改。 Table API是SQL語言的超集,是專門為與Apache Flink配合使用而設計的。 Table API是用於Scala和Java的語言集成的API。 Table API查詢不是將查詢指定為SQL常見的String值,而是以Java或Scala中的語言嵌入樣式定義,並具有IDE支持,例如自動完成和語法驗證。

Table API與Flink的SQL集成共享其API的許多概念和部分。 看一下Common Concepts&API,了解如何注冊表或創建Table對象。 “流概念”頁面討論了流的特定概念,例如動態表和時間屬性。

下面的示例假定具有屬性(a,b,c,rowtime)的已注冊表 Orders。 rowtime字段是流中的邏輯時間屬性,或者是批處理中的常規時間戳字段。

概述與范例

 Table API可用於Scala和Java。 Scala Table API利用Scala表達式,Java Table API基於已解析並轉換為等效表達式的字符串。

以下示例顯示了Scala和Java Table API之間的區別。 該表程序在批處理環境中執行。 它將掃描 Orders 表,按字段 a 進行分組,並計算每組的結果行。 該表程序的結果將轉換為Row類型的數據集並進行打印。

通過導入org.apache.flink.api.scala._和org.apache.flink.table.api.scala._來啟用Scala Table API。

以下示例顯示了Scala Table API程序的構造方式。 表屬性是使用Scala符號引用的,Scala符號以撇號(')開頭。

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// register Orders table in table environment
// ...

// specify table program
val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result = orders
               .groupBy('a)
               .select('a, 'b.count as 'cnt)
               .toDataSet[Row] // conversion to DataSet
               .print()

下一個示例顯示了一個更復雜的Table API程序。 程序再次掃描 Orders
表。 它過濾空值,對String類型的字段 a 進行歸一化,並針對每個小時計算並產生a平均帳單金額b。

// environment configuration
// ...

// specify table program
val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)

val result: Table = orders
        .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
        .select('a.lowerCase() as 'a, 'b, 'rowtime)
        .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
        .groupBy('hourlyWindow, 'a)
        .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)

由於Table API是用於批處理和流數據的統一API,因此兩個示例程序都可以在批處理和流輸入上執行,而無需對表程序本身進行任何修改。 在這兩種情況下,只要流記錄不晚,程序都會產生相同的結果(有關詳細信息,請參見流概念)。

Operations

Table API支持以下操作。 請注意,不是所有的操作都可以批量和流式傳輸。 它們被相應地標記。

Scan, Projection, and Filter

Operators Description
Scan
Batch Streaming

類似於SQL查詢中的FROM子句。 掃描已注冊的表。

val orders: Table = tableEnv.scan("Orders")
Select
Batch Streaming

類似於SQL SELECT語句。 執行選擇操作。

val orders: Table = tableEnv.scan("Orders") val result = orders.select('a, 'c as 'd)

您可以使用星號(*)充當通配符,選擇表中的所有列。

val orders: Table = tableEnv.scan("Orders") val result = orders.select('*)
As
Batch Streaming

Renames fields.

val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)
Where / Filter
Batch Streaming

類似於SQL WHERE子句。 過濾掉未通過過濾謂詞的行。

val orders: Table = tableEnv.scan("Orders") val result = orders.filter('a % 2 === 0)
or
val orders: Table = tableEnv.scan("Orders") val result = orders.where('b === "red")

Column Operations

Operators Description
AddColumns
Batch Streaming

執行字段添加操作。 如果添加的字段已經存在,它將引發異常。

val orders = tableEnv.scan("Orders"); val result = orders.addColumns(concat('c, "Sunny"))
AddOrReplaceColumns
Batch Streaming

執行字段添加操作。 如果添加列名稱與現有列名稱相同,則現有字段將被替換。 此外,如果添加的字段具有重復的字段名稱,則使用最后一個。

val orders = tableEnv.scan("Orders"); val result = orders.addOrReplaceColumns(concat('c, "Sunny") as 'desc)
DropColumns
Batch Streaming

執行字段刪除操作。 字段表達式應該是字段引用表達式,並且只能刪除現有字段。

val orders = tableEnv.scan("Orders"); val result = orders.dropColumns('b, 'c)
RenameColumns
Batch Streaming

執行字段重命名操作。 字段表達式應該是別名表達式,並且只能重命名現有字段。

val orders = tableEnv.scan("Orders"); val result = orders.renameColumns('b as 'b2, 'c as 'c2)

Aggregations

Operators Description
GroupBy Aggregation
Batch Streaming
Result Updating

類似於SQL GROUP BY子句。 使用以下正在運行的聚合運算符將分組鍵上的行分組,以逐行聚合行。

val orders: Table = tableEnv.scan("Orders") val result = orders.groupBy('a).select('a, 'b.sum as 'd)

注意:對於流式查詢,根據聚合的類型和不同的分組鍵的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置

GroupBy Window Aggregation
Batch Streaming

在 group window 和可能的一個或多個分組鍵上對表進行分組和聚集。

val orders: Table = tableEnv.scan("Orders") val result: Table = orders .window(Tumble over 5.minutes on 'rowtime as 'w) // define window .groupBy('a, 'w) // group by key and window .select('a, w.start, 'w.end, 'w.rowtime, 'b.sum as 'd) // access window properties and aggregate
Over Window Aggregation
Streaming

類似於SQL OVER子句。 基於前一行和后一行的窗口(范圍),為每一行計算窗口聚合。 有關更多詳細信息,請參見Windows部分

val orders: Table = tableEnv.scan("Orders") val result: Table = orders // define window .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate

注意:必須在同一窗口(即相同的分區,排序和范圍)上定義所有聚合。 當前,僅支持PRECEDING(無邊界和有界)到CURRENT ROW范圍的窗口。 目前尚不支持帶有FOLLOWING的范圍。 必須在單個時間屬性上指定ORDER BY。

Distinct Aggregation
Batch Streaming 
Result Updating

類似於SQL DISTINCT AGGREGATION子句,例如COUNT(DISTINCT a)。 不同的聚合聲明聚合函數(內置或用戶定義的)僅應用於不同的輸入值。 可以將不同應用於GroupBy聚合,GroupBy窗口聚合和Over Window聚合。

val orders: Table = tableEnv.scan("Orders"); // Distinct aggregation on group by val groupByDistinctResult = orders .groupBy('a) .select('a, 'b.sum.distinct as 'd) // Distinct aggregation on time window group by val groupByWindowDistinctResult = orders .window(Tumble over 5.minutes on 'rowtime as 'w).groupBy('a, 'w) .select('a, 'b.sum.distinct as 'd) // Distinct aggregation on over window val result = orders .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) .select('a, 'b.avg.distinct over 'w, 'b.max over 'w, 'b.min over 'w)

用戶定義的聚合函數也可以與DISTINCT修飾符一起使用。 要僅針對不同值計算聚合結果,只需向聚合函數添加distinct修飾符即可。

val orders: Table = tEnv.scan("Orders"); // Use distinct aggregation for user-defined aggregate functions val myUdagg = new MyUdagg(); orders.groupBy('users).select('users, myUdagg.distinct('points) as 'myDistinctResult);

注意:對於流式查詢,計算查詢結果所需的狀態可能會無限增長,具體取決於不同字段的數量。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。.

Distinct
Batch Streaming 
Result Updating
類似於SQL DISTINCT子句。 返回具有不同值組合的記錄。val orders: Table = tableEnv.scan("Orders") val result = orders.distinct()

注意:對於流式查詢,計算查詢結果所需的狀態可能會無限增長,具體取決於不同字段的數量。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 如果啟用了狀態清除功能,那么distinct必須發出消息,

以防止下游運算符過早地退出狀態,這會導致distinct包含結果更新。 有關詳細信息,請參見查詢配置。

Joins

Operators Description
Inner Join
Batch Streaming

類似於SQL JOIN子句。 連接兩個表。 兩個表必須具有不同的字段名稱,並且至少一個相等的聯接謂詞必須通過聯接運算符或使用where或filter運算符進行定義。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'd, 'e, 'f) val result = left.join(right).where('a === 'd).select('a, 'b, 'e)

注意:對於流式查詢,根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。

Outer Join
Batch StreamingResult Updating

類似於SQL LEFT / RIGHT / FULL OUTER JOIN子句。 連接兩個表。 兩個表必須具有不同的字段名稱,並且必須至少定義一個相等聯接謂詞。

val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c) val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f) val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e) val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e) val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

注意:對於流式查詢,根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。

Time-windowed Join
Batch Streaming

注意:時間窗口聯接是可以以流方式處理的常規聯接的子集。

時間窗聯接需要至少一個等聯接謂詞和在兩側限制時間的聯接條件。 可以通過兩個適當的范圍謂詞(<,<=,> =,>)或比較兩個輸入表的相同類型的時間屬性(即處理時間或事件時間)的單個相等謂詞來定義這種條件.

例如,以下謂詞是有效的窗口連接條件:

  • 'ltime === 'rtime
  • 'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes
val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime) val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime) val result = left.join(right) .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes) .select('a, 'b, 'e, 'ltime)
Inner Join with Table Function (UDTF)
Batch Streaming

用表函數的結果聯接表。 左(外)表的每一行都與表函數的相應調用產生的所有行連接在一起。 如果左表(外部)的表函數調用返回空結果,則該行將被刪除。

// instantiate User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF() // join val result: Table = table .joinLateral(split('c) as ('s, 't, 'v)) .select('a, 'b, 's, 't, 'v)
Left Outer Join with Table Function (UDTF)
Batch Streaming

用表函數的結果聯接表。 左(外)表的每一行都與表函數的相應調用產生的所有行連接在一起。 如果表函數調用返回空結果,則將保留對應的外部行,並用空值填充結果。

注意:當前,左外部聯接的表函數的謂詞只能為空或字面值true。

// instantiate User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF() // join val result: Table = table .leftOuterJoinLateral(split('c) as ('s, 't, 'v)) .select('a, 'b, 's, 't, 'v)
Join with Temporal Table
Streaming

時態表是跟蹤其隨時間變化的表。

時態表功能提供對特定時間點時態表狀態的訪問。 使用臨時表函數聯接表的語法與使用表函數進行內部聯接的語法相同。

當前僅支持使用臨時表的內部聯接。

val ratesHistory = tableEnv.scan("RatesHistory") // register temporal table function with a time attribute and primary key val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // join with "Orders" based on the time attribute and key val orders = tableEnv.scan("Orders") val result = orders .joinLateral(rates('o_rowtime), 'r_currency === 'o_currency)

有關更多信息,請檢查更詳細的時態表概念描述。

Set Operations

Operators Description
Union
Batch

類似於SQL UNION子句。 合並兩個已刪除重復記錄的表,兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.union(right)
UnionAll
Batch Streaming

類似於SQL UNION ALL子句。 合並兩個表,兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.unionAll(right)
Intersect
Batch

類似於SQL INTERSECT子句。 相交返回兩個表中都存在的記錄。 如果一個記錄在一個或兩個表中多次出現,則僅返回一次,即結果表中沒有重復的記錄。 兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'e, 'f, 'g) val result = left.intersect(right)
IntersectAll
Batch

類似於SQL INTERSECT ALL子句。 IntersectAll返回兩個表中都存在的記錄。 如果一個記錄在兩個表中都存在一次以上,則返回的次數與兩個表中存在的次數相同,即,結果表可能具有重復的記錄。 兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'e, 'f, 'g) val result = left.intersectAll(right)
Minus
Batch

類似於SQL EXCEPT子句。 減號從左表返回不存在於右表中的記錄。 左表中的重復記錄僅返回一次,即刪除了重復記錄。 兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.minus(right)
MinusAll
Batch

類似於SQL EXCEPT ALL子句。 MinusAll返回右表中不存在的記錄。 將返回(n-m)次在左側表中出現n次,在右側表中出現m次的記錄,即刪除與右側表中存在的重復項一樣多的記錄。 兩個表必須具有相同的字段類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a, 'b, 'c) val result = left.minusAll(right)
In
Batch Streaming

類似於SQL IN子句。 如果給定的表子查詢中存在表達式,則In返回true。 子查詢表必須由一列組成。 此列必須與表達式具有相同的數據類型。

val left = ds1.toTable(tableEnv, 'a, 'b, 'c) val right = ds2.toTable(tableEnv, 'a) val result = left.select('a, 'b, 'c).where('a.in(right))

注意:對於流查詢,該操作將在聯接和組操作中重寫。 根據不同輸入行的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。

OrderBy, Offset & Fetch

Operators Description
Order By
Batch
類似於SQL ORDER BY子句。 返回在所有並行分區上全局排序的記錄。val in = ds.toTable(tableEnv, 'a, 'b, 'c) val result = in.orderBy('a.asc)
Offset & Fetch
Batch
與SQL OFFSET和FETCH子句類似。 偏移量和提取限制從排序結果返回的記錄數。 偏移和提取在技術上是Order By運算符的一部分,因此必須在其之前。

val in = ds.toTable(tableEnv, 'a, 'b, 'c) // returns the first 5 records from the sorted result val result1: Table = in.orderBy('a.asc).fetch(5) // skips the first 3 records and returns all following records from the sorted result val result2: Table = in.orderBy('a.asc).offset(3) // skips the first 10 records and returns the next 5 records from the sorted result val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)

Insert

Operators Description
Insert Into
Batch Streaming

與SQL查詢中的INSERT INTO子句相似。 在已插入的輸出表中執行插入。

輸出表必須在TableEnvironment中注冊(請參閱注冊TableSink)。 此外,已注冊表的架構必須與查詢的架構匹配。

val orders: Table = tableEnv.scan("Orders") orders.insertInto("OutOrders")

Group Windows

 Group 窗口根據時間或行計數間隔將組行聚合為有限的組,並每組評估一次聚合函數。 對於批處理表,窗口是按時間間隔對記錄進行分組的便捷 shortcut 。

Windows是使用window(w:GroupWindow)子句定義的,並且需要使用as子句指定的別名。 為了按窗口對表進行分組,必須像常規分組屬性一樣在groupBy(...)子句中引用窗口別名。 以下示例顯示如何在表上定義窗口聚合。

val table = input
  .window([w: GroupWindow] as 'w)  // define window with alias w
  .groupBy('w)   // group the table by window w
  .select('b.sum)  // aggregate

在流式傳輸環境中,如果窗口聚合除對窗口進行分組以外,還對一個或多個屬性進行分組,則它們只能並行計算,即groupBy(...)子句引用窗口別名和至少一個其他屬性。 僅引用窗口別名的groupBy(...)子句(例如上例中的子句)只能由單個非並行任務求值。 以下示例顯示如何使用其他分組屬性定義窗口聚合。

val table = input
  .window([w: GroupWindow] as 'w) // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w 
  .select('a, 'b.sum)  // aggregate

可以在select語句中將窗口屬性(例如時間窗口的開始,結束或行時間時間戳)添加為窗口別名的屬性,分別為w.start,w.end和w.rowtime。 窗口開始和行時間時間戳是包含窗口的上下邊界。 相反,窗口結束時間戳是唯一的窗口上邊界。 例如,從下午2點開始的30分鍾滾動窗口將以14:00:00.000作為開始時間戳,以14:29:59.999作為行時間時間戳,以14:30:00.000作為結束時間戳。

val table = input
  .window([w: GroupWindow] as 'w)  // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w 
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps

Window參數定義行如何映射到窗口。 窗口不是用戶可以實現的接口。 相反,Table API提供了一組具有特定語義的預定義Window類,這些類被轉換為基礎的DataStream或DataSet操作。 支持的窗口定義在下面列出。

Tumble (Tumbling Windows)

 滾動窗口將行分配給固定長度的非重疊連續窗口。 例如,5分鍾的滾動窗口以5分鍾為間隔對行進行分組。 可以在事件時間,處理時間或行數上定義滾動窗口。

滾動窗口是使用Tumble類定義的,如下所示:

Method Description
over 將窗口的長度定義為時間或行計數間隔。
on 用於分組(時間間隔)或排序(行計數)的時間屬性。 對於批查詢,它可以是任何Long或Timestamp屬性。 對於流查詢,它必須是聲明的事件時間或處理時間時間屬性。
as 為窗口分配別名。 別名用於引用以下groupBy()子句中的窗口,並可以選擇在select()子句中選擇窗口屬性,例如窗口開始,結束或行時間時間戳。
// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)

Slide (Sliding Windows)

 滑動窗口的大小固定,並以指定的滑動間隔滑動。 如果滑動間隔小於窗口大小,則滑動窗口重疊。 因此,可以將行分配給多個窗口。 例如,一個15分鍾大小的滑動窗口和5分鍾的滑動間隔將每行分配給3個15分鍾大小的不同窗口,它們以5分鍾的間隔進行評估。 可以在事件時間,處理時間或行數上定義滑動窗口。

滑動窗口是通過使用Slide類定義的,如下所示:

Method Description
over 將窗口的長度定義為時間或行計數間隔。
every 將幻燈片間隔定義為時間間隔或行計數間隔。 滑動間隔必須與尺寸間隔具有相同的類型。
on 用於分組(時間間隔)或排序(行計數)的時間屬性。 對於批查詢,它可以是任何Long或Timestamp屬性。 對於流查詢,它必須是聲明的事件時間或處理時間時間屬性。
as 為窗口分配別名。 別名用於引用以下groupBy()子句中的窗口,並可以選擇在select()子句中選擇窗口屬性,例如窗口開始,結束或行時間時間戳。
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)

Session (Session Windows)

 會話窗口沒有固定的大小,但其邊界由不活動的時間間隔定義,即,如果在定義的間隔時間段內未出現任何事件,則會話窗口關閉。 例如,間隔30分鍾的會話窗口在30分鍾不活動后觀察到一行時開始(否則該行將被添加到現有窗口),如果在30分鍾內未添加任何行,則關閉該窗口。 會話窗口可以在事件時間或處理時間工作。

通過使用Session類定義會話窗口,如下所示:

Method Description
withGap 將兩個窗口之間的間隔定義為時間間隔。
on 用於分組(時間間隔)或排序(行計數)的時間屬性。 對於批查詢,它可以是任何Long或Timestamp屬性。 對於流查詢,它必須是聲明的事件時間或處理時間時間屬性。
as 為窗口分配別名。 別名用於引用以下groupBy()子句中的窗口,並可以選擇在select()子句中選擇窗口屬性,例如窗口開始,結束或行時間時間戳。
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)

Over Windows

 窗口聚合是標准SQL(OVER子句)已知的,並在查詢的SELECT子句中定義。 與在GROUP BY子句中指定的組窗口不同,在窗口上方不會折疊行。 取而代之的是,在窗口聚合中,為每個輸入行在其相鄰行的范圍內計算一個聚合。

使用window(w:OverWindow *)子句(在Python API中使用over_window(* OverWindow))定義窗口,並在select()方法中通過別名引用。 以下示例顯示了如何在表上定義窗口聚合。

val table = input
  .window([w: OverWindow] as 'w)              // define over window with alias w
  .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w

OverWindow定義了計算聚合的行范圍。 OverWindow不是用戶可以實現的接口。 相反,Table API提供了Over類來配置over窗口的屬性。 可以在事件時間或處理時間以及指定為時間間隔或行計數的范圍上定義窗口上方。 受支持的over窗口定義作為Over(和其他類)上的方法公開,並在下面列出:

Method Required Description
partitionBy Optional

在一個或多個屬性上定義輸入的分區。 每個分區都經過單獨排序,並且聚合函數分別應用於每個分區。

注意:在流環境中,僅當窗口包含partition by子句時,才可以並行計算整個窗口聚合。 沒有partitionBy(...),流將由單個非並行任務處理。

orderBy Required

定義每個分區內的行順序,從而定義將聚合函數應用於行的順序。

注意:對於流查詢,它必須是聲明的事件時間或處理時間時間屬性。 當前,僅支持單個sort屬性。

preceding Optional

定義窗口中包含的並在當前行之前的行的間隔。 該間隔可以指定為時間間隔或行計數間隔。

用時間間隔的大小指定窗口上的邊界,例如,時間間隔為10分鍾,行計數間隔為10行。

使用常數來指定窗口上的無邊界,即對於時間間隔為UNBOUNDED_RANGE或對於行計數間隔為UNBOUNDED_ROW。 Windows上的無邊界從分區的第一行開始。

如果省略了前面的子句,則將UNBOUNDED_RANGE和CURRENT_RANGE用作窗口的默認前后

 

following Optional

定義窗口中包含並緊隨當前行的行的窗口間隔。 該間隔必須與前面的間隔(時間或行計數)以相同的單位指定。

目前,不支持具有當前行之后的行的窗口。 相反,您可以指定兩個常量之一:

  • CURRENT_ROW 將窗口的上限設置為當前行.
  • CURRENT_RANGE 將窗口的上限設置為當前行的排序鍵,即,與當前行具有相同排序鍵的所有行都包含在窗口中.

如果省略以下子句,則將時間間隔窗口的上限定義為CURRENT_RANGE,將行計數間隔窗口的上限定義為CURRENT_ROW。

as Required

為上方窗口分配別名。 別名用於引用以下select()子句中的over窗口。

注意:當前,同一select()調用中的所有聚合函數必須在相同的窗口范圍內計算。

Unbounded Over Windows

// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
 
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

Bounded Over Windows

// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
  
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

Row-based Operations

基於行的操作生成具有多列的輸出。

Operators Description
Map
Batch Streaming

使用用戶定義的標量函數或內置標量函數執行映射操作。 如果輸出類型是復合類型,則輸出將被展平.

class MyMapFunction extends ScalarFunction {
  def eval(a: String): Row = {
    Row.of(a, "pre-" + a)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.STRING, Types.STRING)
}

val func = new MyMapFunction()
val table = input
  .map(func('c)).as('a, 'b)
FlatMap
Batch Streaming

使用表格功能執行flatMap操作.

class MyFlatMapFunction extends TableFunction[Row] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach({ s =>
        val row = new Row(2)
        row.setField(0, s)
        row.setField(1, s.length)
        collect(row)
      })
    }
  }

  override def getResultType: TypeInformation[Row] = {
    Types.ROW(Types.STRING, Types.INT)
  }
}

val func = new MyFlatMapFunction
val table = input
  .flatMap(func('c)).as('a, 'b)
Aggregate
Batch StreamingResult Updating

使用聚合函數執行聚合操作。 您必須使用select語句關閉“聚合”,並且select語句不支持聚合功能。 如果輸出類型是復合類型,則聚合的輸出將被展平.

case class MyMinMaxAcc(var min: Int, var max: Int)

class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {

  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
    if (value < acc.min) {
      acc.min = value
    }
    if (value > acc.max) {
      acc.max = value
    }
  }

  override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0)
  
  def resetAccumulator(acc: MyMinMaxAcc): Unit = {
    acc.min = 0
    acc.max = 0
  }

  override def getValue(acc: MyMinMaxAcc): Row = {
    Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(Types.INT, Types.INT)
  }
}

val myAggFunc = new MyMinMax
val table = input
  .groupBy('key)
  .aggregate(myAggFunc('a) as ('x, 'y))
  .select('key, 'x, 'y)
Group Window Aggregate
Batch Streaming

在組窗口和可能的一個或多個分組鍵上對表進行分組和聚集。 您必須使用select語句關閉“聚合”。 並且select語句不支持“ *”或聚合函數.

val myAggFunc = new MyMinMax
val table = input
    .window(Tumble over 5.minutes on 'rowtime as 'w) // define window     .groupBy('key, 'w) // group by key and window     .aggregate(myAggFunc('a) as ('x, 'y))
    .select('key, 'x, 'y, 'w.start, 'w.end) // access window properties and aggregate results
FlatAggregate
Streaming
Result Updating

類似於GroupBy聚合。 使用以下運行表聚合運算符將分組鍵上的行分組,以逐行聚合行。 與AggregateFunction的區別在於TableAggregateFunction可以為一個組返回0個或更多記錄。 您必須使用select語句關閉“ flatAggregate”。 並且select語句不支持聚合函數.

除了使用emitValue輸出結果外,還可以使用emitUpdateWithRetract方法。 與emittValue不同,emitUpdateWithRetract用於發出已更新的值。 此方法在撤消模式下增量輸出數據,即,一旦有更新,我們就必須在發送新的更新記錄之前撤回舊記錄。 如果在表聚合函數中定義了這兩種方法,則將優先使用emitUpdateWithRetract方法,因為這兩種方法比emitValue更有效,因為它可以增量輸出值。 有關詳細信息,請參見表聚合函數

import java.lang.{Integer => JInteger}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction

/** * Accumulator for top2. */
class Top2Accum {
  var first: JInteger = _
  var second: JInteger = _
}

/** * The top2 user-defined table aggregate function. */
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {

  override def createAccumulator(): Top2Accum = {
    val acc = new Top2Accum
    acc.first = Int.MinValue
    acc.second = Int.MinValue
    acc
  }

  def accumulate(acc: Top2Accum, v: Int) {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      val top2 = iter.next()
      accumulate(acc, top2.first)
      accumulate(acc, top2.second)
    }
  }

  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
    // emit the value and rank     if (acc.first != Int.MinValue) {
      out.collect(JTuple2.of(acc.first, 1))
    }
    if (acc.second != Int.MinValue) {
      out.collect(JTuple2.of(acc.second, 2))
    }
  }
}

val top2 = new Top2
val orders: Table = tableEnv.scan("Orders")
val result = orders
    .groupBy('key)
    .flatAggregate(top2('a) as ('v, 'rank))
    .select('key, 'v, 'rank)

Note: 對於流查詢,根據聚合類型和不同的分組鍵的數量,計算查詢結果所需的狀態可能會無限增長。 請提供具有有效保留間隔的查詢配置,以防止出現過多的狀態。 有關詳細信息,請參見查詢配置。

Group Window FlatAggregate
Streaming

在組窗口和可能的一個或多個分組鍵上對表進行分組和聚集。 您必須使用select語句關閉“ flatAggregate”。 並且select語句不支持聚合函數.

val top2 = new Top2
val orders: Table = tableEnv.scan("Orders")
val result = orders
    .window(Tumble over 5.minutes on 'rowtime as 'w) // define window     .groupBy('a, 'w) // group by key and window     .flatAggregate(top2('b) as ('v, 'rank))
    .select('a, w.start, 'w.end, 'w.rowtime, 'v, 'rank) // access window properties and aggregate results

Data Types

請參閱有關數據類型的專用頁面。

通用類型和(嵌套的)復合類型(例如POJO,元組,行,Scala case class)也可以是一行的字段。

可以使用值訪問功能訪問具有任意嵌套的復合類型的字段。

泛型類型被視為黑盒,可以通過用戶定義的函數傳遞或處理。

Expression Syntax

上一節中的某些運算符期望一個或多個表達式。 可以使用嵌入式Scala DSL或字符串指定表達式。 請參考上面的示例以了解如何指定表達式。

這是用於表達式的EBNF語法:

expressionList = expression , { "," , expression } ;

expression = overConstant | alias ;

alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;

logic = comparison , [ ( "&&" | "||" ) , comparison ] ;

comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

product = unary , [ ( "*" | "/" | "%") , unary ] ;

unary = [ "!" | "-" | "+" ] , composite ;

composite = over | suffixed | nullLiteral | prefixed | atom ;

suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ;

prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ;

interval = timeInterval | rowInterval ;

timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;

rowInterval = composite , "." , "rows" ;

suffixCast = composite , ".cast(" , dataType , ")" ;

prefixCast = "cast(" , expression , dataType , ")" ;

dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;

suffixAs = composite , ".as(" , fieldReference , ")" ;

prefixAs = "as(" , expression, fieldReference , ")" ;

suffixIf = composite , ".?(" , expression , "," , expression , ")" ;

prefixIf = "?(" , expression , "," , expression , "," , expression , ")" ;

suffixDistinct = composite , "distinct.()" ;

prefixDistinct = functionIdentifier , ".distinct" , [ "(" , [ expression , { "," , expression } ] , ")" ] ;

suffixFunctionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;

prefixFunctionCall = functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;

atom = ( "(" , expression , ")" ) | literal | fieldReference ;

fieldReference = "*" | identifier ;

nullLiteral = "nullOf(" , dataType , ")" ;

timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;

timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;

over = composite , "over" , fieldReference ;

overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;

timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;

Literals: 在這里,文字是有效的Java文字。 字符串文字可以使用單引號或雙引號指定。 復制引號以進行轉義(例如,'It''s me.' or "I ""like"" dogs.")。

Null literals: 空文字必須附加一個類型。 使用nullOf(type)(例如nullOf(INT))創建空值。

Field references: fieldReference指定數據中的一列(如果使用*,則指定所有列),而functionIdentifier指定受支持的標量函數。 列名和函數名遵循Java標識符語法。

Function calls: 指定為字符串的表達式也可以使用前綴表示法而不是后綴表示法來調用運算符和函數。

Decimals: 如果需要使用精確的數值或大的十進制數,則Table API還支持Java的BigDecimal類型。 在Scala Table API中,小數可以由BigDecimal(“ 123456”)定義,而在Java中,可以通過附加“ p”來精確定義例如 123456p

Time representation: 為了使用時間值,Table API支持Java SQL的日期,時間和時間戳類型。 在Scala Table API中,可以使用java.sql.Date.valueOf(“ 2016-06-27”),java.sql.Time.valueOf(“ 10:10:42”)或java.sql定義文字。 Timestamp.valueOf(“ 2016-06-27 10:10:42.123”)。 Java和Scala表API還支持調用“ 2016-06-27” .toDate(),“ 10:10:42” .toTime()和“ 2016-06-27 10:10:42.123” .toTimestamp() 用於將字符串轉換為時間類型。 注意:由於Java的時態SQL類型取決於時區,因此請確保Flink Client和所有TaskManager使用相同的時區。

Temporal intervals:  時間間隔可以表示為月數(Types.INTERVAL_MONTHS)或毫秒數(Types.INTERVAL_MILLIS)。 可以添加或減去相同類型的間隔(例如1.小時+ 10分鍾)。 可以將毫秒間隔添加到時間點(例如“ 2016-08-10” .toDate + 5.days)。

Scala expressions:  Scala表達式使用隱式轉換。 因此,請確保將通配符導入org.apache.flink.table.api.scala._添加到程序中。 如果文字不被視為表達式,請使用.toExpr(如3.toExpr)強制轉換文字。

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 

注意:在流環境中,僅當窗口包含partition by子句時,才可以並行計算整個窗口聚合。 沒有partitionBy(...),流將由單個非並行任務處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM