Apache Flink SQL


本篇核心目標是讓大家概要了解一個完整的 Apache Flink SQL Job 的組成部分,以及 Apache Flink SQL 所提供的核心算子的語義,最后會應用 TumbleWindow 編寫一個 End-to-End 的頁面訪問的統計示例。

1.Apache Flink SQL Job 的組成

 

我們做任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,當然基於 ApacheFlink SQL 編寫的計算 Job 也離不開這個三部分,如下所所示:

 

如上所示,一個完整的 Apache Flink SQL Job 由如下三部分:

  • Source Operator – Soruce operator 是對外部數據源的抽象, 目前 Apache Flink 內置了很多常用的數據源實現,比如上圖提到的 Kafka。
  • Query Operators – 查詢算子主要完成如圖的 Query Logic,目前支持了Union,Join,Projection,Difference, Intersection 以及 window 等大多數傳統數據庫支持的操作。
  • Sink Operator – Sink operator 是對外結果表的抽象,目前 Apache Flink 也內置了很多常用的結果表的抽象,比如上圖提到的 Kafka。

大數據Flink討論群

2.Apache Flink SQL 核心算子

 

SQL 是 StructuredQuevy Language 的縮寫,最初是由美國計算機科學家 Donald D. Chamberlin 和 Raymond F. Boyce 在 20 世紀 70 年代早期從 Early History of SQL 中了解關系模型后在 IBM 開發的。該版本最初稱為[SEQUEL: A Structured EnglishQuery Language](結構化英語查詢語言),旨在操縱和檢索存儲在 IBM 原始准關系數據庫管理系統 System R 中的數據。直到 1986 年, ANSI 和 ISO 標准組正式采用了標准的”數據庫語言 SQL”語言定義。Apache Flink SQL 核心算子的語義設計也參考了 1992 、2011 等 ANSI-SQL 標准。接下來我們將簡單為大家介紹 Apache Flink SQL 每一個算子的語義。

2.1 SELECT

 

SELECT 用於從數據集/流中選擇數據,語法遵循 ANSI-SQL 標准,語義是關系代數中的投影(Projection),對關系進行垂直分割,消去某些列。

一個使用 Select 的語句如下:

SELECT ColA, ColC FROME tab ;

 

2.2 WHERE

 

WHERE 用於從數據集/流中過濾數據,與 SELECT 一起使用,語法遵循 ANSI-SQL 標准,語義是關系代數的 Selection,根據某些條件對關系做水平分割,即選擇符合條件的記錄,如下所示:

 

 

對應的 SQL 語句如下:

SELECT * FROM tab WHERE ColA <> 'a2' ;

 

2.3 GROUP BY

GROUP BY 是對數據進行分組的操作,比如我需要分別計算一下一個學生表里面女生和男生的人數分別是多少,如下:

 

 

對應的 SQL 語句如下:

SELECT sex, COUNT(name) AS count FROM tab GROUP BY sex ;

 

2.4 UNION ALL

 

UNION ALL 將兩個表合並起來,要求兩個表的字段完全一致,包括字段類型、字段順序,語義對應關系代數的 Union,只是關系代數是 Set 集合操作,會有去重復操作,UNION ALL 不進行去重,如下所示:

 

 

對應的 SQL 語句如下:

SELECT * FROM T1 UNION ALL SELECT * FROM T2

 

2.5 UNION

 

UNION 將兩個流給合並起來,要求兩個流的字段完全一致,包括字段類型、字段順序,並其 UNION 不同於 UNION ALL,UNION 會對結果數據去重,與關系代數的 Union 語義一致,如下:

 

 

對應的 SQL 語句如下:

SELECT * FROM T1 UNION SELECT * FROM T2

 

2.6 JOIN

 

JOIN 用於把來自兩個表的行聯合起來形成一個寬表,Apache Flink 支持的 JOIN 類型:

  • JOIN – INNER JOIN
  • LEFT JOIN – LEFT OUTER JOIN
  • RIGHT JOIN – RIGHT OUTER JOIN
  • FULL JOIN – FULL OUTER JOIN

JOIN 與關系代數的 Join 語義相同,具體如下:

 

對應的 SQL 語句如下(INNERJOIN):

 

SELECT ColA, ColB, T2.ColC, ColE FROM TI JOIN T2 ON T1.ColC = T2.ColC ;

 

LEFT JOIN 與 INNERJOIN 的區別是當右表沒有與左邊相 JOIN 的數據時候,右邊對應的字段補 NULL 輸出,如下:

 

 

對應的 SQL 語句如下(LEFTJOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

 

說明:

  • 細心的讀者可能發現上面 T2.ColC 是添加了前綴 T2 了,這里需要說明一下,當兩張表有字段名字一樣的時候,我需要指定是從那個表里面投影的。
  • RIGHT JOIN 相當於 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN 相當於 RIGHT JOIN 和 LEFT JOIN 之后進行 UNION ALL 操作。

2.7 Window

 

在 Apache Flink 中有 2 種類型的 Window,一種是 OverWindow,即傳統數據庫的標准開窗,每一個元素都對應一個窗口。一種是 GroupWindow,目前在SQL中 GroupWindow 都是基於時間進行窗口划分的。

2.7.1 OverWindow

 

OVER Window 目前支持由如下三個元素組合的 8 種類型:

  • 時間 – ProcessingTime 和 EventTime
  • 數據集 – Bounded 和 UnBounded
  • 划分方式 – ROWS 和 RANGE 我們以的Bounded ROWS 和 Bounded RANGE 兩種常用類型,想大家介紹 Over Window 的語義

 

  • Bounded ROWS Over Window

 

Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。

語法

 

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     ROWS 
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1
  • value_expression – 進行分區的字表達式;
  • timeCol – 用於元素排序的時間字段;
  • rowCount – 是定義根據當前行開始向前追溯幾行元素;

語義

我們以 3 個元素(2PRECEDING)的窗口為例,如下圖:

 

 

上圖所示窗口 user 1 的 w5 和 w6, user 2 的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,但是他們仍然是在不同的窗口,這一點有別於 RANGEOVER Window.

  • Bounded RANGE Over Window

 

Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口;

語法

Bounded RANGE OVER Window 的語法如下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1

 

  • value_expression – 進行分區的字表達式;
  • timeCol – 用於元素排序的時間字段;
  • timeInterval – 是定義根據當前行開始向前追溯指定時間的元素行;

語義

我們以 3 秒中數據(INTERVAL‘2’ SECOND)的窗口為例,如下圖:

 

 

 

 

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於 ROWS OVER Window.

2.7.2 GroupWindow

 

根據窗口數據划分的不同,目前 Apache Flink 有如下 3 種 Bounded Winodw:

  • Tumble – 滾動窗口,窗口數據有固定的大小,窗口數據無疊加;
  • Hop – 滑動窗口,窗口數據有固定大小,並且有固定的窗口重建頻率,窗口數據有疊加;
  • Session – 會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度划分窗口,窗口數據無疊加;

說明:Aapche Flink 還支持 UnBounded的 Group Window,也就是全局 Window,流上所有數據都在一個窗口里面,語義非常簡單,這里不做詳細介紹了。

GroupWindow 的語法如下:

SELECT 
    [gk], 
    agg1(col1),
     ... 
    aggN(colN)
FROM Tab1
GROUP BY [WINDOW(definition)], [gk]

 

[WINDOW(definition)] – 在具體窗口語義介紹中介紹。

  • Tumble Window

 

Tumble 滾動窗口有固定 size,窗口數據不重疊,具體語義如下:

 

假設我們要寫一個 2 分鍾大小的 Tumble,示例SQL如下:

SELECT gk, COUNT(*) AS pv 
  FROM tab 
    GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE), gk

 

  • Hop Window

 

Hop 滑動窗口和滾動窗口類似,窗口有固定的 size,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的新建頻率。因此當 slide 值小於窗口 size 的值的時候多個滑動窗口會重疊,具體語義如下:

 

假設我們要寫一個統計連續的兩個訪問用戶之間的訪問時間間隔不超過 3 分鍾的的頁面訪問量(PV).

SELECT gk, COUNT(*) AS pv 
  FROM tab 
    GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), gk

 

  • Session Window

 

Session 會話窗口 是沒有固定大小的窗口,通過 session 的活躍度分組元素。不同於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器通過配置 session gap 來指定非活躍周期的時長,具體語義如下:

 

 

 

假設我們要寫一個統計連續的兩個訪問用戶之間的訪問時間間隔不超過 3 分鍾的的頁面訪問量(PV).

 

SELECT gk, COUNT(*) AS pv  
  FROM pageAccessSession_tab
    GROUP BY SESSION(rowtime, INTERVAL '3' MINUTE), gk

 

說明:很多場景用戶需要獲得 Window 的開始和結束時間,上面的 GroupWindow的SQL 示例中沒有體現,那么窗口的開始和結束時間應該怎樣獲取呢? Apache Flink 我們提供了如下輔助函數:

  • TUMBLE_START/TUMBLE_END
  • HOP_START/HOP_END
  • SESSION_START/SESSION_END

這些輔助函數如何使用,請參考如下完整示例的使用方式。

3.完整的 SQL Job 案例

 

上面我們介紹了 Apache Flink SQL 核心算子的語法及語義,這部分將選取Bounded EventTime Tumble Window 為例為大家編寫一個完整的包括 Source 和 Sink 定義的 ApacheFlink SQL Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶 ID 和訪問時間。我們需要按不同地域統計每 2 分鍾的淘寶首頁的訪問量(PV). 具體數據如下:

 

 

3.1 Source 定義

 

自定義 Apache Flink Stream Source 需要實現 StreamTableSource, StreamTableSource 中通過 StreamExecutionEnvironment 的 addSource 方法獲取 DataStream, 所以我們需要自定義一個 SourceFunction, 並且要支持產生 WaterMark,也就是要實現 DefinedRowtimeAttributes 接口。出於代碼篇幅問題,我們如下只介紹核心部分,完整代碼 請查看: EventTimeTumbleWindowDemo.scala

3.1.1 Source Function 定義

 

支持接收攜帶 EventTime 的數據集合,Either 的數據結構 Right 是 WaterMark,Left 是元數據:

class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
      case Right(w) => ctx.emitWatermark(new Watermark(w)) // emit watermark
    }
  }
}

 

3.1.2 定義 StreamTableSource

 

我們自定義的 Source 要攜帶我們測試的數據,以及對應的 WaterMark 數據,具體如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  // 頁面訪問表數據 rows with timestamps and watermarks
  val data = Seq(
     // Data
     Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
     // Watermark
     Right(1510365660000L),
    ..
  )
  
  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)
  
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // 添加數據源實現
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }
  ...
}

 

3.4 Sink 定義

 

我們簡單的將計算結果寫入到 Apache Flink 內置支持的 CSVSink 中,定義 Sink 如下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = ...
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }

 

3.5 構建主程序

 

主程序包括執行環境的定義,Source / Sink 的注冊以及統計查 SQL 的執行,具體如下:

def main(args: Array[String]): Unit = {
    // Streaming 環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 設置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便我們查出輸出數據
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 創建自定義source數據結構
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 創建CSV sink 數據結構
    val tableSink = getCsvTableSink

    // 注冊source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 注冊sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val sql =
      "SELECT  " +
      "  region, " +
      "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
      "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
      " FROM mySource " +
      " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

    tEnv.sqlQuery(sql).insertInto(sinkTableName);
    env.execute()
  }

3.6 執行並查看運行結果

 

執行主程序后我們會在控制台得到 Sink 的文件路徑,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

 

Cat 方式查看計算結果,如下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

4.小結

 

本篇概要的介紹了 Apache Flink SQL 的所有核心算子,並以一個 End-to-End 的示例展示了如何編寫 Apache Flink SQL 的 Job . 希望對大家有所幫助。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM