Flink SQL 時間屬性、處理時間 & SQL自定義函數、事件時間 & 滑動窗口、topic 可以理解為Flink中的一張表、會話窗口


Flink SQL 時間屬性

Flink 可以基於幾種不同的 時間 概念來處理數據。

  • 處理時間 指的是執行具體操作時的機器時間(也稱作”掛鍾時間”)
  • 事件時間 指的是數據本身攜帶的時間。這個時間是在事件產生時的時間。
  • 攝入時間 指的是數據進入 Flink 的時間;在系統內部,會把它當做事件時間來處理。

像窗口(在 Table APISQL )這種基於時間的操作,需要有時間信息。因此,Table API 中的表就需要提供邏輯時間屬性來表示時間,以及支持時間相關的操作。

每種類型的表都可以有時間屬性,可以在用CREATE TABLE DDL創建表的時候指定、也可以在 DataStream 中指定、也可以在定義 TableSource 時指定。一旦時間屬性定義好,它就可以像普通列一樣使用,也可以在時間相關的操作中使用

只要時間屬性沒有被修改,而是簡單地從一個表傳遞到另一個表,它就仍然是一個有效的時間屬性。時間屬性可以像普通的時間戳的列一樣被使用和計算。一旦時間屬性被用在了計算中,它就會被物化,進而變成一個普通的時間戳。普通的時間戳是無法跟 Flink 的時間以及watermark等一起使用的,所以普通的時間戳就無法用在時間相關的操作中。

處理時間 & SQL自定義函數

package com.shujia.flink.table

import java.time.LocalDateTime

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

object Demo8ProcTime {
  def main(args: Array[String]): Unit = {
    //創建 flink 環境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //設置 table 環境的一些參數
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink計划器
      .inStreamingMode() //流模式
      .build()

    // 創建flink table 環境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    /**
      * 注冊臨時自定義函數 -- 只在當前環境中有用
      * addTime -- 函數名
      * new AddTime -- 自定義函數類的對象
      */

    bsTableEnv.createTemporaryFunction("addTime", new AddTime)

    /**
      * 在當前版本中PROCTIME()獲取到的時間時區差了8小時,后面需要將時間增加8小時
      * 這個問題在新的版本中解決了
      * -- 只在處理時間才有
      */
    bsTableEnv.executeSql(
      """
        |CREATE TABLE student (
        |  id STRING,
        |  name STRING,
        |  age INT,
        |  gender STRING,
        |  clazz STRING,
        |  //這里的 AS 不是取別名的意思,而是通過 PROCTIME() 函數生成 user_action_time 字段
        |  user_action_time AS PROCTIME() -- 聲明一個額外的列作為處理時間屬性
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'student',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'json',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    //這里print表的創建不能用LIKE的形式了,因為上面的時間不能被LIKE繼承過來
    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  clazz STRING,
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        |clazz,
        |addTime(TUMBLE_END(user_action_time, INTERVAL '5' SECOND)) as win_end,
        |count(1) as c
        |from student
        |group by
        |clazz,
        |TUMBLE(user_action_time, INTERVAL '5' SECOND)
        |//處理時間的滾動窗口 
      """.stripMargin)

  }
}

/**
  * 自定義函數
  *
  */
class AddTime extends ScalarFunction {

  /**
    * 方法名必須是eval
    *
    * @param localDateTime : 時間戳在代碼中的類型
    * @return
    */
  def eval(localDateTime: LocalDateTime): LocalDateTime = {
    //時間增加8小時
    localDateTime.plusHours(8)
    
  }
}

事件時間 & 滑動窗口

topic 可以理解為Flink中的一張表

package com.shujia.flink.table

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo9EventTIme {
  def main(args: Array[String]): Unit = {
    //創建flink 環境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    bsEnv.setParallelism(1)

    //設置table 環境的一些參數
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink計划器
      .inStreamingMode() //流模式
      .build()

    // 創建flink table 環境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    //建表讀取CSV格式的數據的時候,字段的順序要和數據中的順序對應,因為是按照順序進行解析的
    bsTableEnv.executeSql(
      """
        |CREATE TABLE clicks (
        |  u STRING,
        |  ts TIMESTAMP(3),
        |  url STRING,
        |  -- 聲明 ts 是事件時間屬性,並且用 延遲 5 秒的策略來生成 watermark
        |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'clicks1',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'csv',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  u STRING,
        |  win_start TIMESTAMP(3),
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        | u,
        | HOP_START(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_start, -- 開始時間
        | HOP_END(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR) as win_end, -- 結束時間
        | count(1) as c
        | from
        |clicks
        |group by u,
        |HOP(ts, INTERVAL '10' SECOND, INTERVAL '1' HOUR)
        |//滑動窗口,'10' SECOND -- 滑動時間,INTERVAL '1' HOUR -- 窗口大小
        |//ts -- 事件時間字段
      """.stripMargin)

  }
}

Flink SQL 中沒有統計窗口,可以用其他的方式實現

會話窗口

package com.shujia.flink.table

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo10SessionWindow {
  def main(args: Array[String]): Unit = {

    //創建flink 環境
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    bsEnv.setParallelism(1)

    //設置table 環境的一些參數
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() //使用blink計划器
      .inStreamingMode() //流模式
      .build()

    // 創建flink table 環境
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE clicks (
        |  u STRING,
        |  ts TIMESTAMP(3),
        |  url STRING,
        |  -- 聲明 ts 是事件時間屬性,並且用 延遲 5 秒的策略來生成 watermark
        |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'clicks1',
        | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        | 'properties.group.id' = 'testGroup',
        | 'format' = 'csv',
        | 'scan.startup.mode' = 'earliest-offset'
        |)
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |CREATE TABLE print_table (
        |  u STRING,
        |  win_start TIMESTAMP(3),
        |  win_end TIMESTAMP(3),
        |  c BIGINT
        |)
        |WITH ('connector' = 'print')
        |
      """.stripMargin)

    bsTableEnv.executeSql(
      """
        |insert into print_table
        |select
        | u,
        | SESSION_START(ts, INTERVAL '10' SECOND) as win_start,
        | SESSION_END(ts, INTERVAL '10' SECOND) as win_end,
        | count(1) as c
        | from
        |clicks
        |group by u,
        |SESSION(ts, INTERVAL '10' SECOND)
        |//會話窗口 -- 10秒沒有數據算一次
      """.stripMargin)
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM