Flink SQL 時間屬性
Flink 可以基於幾種不同的 時間 概念來處理數據。
- 處理時間 指的是執行具體操作時的機器時間(也稱作”掛鍾時間”)
- 事件時間 指的是數據本身攜帶的時間。這個時間是在事件產生時的時間。
- 攝入時間 指的是數據進入 Flink 的時間;在系統內部,會把它當做事件時間來處理。
像窗口(在 Table API 和 SQL )這種基於時間的操作,需要有時間信息。因此,Table API 中的表就需要提供邏輯時間屬性來表示時間,以及支持時間相關的操作。
每種類型的表都可以有時間屬性,可以在用CREATE TABLE DDL創建表的時候指定、也可以在 DataStream
中指定、也可以在定義 TableSource
時指定。一旦時間屬性定義好,它就可以像普通列一樣使用,也可以在時間相關的操作中使用。
只要時間屬性沒有被修改,而是簡單地從一個表傳遞到另一個表,它就仍然是一個有效的時間屬性。時間屬性可以像普通的時間戳的列一樣被使用和計算。一旦時間屬性被用在了計算中,它就會被物化,進而變成一個普通的時間戳。普通的時間戳是無法跟 Flink 的時間以及watermark等一起使用的,所以普通的時間戳就無法用在時間相關的操作中。
處理時間 & SQL自定義函數
package com.shujia.flink.table
import java.time.LocalDateTime
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
object Demo8ProcTime {
def main(args: Array[String]): Unit = {
//創建 flink 環境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//設置 table 環境的一些參數
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink計划器
.inStreamingMode() //流模式
.build()
// 創建flink table 環境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
/**
* 注冊臨時自定義函數 -- 只在當前環境中有用
* addTime -- 函數名
* new AddTime -- 自定義函數類的對象
*/
bsTableEnv.createTemporaryFunction("addTime", new AddTime)
/**
* 在當前版本中PROCTIME()獲取到的時間時區差了8小時,后面需要將時間增加8小時
* 這個問題在新的版本中解決了
* -- 只在處理時間才有
*/
bsTableEnv.executeSql(
"""
|CREATE TABLE student (
| id STRING,
| name STRING,
| age INT,
| gender STRING,
| clazz STRING,
| //這里的 AS 不是取別名的意思,而是通過 PROCTIME() 函數生成 user_action_time 字段
| user_action_time AS PROCTIME() -- 聲明一個額外的列作為處理時間屬性
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'student',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'json',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
//這里print表的創建不能用LIKE的形式了,因為上面的時間不能被LIKE繼承過來
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| clazz STRING,
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
|clazz,
|addTime(TUMBLE_END(user_action_time, INTERVAL '5' SECOND)) as win_end,
|count(1) as c
|from student
|group by
|clazz,
|TUMBLE(user_action_time, INTERVAL '5' SECOND)
|//處理時間的滾動窗口
""".stripMargin)
}
}
/**
* 自定義函數
*
*/
class AddTime extends ScalarFunction {
/**
* 方法名必須是eval
*
* @param localDateTime : 時間戳在代碼中的類型
* @return
*/
def eval(localDateTime: LocalDateTime): LocalDateTime = {
//時間增加8小時
localDateTime.plusHours(8)
}
}
事件時間 & 滑動窗口
topic 可以理解為Flink中的一張表
package com.shujia.flink.table
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Demo9EventTIme {
def main(args: Array[String]): Unit = {
//創建flink 環境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setParallelism(1)
//設置table 環境的一些參數
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink計划器
.inStreamingMode() //流模式
.build()
// 創建flink table 環境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
//建表讀取CSV格式的數據的時候,字段的順序要和數據中的順序對應,因為是按照順序進行解析的
bsTableEnv.executeSql(
"""
|CREATE TABLE clicks (
| u STRING,
| ts TIMESTAMP(3),
| url STRING,
| -- 聲明 ts 是事件時間屬性,並且用 延遲 5 秒的策略來生成 watermark
| WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'clicks1',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| u STRING,
| win_start TIMESTAMP(3),
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
|
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
| u,
| HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_start, -- 開始時間
| HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_end, -- 結束時間
| count(1) as c
| from
|clicks
|group by u,
|HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
|//滑動窗口,'10' SECOND -- 滑動時間,INTERVAL '1' HOUR -- 窗口大小
|//ts -- 事件時間字段
""".stripMargin)
}
}
Flink SQL 中沒有統計窗口,可以用其他的方式實現
會話窗口
package com.shujia.flink.table
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Demo10SessionWindow {
def main(args: Array[String]): Unit = {
//創建flink 環境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setParallelism(1)
//設置table 環境的一些參數
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink計划器
.inStreamingMode() //流模式
.build()
// 創建flink table 環境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv.executeSql(
"""
|CREATE TABLE clicks (
| u STRING,
| ts TIMESTAMP(3),
| url STRING,
| -- 聲明 ts 是事件時間屬性,並且用 延遲 5 秒的策略來生成 watermark
| WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'clicks1',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| u STRING,
| win_start TIMESTAMP(3),
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
|
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
| u,
| SESSION_START(ts, INTERVAL '10' SECOND) as win_start,
| SESSION_END(ts, INTERVAL '10' SECOND) as win_end,
| count(1) as c
| from
|clicks
|group by u,
|SESSION(ts, INTERVAL '10' SECOND)
|//會話窗口 -- 10秒沒有數據算一次
""".stripMargin)
}
}