flink系列-8、Flink Table API & Flink Sql API


一、概述

  • 上圖是flink的分層模型,Table API 和 SQL 處於最頂端,是 Flink 提供的高級 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設計的一套符合標准 SQL 語義的開發語言。
  • Flink 在編程模型上提供了 DataStream 和 DataSet 兩套 API,並沒有做到事實上的批流統一,因為用戶和開發者還是開發了兩套代碼。正是因為 Flink Table & SQL 的加入,可以說 Flink 在某種程度上做到了事實上的批流一體。

原理

  • Flink  底層對 SQL 的解析,優化,執行用到了 Apache Calcite。
  • 下圖是一張經典的 Flink Table & SQL 實現原理圖,可以看到 Calcite 在整個架構中處於絕對核心地位。

二、Flink Table & SQL 算子和內置函數

Table API 操作

 以 Table -> GroupedTable -> Table 為例

 一個Table 經過 groupBy 后得到 GroupedTable,GroupedTable經過select后又得到Table。

Columns operators

 比如有一張100列的表去除一列,可以選擇 DropColumns

Columns Function

Flink sql API操作

Flink SQL 和傳統的 SQL 一樣,支持了包含查詢、連接、聚合等場景,另外還支持了包括窗口、排序等場景:

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'
...

主要來看一下flink sql的Windows操作

根據窗口數據划分的不同,目前 Apache Flink 有如下 3 種:

  • 滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
  • 滑動窗口,窗口數據有固定大小,並且有生成間隔;
  • 會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加;

滾動窗口

  • 滾動窗口的特點是:有固定大小、窗口中的數據不會重疊,如下圖所示

語法:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

 比如我們計算每個用戶每天的訂單量:

SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的開始時間和窗口的結束時間,TUMBLE (timeLine, INTERVAL '1' DAY) 中的 timeLine 代表時間字段所在的列,INTERVAL '1' DAY 表示時間間隔為一天。

滑動窗口

  • 滑動窗口有固定的大小,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的創建頻率。需要注意的是,多個滑動窗口可能會發生數據重疊,具體語義如下:

 

滑動窗口的語法與滾動窗口相比,只多了一個 slide 參數:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL '1' HOUR 代表滑動窗口生成的時間間隔。

會話窗口

  • 會話窗口定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或消息,則會話窗口關閉。

 

語法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

舉例,我們需要計算每個用戶過去 1 小時內的訂單量:

SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

內置函數
Flink 中還有大量的內置函數,我們可以直接使用,將內置函數分類如下:

  • 比較函數
  •  邏輯函數

  • 算數函數
  • 字符串處理函數
  • 時間函數

 三、demo

代碼地址

import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{StreamQueryConfig, Table}

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Random

/**
  * @author xiandongxie
  */

// 商品類
case class Item(id: Int, name: String)

// 自定義實時數據源
class MyStreamSourceFunction extends SourceFunction[Item] {

  var isCancel: Boolean = false

  override def cancel(): Unit = {
    isCancel = true
  }

  override def run(ctx: SourceFunction.SourceContext[Item]): Unit = {
    while (!isCancel) {
      val item: Item = getItem()
      ctx.collect(item)
      Thread.sleep(1000)
    }
  }

  def getItem(): Item = {
    val random: Random = new Random()
    val id: Int = random.nextInt(100)
    val buffer: ArrayBuffer[String] = new ArrayBuffer[String]()
    buffer.append("HAT")
    buffer.append("TIE")
    buffer.append("SHOE")
    new Item(id, buffer(random.nextInt(3)))

  }

}


/**
  * 我們把實時的商品數據流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同
  * 最后,把兩個流的 JOIN 結果輸出
  */
object StreamJoinDemo {
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"

    // 生成配置對象
    var conf: Configuration = new Configuration()
    // 開啟flink web UI
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // 配置web UI的日志文件,否則打印日志到控制台
    conf.setString("web.log.path", logPath)
    // 配置taskManager的日志文件,否則打印到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取local運行環境
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //    val build: EnvironmentSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    //    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, build)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)

    val source: DataStream[Item] = streamEnv.addSource(new MyStreamSourceFunction)
    val split: SplitStream[Item] = source.split(f => {
      val out: ListBuffer[String] = new ListBuffer[String]
      val id: Int = f.id
      if (id % 2 == 0) {
        out.append("even")
      } else {
        out.append("odd")
      }
      out
    })

    val evenData: DataStream[Item] = split.select("even")
    val oddData: DataStream[Item] = split.select("odd")

    tableEnv.createTemporaryView("event_table", evenData)
    tableEnv.createTemporaryView("odd_table", oddData)

    val queryTable: Table = tableEnv.sqlQuery("select a.id,a.name,b.id,b.name from event_table as a join odd_table as b on a.name = b.name")
    queryTable.printSchema()
    tableEnv.toRetractStream[(Int, String, Int, String)](queryTable, new StreamQueryConfig())
      .print()

    
    streamEnv.execute()
  }
}

結果:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM