Flink SQL 时间属性、处理时间 & SQL自定义函数、事件时间 & 滑动窗口、topic 可以理解为Flink中的一张表、会话窗口


Flink SQL 时间属性

Flink 可以基于几种不同的 时间 概念来处理数据。

  • 处理时间 指的是执行具体操作时的机器时间(也称作”挂钟时间”)
  • 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
  • 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理。

像窗口(在 Table APISQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

每种类型的表都可以有时间属性,可以在用CREATE TABLE DDL创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用

只要时间属性没有被修改,而是简单地从一个表传递到另一个表,它就仍然是一个有效的时间属性。时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中,它就会被物化,进而变成一个普通的时间戳。普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的,所以普通的时间戳就无法用在时间相关的操作中。

处理时间 & SQL自定义函数

package com.shujia.flink.table

import java.time.LocalDateTime

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

object Demo8ProcTime {
  def main(args: Array[String]): Unit = {
    //创建 flink 环境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //设置 table 环境的一些参数
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink计划器
      .inStreamingMode() //流模式
      .build()

    // 创建flink table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    /**
      * 注册临时自定义函数 -- 只在当前环境中有用
      * addTime -- 函数名
      * new AddTime -- 自定义函数类的对象
      */

    bsTableEnv.createTemporaryFunction("addTime", new AddTime)

    /**
      * 在当前版本中PROCTIME()获取到的时间时区差了8小时,后面需要将时间增加8小时
      * 这个问题在新的版本中解决了
      * -- 只在处理时间才有
      */
    bsTableEnv.executeSql(
      """
        |CREATE TABLE student (
        |  id STRING,
        |  name STRING,
        |  age INT,
        |  gender STRING,
        |  clazz STRING,
        |  //这里的 AS 不是取别名的意思,而是通过 PROCTIME() 函数生成 user_action_time 字段
        |  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'student',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    //这里print表的创建不能用LIKE的形式了,因为上面的时间不能被LIKE继承过来
    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  clazz STRING,
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        |clazz,
        |addTime(TUMBLE_END(user_action_time, INTERVAL '5' SECOND)) as win_end,
        |count(1) as c
        |from student
        |group by
        |clazz,
        |TUMBLE(user_action_time, INTERVAL '5' SECOND)
        |//处理时间的滚动窗口 
      """.stripMargin)

  }
}

/**
  * 自定义函数
  *
  */
class AddTime extends ScalarFunction {

  /**
    * 方法名必须是eval
    *
    * @param localDateTime : 时间戳在代码中的类型
    * @return
    */
  def eval(localDateTime: LocalDateTime): LocalDateTime = {
    //时间增加8小时
    localDateTime.plusHours(8)
    
  }
}

事件时间 & 滑动窗口

topic 可以理解为Flink中的一张表

package com.shujia.flink.table

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo9EventTIme {
  def main(args: Array[String]): Unit = {
    //创建flink 环境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    bsEnv.setParallelism(1)

    //设置table 环境的一些参数
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink计划器
      .inStreamingMode() //流模式
      .build()

    // 创建flink table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    //建表读取CSV格式的数据的时候,字段的顺序要和数据中的顺序对应,因为是按照顺序进行解析的
    bsTableEnv.executeSql(
      """
        |CREATE TABLE clicks (
        |  u STRING,
        |  ts TIMESTAMP(3),
        |  url STRING,
        |  -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
        |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'clicks1',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'csv',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  u STRING,
        |  win_start TIMESTAMP(3),
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        | u,
        | HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_start, -- 开始时间
        | HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_end, -- 结束时间
        | count(1) as c
        | from
        |clicks
        |group by u,
        |HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
        |//滑动窗口,'10' SECOND -- 滑动时间,INTERVAL '1' HOUR -- 窗口大小
        |//ts -- 事件时间字段
      """.stripMargin)

  }
}

Flink SQL 中没有统计窗口,可以用其他的方式实现

会话窗口

package com.shujia.flink.table

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo10SessionWindow {
  def main(args: Array[String]): Unit = {

    //创建flink 环境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    bsEnv.setParallelism(1)

    //设置table 环境的一些参数
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink计划器
      .inStreamingMode() //流模式
      .build()

    // 创建flink table 环境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE clicks (
        |  u STRING,
        |  ts TIMESTAMP(3),
        |  url STRING,
        |  -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
        |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'clicks1',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'csv',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  u STRING,
        |  win_start TIMESTAMP(3),
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        | u,
        | SESSION_START(ts, INTERVAL '10' SECOND) as win_start,
        | SESSION_END(ts, INTERVAL '10' SECOND) as win_end,
        | count(1) as c
        | from
        |clicks
        |group by u,
        |SESSION(ts, INTERVAL '10' SECOND)
        |//会话窗口 -- 10秒没有数据算一次
      """.stripMargin)
  }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM