Flink SQL 时间属性
Flink 可以基于几种不同的 时间 概念来处理数据。
- 处理时间 指的是执行具体操作时的机器时间(也称作”挂钟时间”)
- 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
- 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。
像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。
每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在 DataStream
中指定、也可以在定义 TableSource
时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。
只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。
处理时间 & SQL自定义函数
package com.shujia.flink.table
import java.time.LocalDateTime
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
object Demo8ProcTime {
def main(args: Array[String]): Unit = {
//创建 flink 环境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置 table 环境的一些参数
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink计划器
.inStreamingMode() //流模式
.build()
// 创建flink table 环境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
/**
* 注册临时自定义函数 -- 只在当前环境中有用
* addTime -- 函数名
* new AddTime -- 自定义函数类的对象
*/
bsTableEnv.createTemporaryFunction("addTime", new AddTime)
/**
* 在当前版本中PROCTIME()获取到的时间时区差了8小时,后面需要将时间增加8小时
* 这个问题在新的版本中解决了
* -- 只在处理时间才有
*/
bsTableEnv.executeSql(
"""
|CREATE TABLE student (
| id STRING,
| name STRING,
| age INT,
| gender STRING,
| clazz STRING,
| //这里的 AS 不是取别名的意思,而是通过 PROCTIME() 函数生成 user_action_time 字段
| user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'student',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'json',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
//这里print表的创建不能用LIKE的形式了,因为上面的时间不能被LIKE继承过来
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| clazz STRING,
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
|clazz,
|addTime(TUMBLE_END(user_action_time, INTERVAL '5' SECOND)) as win_end,
|count(1) as c
|from student
|group by
|clazz,
|TUMBLE(user_action_time, INTERVAL '5' SECOND)
|//处理时间的滚动窗口
""".stripMargin)
}
}
/**
* 自定义函数
*
*/
class AddTime extends ScalarFunction {
/**
* 方法名必须是eval
*
* @param localDateTime : 时间戳在代码中的类型
* @return
*/
def eval(localDateTime: LocalDateTime): LocalDateTime = {
//时间增加8小时
localDateTime.plusHours(8)
}
}
事件时间 & 滑动窗口
topic 可以理解为Flink中的一张表
package com.shujia.flink.table
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Demo9EventTIme {
def main(args: Array[String]): Unit = {
//创建flink 环境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setParallelism(1)
//设置table 环境的一些参数
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink计划器
.inStreamingMode() //流模式
.build()
// 创建flink table 环境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
//建表读取CSV格式的数据的时候,字段的顺序要和数据中的顺序对应,因为是按照顺序进行解析的
bsTableEnv.executeSql(
"""
|CREATE TABLE clicks (
| u STRING,
| ts TIMESTAMP(3),
| url STRING,
| -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
| WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'clicks1',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| u STRING,
| win_start TIMESTAMP(3),
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
|
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
| u,
| HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_start, -- 开始时间
| HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_end, -- 结束时间
| count(1) as c
| from
|clicks
|group by u,
|HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
|//滑动窗口,'10' SECOND -- 滑动时间,INTERVAL '1' HOUR -- 窗口大小
|//ts -- 事件时间字段
""".stripMargin)
}
}
Flink SQL 中没有统计窗口,可以用其他的方式实现
会话窗口
package com.shujia.flink.table
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Demo10SessionWindow {
def main(args: Array[String]): Unit = {
//创建flink 环境
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
bsEnv.setParallelism(1)
//设置table 环境的一些参数
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner() //使用blink计划器
.inStreamingMode() //流模式
.build()
// 创建flink table 环境
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv.executeSql(
"""
|CREATE TABLE clicks (
| u STRING,
| ts TIMESTAMP(3),
| url STRING,
| -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
| WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'clicks1',
| 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'csv',
| 'scan.startup.mode' = 'earliest-offset'
|)
""".stripMargin)
bsTableEnv.executeSql(
"""
|CREATE TABLE print_table (
| u STRING,
| win_start TIMESTAMP(3),
| win_end TIMESTAMP(3),
| c BIGINT
|)
|WITH ('connector' = 'print')
|
""".stripMargin)
bsTableEnv.executeSql(
"""
|insert into print_table
|select
| u,
| SESSION_START(ts, INTERVAL '10' SECOND) as win_start,
| SESSION_END(ts, INTERVAL '10' SECOND) as win_end,
| count(1) as c
| from
|clicks
|group by u,
|SESSION(ts, INTERVAL '10' SECOND)
|//会话窗口 -- 10秒没有数据算一次
""".stripMargin)
}
}