Table API 和 Flink SQL—第四章 窗口(Windows)


時間語義,要配合窗口操作才能發揮作用。最主要的用途,當然就是開窗口、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎么利用時間字段做窗口操作。

在 Table API 和 SQL 中,主要有兩種窗口:Group Windows 和 Over Windows

4.1      分組窗口(Group Windows)

分組窗口(Group Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group) 中,並對每個組的數據執行一次聚合函數。

Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定義的,並且必須由 as 子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在group by 子句中,像常規的分組字段一樣引用。

val table = input

.window([w: GroupWindow] as 'w) // 定義窗口,別名 w

.groupBy('w, 'a)	// 以屬性 a 和窗口 w 作為分組的 key
.select('a, 'b.sum)	// 聚合字段 b 的值,求和
val table = input

.window([w: GroupWindow] as 'w)

.groupBy('w, 'a)

.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)

 

Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層

DataStream 或 DataSet 的窗口操作。

Table  API 支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding)和會話(Session)。

 

4.1.2     滾動窗口

 

滾動窗口(Tumbling windows)要用 Tumble 類來定義,另外還有三個方法:

⚫ over:定義窗口長度

⚫ on:用來分組(按時間間隔)或者排序(按行數)的時間字段

⚫ as:別名,必須出現在后面的 groupBy 中

代碼如下:

// Tumbling Event-time Window(事件時間字段 rowtime)

.window(Tumble over 10.minutes on 'rowtime as 'w)


// Tumbling Processing-time Window(處理時間字段 proctime)

.window(Tumble over 10.minutes on 'proctime as 'w)


// Tumbling Row-count Window (類似於計數窗口,按處理時間排序,10 行一組)

.window(Tumble over 10.rows on 'proctime as 'w)

 

4.1.2	滑動窗口

滑動窗口(Sliding windows)要用 Slide 類來定義,另外還有四個方法:

⚫	over:定義窗口長度
⚫	every:定義滑動步長
⚫	on:用來分組(按時間間隔)或者排序(按行數)的時間字段
⚫	as:別名,必須出現在后面的 groupBy 中
// Sliding Event-time Window

.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)



// Sliding Processing-time window

.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)



// Sliding Row-count window

.window(Slide over 10.rows every 5.rows on 'proctime as 'w)

4.1.1      會話窗口

 

會話窗口(Session windows)要用 Session 類來定義,另外還有三個方法:

⚫ withGap:會話時間間隔

⚫ on:用來分組(按時間間隔)或者排序(按行數)的時間字段

⚫ as:別名,必須出現在后面的 groupBy 中

代碼如下:

// Session Event-time Window

.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window

 

.window(Session withGap 10.minutes on 'proctime as 'w)

4.2   Over Windows

Over window 聚合是標准SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。

Over window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows

使用.window(w:overwindows*)子句定義,並在 select()方法中通過別名來引用。

比如這樣:

val table = input

.window([w: OverWindow] as 'w)

.select('a, 'b.sum over 'w, 'c.min over 'w)

Table API 提供了 Over 類,來配置 Over 窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義 Over windows。

無界的over window 是使用常量指定的。也就是說,時間間隔要指定UNBOUNDED_RANGE, 或者行計數間隔要指定UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。

實際代碼應用如下:

 

1)    無界的 over window

// 無界的事件時間 over window (時間字段 "rowtime")

.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

//無界的處理時間 over window (時間字段"proctime")

.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// 無界的事件時間 Row-count over window (時間字段 "rowtime")

.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

//無界的處理時間 Row-count over window (時間字段 "rowtime")

.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

 

2)    有界的 over window

// 有界的事件時間 over window (時間字段 "rowtime",之前 1 分鍾)

.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)



// 有界的處理時間 over window (時間字段 "rowtime",之前 1 分鍾)

.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)



// 有界的事件時間 Row-count over window (時間字段 "rowtime",之前 10 行)

.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)



// 有界的處理時間 Row-count over window (時間字段 "rowtime",之前 10 行)

4.2   SQL 中窗口的定義

 

我們已經了解了在 Table API 里 window 的調用方式,同樣,我們也可以在 SQL 中直接加入窗口的定義和使用。

 

4.2.1   Group Windows

 

Group Windows 在 SQL 查詢的Group BY 子句中定義。與使用常規 GROUP BY 子句的查詢一樣,使用GROUP BY 子句的查詢會計算每個組的單個結果行。

SQL 支持以下Group 窗口函數:

 

⚫ TUMBLE(time_attr, interval)

 

定義一個滾動窗口,第一個參數是時間字段,第二個參數是窗口長度。

 

⚫ HOP(time_attr, interval, interval)

 

定義一個滑動窗口,第一個參數是時間字段,第二個參數是窗口滑動步長,第三個是窗

 

口長度。

 

⚫ SESSION(time_attr, interval)

 

定義一個會話窗口,第一個參數是時間字段,第二個參數是窗口間隔(Gap)。

 

 

另外還有一些輔助函數,可以用來選擇Group Window 的開始和結束時間戳,以及時間屬性。

這里只寫 TUMBLE_*,滑動和會話窗口是類似的(HOP_*,SESSION_*)。

 

⚫ TUMBLE_START(time_attr, interval)

 

⚫ TUMBLE_END(time_attr, interval)

 

⚫ TUMBLE_ROWTIME(time_attr, interval)

 

⚫ TUMBLE_PROCTIME(time_attr, interval)

 

 

4.2.2   Over Windows

 

由於 Over 本來就是 SQL 內置支持的語法,所以這在SQL 中屬於基本的聚合操作。所有聚合必須在同一窗口上定義,也就是說,必須是相同的分區、排序和范圍。目前僅支持在當前行范圍之前的窗口(無邊界和有邊界)。

注意,ORDER BY 必須在單一的時間屬性上指定。

 

代碼如下:

SELECT COUNT(amount) OVER ( PARTITION BY user
ORDER BY proctime

ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

FROM Orders


// 也可以做多個聚合

SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders
WINDOW w AS (
PARTITION BY user

ORDER BY proctime

ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

4.4      代碼練習(以分組滾動窗口為例)

 

我們可以綜合學習過的內容,用一段完整的代碼實現一個具體的需求。例如,可以開一個滾動窗口,統計 10 秒內出現的每個sensor 的個數。

代碼如下:

 

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


val streamFromFile: DataStream[String] = env.readTextFile("sensor.txt")

val dataStream: DataStream[SensorReading] = streamFromFile 

.map( data => {

val dataArray = data.split(",")

SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
} )

.assignTimestampsAndWatermarks( new

BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1))

{

override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
} )



val settings: EnvironmentSettings = EnvironmentSettings
 
.newInstance()

.useOldPlanner()

.inStreamingMode()

.build()

val tableEnv: StreamTableEnvironment =

StreamTableEnvironment.create(env, settings)



val dataTable: Table = tableEnv

.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime)



val resultTable: Table = dataTable

.window(Tumble over 10.seconds on 'timestamp as 'tw)

.groupBy('id, 'tw)

.select('id, 'id.count)



val sqlDataTable: Table = dataTable

.select('id, 'temperature, 'timestamp as 'ts)

val resultSqlTable: Table = tableEnv

.sqlQuery("select id, count(id) from "

+ sqlDataTable

+ " group by id,tumble(ts,interval '10' second)")


// 把 Table 轉化成數據流 

val resultDstream: DataStream[(Boolean, (String, Long))] = resultSqlTable

.toRetractStream[(String, Long)]



resultDstream.filter(_._1).print() 

env.execute()

  

 

 

  

 

  

  

  

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM